nautilus_okx/http/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides an ergonomic wrapper around the **OKX v5 REST API** –
17//! <https://www.okx.com/docs-v5/en/>.
18//!
19//! The core type exported by this module is [`OKXHttpClient`].  It offers an
20//! interface to all exchange endpoints currently required by NautilusTrader.
21//!
22//! Key responsibilities handled internally:
23//! • Request signing and header composition for private routes (HMAC-SHA256).
24//! • Rate-limiting based on the public OKX specification.
25//! • Zero-copy deserialization of large JSON payloads into domain models.
26//! • Conversion of raw exchange errors into the rich [`OKXHttpError`] enum.
27//!
28//! # Quick links to official docs
29//! | Domain                               | OKX reference                                          |
30//! |--------------------------------------|--------------------------------------------------------|
31//! | Market data                          | <https://www.okx.com/docs-v5/en/#rest-api-market-data> |
32//! | Account & positions                  | <https://www.okx.com/docs-v5/en/#rest-api-account>     |
33//! | Funding & asset balances             | <https://www.okx.com/docs-v5/en/#rest-api-funding>     |
34
35use std::{
36    collections::HashMap,
37    fmt::Debug,
38    num::NonZeroU32,
39    sync::{Arc, LazyLock, Mutex},
40};
41
42use ahash::{AHashMap, AHashSet};
43use chrono::{DateTime, Utc};
44use nautilus_core::{
45    UnixNanos, consts::NAUTILUS_USER_AGENT, env::get_or_env_var, time::get_atomic_clock_realtime,
46};
47use nautilus_model::{
48    data::{Bar, BarType, IndexPriceUpdate, MarkPriceUpdate, TradeTick},
49    enums::{AggregationSource, BarAggregation, OrderSide, OrderType, TriggerType},
50    events::AccountState,
51    identifiers::{AccountId, ClientOrderId, InstrumentId},
52    instruments::{Instrument, InstrumentAny},
53    reports::{FillReport, OrderStatusReport, PositionStatusReport},
54    types::{Price, Quantity},
55};
56use nautilus_network::{
57    http::HttpClient,
58    ratelimiter::quota::Quota,
59    retry::{RetryConfig, RetryManager},
60};
61use reqwest::{Method, StatusCode, header::USER_AGENT};
62use serde::{Deserialize, Serialize, de::DeserializeOwned};
63use tokio_util::sync::CancellationToken;
64use ustr::Ustr;
65
66use super::{
67    error::OKXHttpError,
68    models::{
69        OKXAccount, OKXCancelAlgoOrderRequest, OKXCancelAlgoOrderResponse, OKXFeeRate,
70        OKXIndexTicker, OKXMarkPrice, OKXOrderAlgo, OKXOrderHistory, OKXPlaceAlgoOrderRequest,
71        OKXPlaceAlgoOrderResponse, OKXPosition, OKXPositionHistory, OKXPositionTier, OKXServerTime,
72        OKXTransactionDetail,
73    },
74    query::{
75        GetAlgoOrdersParams, GetAlgoOrdersParamsBuilder, GetCandlesticksParams,
76        GetCandlesticksParamsBuilder, GetIndexTickerParams, GetIndexTickerParamsBuilder,
77        GetInstrumentsParams, GetInstrumentsParamsBuilder, GetMarkPriceParams,
78        GetMarkPriceParamsBuilder, GetOrderHistoryParams, GetOrderHistoryParamsBuilder,
79        GetOrderListParams, GetOrderListParamsBuilder, GetPositionTiersParams,
80        GetPositionsHistoryParams, GetPositionsParams, GetPositionsParamsBuilder,
81        GetTradeFeeParams, GetTradesParams, GetTradesParamsBuilder, GetTransactionDetailsParams,
82        GetTransactionDetailsParamsBuilder, SetPositionModeParams, SetPositionModeParamsBuilder,
83    },
84};
85use crate::{
86    common::{
87        consts::{OKX_HTTP_URL, OKX_NAUTILUS_BROKER_ID, should_retry_error_code},
88        credential::Credential,
89        enums::{
90            OKXAlgoOrderType, OKXInstrumentType, OKXOrderStatus, OKXPositionMode, OKXSide,
91            OKXTradeMode, OKXTriggerType, OKXVipLevel,
92        },
93        models::OKXInstrument,
94        parse::{
95            okx_instrument_type, parse_account_state, parse_candlestick, parse_fill_report,
96            parse_index_price_update, parse_instrument_any, parse_mark_price_update,
97            parse_order_status_report, parse_position_status_report, parse_trade_tick,
98        },
99    },
100    http::{
101        models::{OKXCandlestick, OKXTrade},
102        query::{GetOrderParams, GetPendingOrdersParams},
103    },
104    websocket::{messages::OKXAlgoOrderMsg, parse::parse_algo_order_status_report},
105};
106
107const OKX_SUCCESS_CODE: &str = "0";
108
109/// Default OKX REST API rate limit: 500 requests per 2 seconds.
110///
111/// - Sub-account order limit: 1000 requests per 2 seconds.
112/// - Account balance: 10 requests per 2 seconds.
113/// - Account instruments: 20 requests per 2 seconds.
114///
115/// We use a conservative 250 requests per second (500 per 2 seconds) as a general limit
116/// that should accommodate most use cases while respecting OKX's documented limits.
117pub static OKX_REST_QUOTA: LazyLock<Quota> =
118    LazyLock::new(|| Quota::per_second(NonZeroU32::new(250).unwrap()));
119
120const OKX_GLOBAL_RATE_KEY: &str = "okx:global";
121
122/// Represents an OKX HTTP response.
123#[derive(Debug, Serialize, Deserialize)]
124pub struct OKXResponse<T> {
125    /// The OKX response code, which is `"0"` for success.
126    pub code: String,
127    /// A message string which can be informational or describe an error cause.
128    pub msg: String,
129    /// The typed data returned by the OKX endpoint.
130    pub data: Vec<T>,
131}
132
133/// Provides a HTTP client for connecting to the [OKX](https://okx.com) REST API.
134///
135/// This client wraps the underlying [`HttpClient`] to handle functionality
136/// specific to OKX, such as request signing (for authenticated endpoints),
137/// forming request URLs, and deserializing responses into specific data models.
138pub struct OKXHttpInnerClient {
139    base_url: String,
140    client: HttpClient,
141    credential: Option<Credential>,
142    retry_manager: RetryManager<OKXHttpError>,
143    cancellation_token: CancellationToken,
144    is_demo: bool,
145}
146
147impl Default for OKXHttpInnerClient {
148    fn default() -> Self {
149        Self::new(None, Some(60), None, None, None, false)
150            .expect("Failed to create default OKXHttpInnerClient")
151    }
152}
153
154impl Debug for OKXHttpInnerClient {
155    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156        let credential = self.credential.as_ref().map(|_| "<redacted>");
157        f.debug_struct(stringify!(OKXHttpInnerClient))
158            .field("base_url", &self.base_url)
159            .field("credential", &credential)
160            .finish_non_exhaustive()
161    }
162}
163
164impl OKXHttpInnerClient {
165    fn rate_limiter_quotas() -> Vec<(String, Quota)> {
166        vec![
167            (OKX_GLOBAL_RATE_KEY.to_string(), *OKX_REST_QUOTA),
168            (
169                "okx:/api/v5/account/balance".to_string(),
170                Quota::per_second(NonZeroU32::new(5).unwrap()),
171            ),
172            (
173                "okx:/api/v5/public/instruments".to_string(),
174                Quota::per_second(NonZeroU32::new(10).unwrap()),
175            ),
176            (
177                "okx:/api/v5/market/candles".to_string(),
178                Quota::per_second(NonZeroU32::new(50).unwrap()),
179            ),
180            (
181                "okx:/api/v5/market/history-candles".to_string(),
182                Quota::per_second(NonZeroU32::new(20).unwrap()),
183            ),
184            (
185                "okx:/api/v5/market/history-trades".to_string(),
186                Quota::per_second(NonZeroU32::new(30).unwrap()),
187            ),
188            (
189                "okx:/api/v5/trade/order".to_string(),
190                Quota::per_second(NonZeroU32::new(30).unwrap()), // 60 requests / 2 seconds (per instrument)
191            ),
192            (
193                "okx:/api/v5/trade/orders-pending".to_string(),
194                Quota::per_second(NonZeroU32::new(20).unwrap()),
195            ),
196            (
197                "okx:/api/v5/trade/orders-history".to_string(),
198                Quota::per_second(NonZeroU32::new(20).unwrap()),
199            ),
200            (
201                "okx:/api/v5/trade/fills".to_string(),
202                Quota::per_second(NonZeroU32::new(30).unwrap()),
203            ),
204            (
205                "okx:/api/v5/trade/order-algo".to_string(),
206                Quota::per_second(NonZeroU32::new(10).unwrap()),
207            ),
208            (
209                "okx:/api/v5/trade/cancel-algos".to_string(),
210                Quota::per_second(NonZeroU32::new(10).unwrap()),
211            ),
212        ]
213    }
214
215    fn rate_limit_keys(endpoint: &str) -> Vec<Ustr> {
216        let normalized = endpoint.split('?').next().unwrap_or(endpoint);
217        let route = format!("okx:{normalized}");
218
219        vec![Ustr::from(OKX_GLOBAL_RATE_KEY), Ustr::from(route.as_str())]
220    }
221
222    /// Cancel all pending HTTP requests.
223    pub fn cancel_all_requests(&self) {
224        self.cancellation_token.cancel();
225    }
226
227    /// Get the cancellation token for this client.
228    pub fn cancellation_token(&self) -> &CancellationToken {
229        &self.cancellation_token
230    }
231
232    /// Creates a new [`OKXHttpClient`] using the default OKX HTTP URL,
233    /// optionally overridden with a custom base URL.
234    ///
235    /// This version of the client has **no credentials**, so it can only
236    /// call publicly accessible endpoints.
237    ///
238    /// # Errors
239    ///
240    /// Returns an error if the retry manager cannot be created.
241    pub fn new(
242        base_url: Option<String>,
243        timeout_secs: Option<u64>,
244        max_retries: Option<u32>,
245        retry_delay_ms: Option<u64>,
246        retry_delay_max_ms: Option<u64>,
247        is_demo: bool,
248    ) -> Result<Self, OKXHttpError> {
249        let retry_config = RetryConfig {
250            max_retries: max_retries.unwrap_or(3),
251            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
252            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
253            backoff_factor: 2.0,
254            jitter_ms: 1000,
255            operation_timeout_ms: Some(60_000),
256            immediate_first: false,
257            max_elapsed_ms: Some(180_000),
258        };
259
260        let retry_manager = RetryManager::new(retry_config).map_err(|e| {
261            OKXHttpError::ValidationError(format!("Failed to create retry manager: {e}"))
262        })?;
263
264        Ok(Self {
265            base_url: base_url.unwrap_or(OKX_HTTP_URL.to_string()),
266            client: HttpClient::new(
267                Self::default_headers(is_demo),
268                vec![],
269                Self::rate_limiter_quotas(),
270                Some(*OKX_REST_QUOTA),
271                timeout_secs,
272            ),
273            credential: None,
274            retry_manager,
275            cancellation_token: CancellationToken::new(),
276            is_demo,
277        })
278    }
279
280    /// Creates a new [`OKXHttpClient`] configured with credentials
281    /// for authenticated requests, optionally using a custom base URL.
282    ///
283    /// # Errors
284    ///
285    /// Returns an error if the retry manager cannot be created.
286    #[allow(clippy::too_many_arguments)]
287    pub fn with_credentials(
288        api_key: String,
289        api_secret: String,
290        api_passphrase: String,
291        base_url: String,
292        timeout_secs: Option<u64>,
293        max_retries: Option<u32>,
294        retry_delay_ms: Option<u64>,
295        retry_delay_max_ms: Option<u64>,
296        is_demo: bool,
297    ) -> Result<Self, OKXHttpError> {
298        let retry_config = RetryConfig {
299            max_retries: max_retries.unwrap_or(3),
300            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
301            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
302            backoff_factor: 2.0,
303            jitter_ms: 1000,
304            operation_timeout_ms: Some(60_000),
305            immediate_first: false,
306            max_elapsed_ms: Some(180_000),
307        };
308
309        let retry_manager = RetryManager::new(retry_config).map_err(|e| {
310            OKXHttpError::ValidationError(format!("Failed to create retry manager: {e}"))
311        })?;
312
313        Ok(Self {
314            base_url,
315            client: HttpClient::new(
316                Self::default_headers(is_demo),
317                vec![],
318                Self::rate_limiter_quotas(),
319                Some(*OKX_REST_QUOTA),
320                timeout_secs,
321            ),
322            credential: Some(Credential::new(api_key, api_secret, api_passphrase)),
323            retry_manager,
324            cancellation_token: CancellationToken::new(),
325            is_demo,
326        })
327    }
328
329    /// Builds the default headers to include with each request (e.g., `User-Agent`).
330    fn default_headers(is_demo: bool) -> HashMap<String, String> {
331        let mut headers =
332            HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())]);
333
334        if is_demo {
335            headers.insert("x-simulated-trading".to_string(), "1".to_string());
336        }
337
338        headers
339    }
340
341    /// Combine a base path with a `serde_urlencoded` query string if one exists.
342    ///
343    /// # Errors
344    ///
345    /// Returns an error if the query string serialization fails.
346    fn build_path<S: Serialize>(base: &str, params: &S) -> Result<String, OKXHttpError> {
347        let query = serde_urlencoded::to_string(params)
348            .map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
349        if query.is_empty() {
350            Ok(base.to_owned())
351        } else {
352            Ok(format!("{base}?{query}"))
353        }
354    }
355
356    /// Signs an OKX request with timestamp, API key, passphrase, and signature.
357    ///
358    /// # Errors
359    ///
360    /// Returns [`OKXHttpError::MissingCredentials`] if no credentials are set
361    /// but the request requires authentication.
362    fn sign_request(
363        &self,
364        method: &Method,
365        path: &str,
366        body: Option<&[u8]>,
367    ) -> Result<HashMap<String, String>, OKXHttpError> {
368        let credential = match self.credential.as_ref() {
369            Some(c) => c,
370            None => return Err(OKXHttpError::MissingCredentials),
371        };
372
373        let api_key = credential.api_key.to_string();
374        let api_passphrase = credential.api_passphrase.to_string();
375
376        // OKX requires milliseconds in the timestamp (ISO 8601 with milliseconds)
377        let now = Utc::now();
378        let millis = now.timestamp_subsec_millis();
379        let timestamp = now.format("%Y-%m-%dT%H:%M:%S").to_string() + &format!(".{:03}Z", millis);
380        let signature = credential.sign_bytes(&timestamp, method.as_str(), path, body);
381
382        let mut headers = HashMap::new();
383        headers.insert("OK-ACCESS-KEY".to_string(), api_key.clone());
384        headers.insert("OK-ACCESS-PASSPHRASE".to_string(), api_passphrase);
385        headers.insert("OK-ACCESS-TIMESTAMP".to_string(), timestamp.clone());
386        headers.insert("OK-ACCESS-SIGN".to_string(), signature);
387
388        Ok(headers)
389    }
390
391    /// Sends an HTTP request to OKX and parses the response into `Vec<T>`.
392    ///
393    /// Internally, this method handles:
394    /// - Building the URL from `base_url` + `path`.
395    /// - Optionally signing the request.
396    /// - Deserializing JSON responses into typed models, or returning a [`OKXHttpError`].
397    /// - Retrying with exponential backoff on transient errors.
398    ///
399    /// # Errors
400    ///
401    /// Returns an error if:
402    /// - The HTTP request fails.
403    /// - Authentication is required but credentials are missing.
404    /// - The response cannot be deserialized into the expected type.
405    /// - The OKX API returns an error response.
406    async fn send_request<T: DeserializeOwned>(
407        &self,
408        method: Method,
409        path: &str,
410        body: Option<Vec<u8>>,
411        authenticate: bool,
412    ) -> Result<Vec<T>, OKXHttpError> {
413        let url = format!("{}{path}", self.base_url);
414        let endpoint = path.to_string();
415        let method_clone = method.clone();
416        let body_clone = body.clone();
417
418        let operation = || {
419            let url = url.clone();
420            let method = method_clone.clone();
421            let body = body_clone.clone();
422            let endpoint = endpoint.clone();
423
424            async move {
425                let mut headers = if authenticate {
426                    self.sign_request(&method, endpoint.as_str(), body.as_deref())?
427                } else {
428                    HashMap::new()
429                };
430
431                // Always set Content-Type header when body is present
432                if body.is_some() {
433                    headers.insert("Content-Type".to_string(), "application/json".to_string());
434                }
435
436                let rate_keys = Self::rate_limit_keys(endpoint.as_str());
437                let resp = self
438                    .client
439                    .request_with_ustr_keys(
440                        method.clone(),
441                        url,
442                        Some(headers),
443                        body,
444                        None,
445                        Some(rate_keys),
446                    )
447                    .await?;
448
449                tracing::trace!("Response: {resp:?}");
450
451                if resp.status.is_success() {
452                    let okx_response: OKXResponse<T> =
453                        serde_json::from_slice(&resp.body).map_err(|e| {
454                            tracing::error!("Failed to deserialize OKXResponse: {e}");
455                            OKXHttpError::JsonError(e.to_string())
456                        })?;
457
458                    if okx_response.code != OKX_SUCCESS_CODE {
459                        return Err(OKXHttpError::OkxError {
460                            error_code: okx_response.code,
461                            message: okx_response.msg,
462                        });
463                    }
464
465                    Ok(okx_response.data)
466                } else {
467                    let error_body = String::from_utf8_lossy(&resp.body);
468                    if resp.status.as_u16() == StatusCode::NOT_FOUND.as_u16() {
469                        tracing::debug!("HTTP 404 with body: {error_body}");
470                    } else {
471                        tracing::error!(
472                            "HTTP error {} with body: {error_body}",
473                            resp.status.as_str()
474                        );
475                    }
476
477                    if let Ok(parsed_error) = serde_json::from_slice::<OKXResponse<T>>(&resp.body) {
478                        return Err(OKXHttpError::OkxError {
479                            error_code: parsed_error.code,
480                            message: parsed_error.msg,
481                        });
482                    }
483
484                    Err(OKXHttpError::UnexpectedStatus {
485                        status: StatusCode::from_u16(resp.status.as_u16()).unwrap(),
486                        body: error_body.to_string(),
487                    })
488                }
489            }
490        };
491
492        // Retry strategy based on OKX error responses and HTTP status codes:
493        //
494        // 1. Network errors: always retry (transient connection issues)
495        // 2. HTTP 5xx/429: server errors and rate limiting should be retried
496        // 3. OKX specific retryable error codes (defined in common::consts)
497        //
498        // Note: OKX returns many permanent errors which should NOT be retried
499        // (e.g., "Invalid instrument", "Insufficient balance", "Invalid API Key")
500        let should_retry = |error: &OKXHttpError| -> bool {
501            match error {
502                OKXHttpError::HttpClientError(_) => true,
503                OKXHttpError::UnexpectedStatus { status, .. } => {
504                    status.as_u16() >= 500 || status.as_u16() == 429
505                }
506                OKXHttpError::OkxError { error_code, .. } => should_retry_error_code(error_code),
507                _ => false,
508            }
509        };
510
511        let create_error = |msg: String| -> OKXHttpError {
512            if msg == "canceled" {
513                OKXHttpError::ValidationError("Request canceled".to_string())
514            } else {
515                OKXHttpError::ValidationError(msg)
516            }
517        };
518
519        self.retry_manager
520            .execute_with_retry_with_cancel(
521                endpoint.as_str(),
522                operation,
523                should_retry,
524                create_error,
525                &self.cancellation_token,
526            )
527            .await
528    }
529
530    /// Sets the position mode for an account.
531    ///
532    /// # Errors
533    ///
534    /// Returns an error if JSON serialization of `params` fails, if the HTTP
535    /// request fails, or if the response body cannot be deserialized.
536    ///
537    /// # References
538    ///
539    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-set-position-mode>
540    pub async fn http_set_position_mode(
541        &self,
542        params: SetPositionModeParams,
543    ) -> Result<Vec<serde_json::Value>, OKXHttpError> {
544        let path = "/api/v5/account/set-position-mode";
545        let body = serde_json::to_vec(&params)?;
546        self.send_request(Method::POST, path, Some(body), true)
547            .await
548    }
549
550    /// Requests position tiers information, maximum leverage depends on your borrowings and margin ratio.
551    ///
552    /// # Errors
553    ///
554    /// Returns an error if the HTTP request fails, authentication is rejected
555    /// or the response cannot be deserialized.
556    ///
557    /// # References
558    ///
559    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-position-tiers>
560    pub async fn http_get_position_tiers(
561        &self,
562        params: GetPositionTiersParams,
563    ) -> Result<Vec<OKXPositionTier>, OKXHttpError> {
564        let path = Self::build_path("/api/v5/public/position-tiers", &params)?;
565        self.send_request(Method::GET, &path, None, false).await
566    }
567
568    /// Requests a list of instruments with open contracts.
569    ///
570    /// # Errors
571    ///
572    /// Returns an error if JSON serialization of `params` fails, if the HTTP
573    /// request fails, or if the response body cannot be deserialized.
574    ///
575    /// # References
576    ///
577    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-instruments>
578    pub async fn http_get_instruments(
579        &self,
580        params: GetInstrumentsParams,
581    ) -> Result<Vec<OKXInstrument>, OKXHttpError> {
582        let path = Self::build_path("/api/v5/public/instruments", &params)?;
583        self.send_request(Method::GET, &path, None, false).await
584    }
585
586    /// Requests the current server time from OKX.
587    ///
588    /// Retrieves the OKX system time in Unix timestamp (milliseconds). This is useful for
589    /// synchronizing local clocks with the exchange server and logging time drift.
590    ///
591    /// # Errors
592    ///
593    /// Returns an error if the HTTP request fails or if the response body
594    /// cannot be parsed into [`OKXServerTime`].
595    ///
596    /// # References
597    ///
598    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-system-time>
599    pub async fn http_get_server_time(&self) -> Result<u64, OKXHttpError> {
600        let response: Vec<OKXServerTime> = self
601            .send_request(Method::GET, "/api/v5/public/time", None, false)
602            .await?;
603        response
604            .first()
605            .map(|t| t.ts)
606            .ok_or_else(|| OKXHttpError::JsonError("Empty server time response".to_string()))
607    }
608
609    /// Requests a mark price.
610    ///
611    /// We set the mark price based on the SPOT index and at a reasonable basis to prevent individual
612    /// users from manipulating the market and causing the contract price to fluctuate.
613    ///
614    /// # Errors
615    ///
616    /// Returns an error if the HTTP request fails or if the response body
617    /// cannot be parsed into [`OKXMarkPrice`].
618    ///
619    /// # References
620    ///
621    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-mark-price>
622    pub async fn http_get_mark_price(
623        &self,
624        params: GetMarkPriceParams,
625    ) -> Result<Vec<OKXMarkPrice>, OKXHttpError> {
626        let path = Self::build_path("/api/v5/public/mark-price", &params)?;
627        self.send_request(Method::GET, &path, None, false).await
628    }
629
630    /// Requests the latest index price.
631    ///
632    /// # Errors
633    ///
634    /// Returns an error if the operation fails.
635    ///
636    /// # References
637    ///
638    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-index-tickers>
639    pub async fn http_get_index_ticker(
640        &self,
641        params: GetIndexTickerParams,
642    ) -> Result<Vec<OKXIndexTicker>, OKXHttpError> {
643        let path = Self::build_path("/api/v5/market/index-tickers", &params)?;
644        self.send_request(Method::GET, &path, None, false).await
645    }
646
647    /// Requests trades history.
648    ///
649    /// # Errors
650    ///
651    /// Returns an error if the operation fails.
652    ///
653    /// # References
654    ///
655    /// <https://www.okx.com/docs-v5/en/#order-book-trading-market-data-get-trades-history>
656    pub async fn http_get_trades(
657        &self,
658        params: GetTradesParams,
659    ) -> Result<Vec<OKXTrade>, OKXHttpError> {
660        let path = Self::build_path("/api/v5/market/history-trades", &params)?;
661        self.send_request(Method::GET, &path, None, false).await
662    }
663
664    /// Requests recent candlestick data.
665    ///
666    /// # Errors
667    ///
668    /// Returns an error if the operation fails.
669    ///
670    /// # References
671    ///
672    /// <https://www.okx.com/docs-v5/en/#order-book-trading-market-data-get-candlesticks>
673    pub async fn http_get_candlesticks(
674        &self,
675        params: GetCandlesticksParams,
676    ) -> Result<Vec<OKXCandlestick>, OKXHttpError> {
677        let path = Self::build_path("/api/v5/market/candles", &params)?;
678        self.send_request(Method::GET, &path, None, false).await
679    }
680
681    /// Requests historical candlestick data.
682    ///
683    /// # Errors
684    ///
685    /// Returns an error if the operation fails.
686    ///
687    /// # References
688    ///
689    /// <https://www.okx.com/docs-v5/en/#order-book-trading-market-data-get-candlesticks-history>
690    pub async fn http_get_candlesticks_history(
691        &self,
692        params: GetCandlesticksParams,
693    ) -> Result<Vec<OKXCandlestick>, OKXHttpError> {
694        let path = Self::build_path("/api/v5/market/history-candles", &params)?;
695        self.send_request(Method::GET, &path, None, false).await
696    }
697
698    /// Lists current open orders.
699    ///
700    /// # Errors
701    ///
702    /// Returns an error if the operation fails.
703    ///
704    /// # References
705    ///
706    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-orders-pending>
707    pub async fn http_get_pending_orders(
708        &self,
709        params: GetPendingOrdersParams,
710    ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
711        let path = Self::build_path("/api/v5/trade/orders-pending", &params)?;
712        self.send_request(Method::GET, &path, None, true).await
713    }
714
715    /// Retrieves a single order’s details.
716    ///
717    /// # Errors
718    ///
719    /// Returns an error if the operation fails.
720    ///
721    /// # References
722    ///
723    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-order>
724    pub async fn http_get_order(
725        &self,
726        params: GetOrderParams,
727    ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
728        let path = Self::build_path("/api/v5/trade/order", &params)?;
729        self.send_request(Method::GET, &path, None, true).await
730    }
731
732    /// Requests a list of assets (with non-zero balance), remaining balance, and available amount
733    /// in the trading account.
734    ///
735    /// # Errors
736    ///
737    /// Returns an error if the operation fails.
738    ///
739    /// # References
740    ///
741    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-balance>
742    pub async fn http_get_balance(&self) -> Result<Vec<OKXAccount>, OKXHttpError> {
743        let path = "/api/v5/account/balance";
744        self.send_request(Method::GET, path, None, true).await
745    }
746
747    /// Requests fee rates for the account.
748    ///
749    /// Returns fee rates for the specified instrument type and the user's VIP level.
750    ///
751    /// # Errors
752    ///
753    /// Returns an error if the operation fails.
754    ///
755    /// # References
756    ///
757    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-fee-rates>
758    pub async fn http_get_trade_fee(
759        &self,
760        params: GetTradeFeeParams,
761    ) -> Result<Vec<OKXFeeRate>, OKXHttpError> {
762        let path = Self::build_path("/api/v5/account/trade-fee", &params)?;
763        self.send_request(Method::GET, &path, None, true).await
764    }
765
766    /// Requests historical order records.
767    ///
768    /// # Errors
769    ///
770    /// Returns an error if the operation fails.
771    ///
772    /// # References
773    ///
774    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-orders-history>
775    pub async fn http_get_order_history(
776        &self,
777        params: GetOrderHistoryParams,
778    ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
779        let path = Self::build_path("/api/v5/trade/orders-history", &params)?;
780        self.send_request(Method::GET, &path, None, true).await
781    }
782
783    /// Requests order list (pending orders).
784    ///
785    /// # Errors
786    ///
787    /// Returns an error if the operation fails.
788    ///
789    /// # References
790    ///
791    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-order-list>
792    pub async fn http_get_order_list(
793        &self,
794        params: GetOrderListParams,
795    ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
796        let path = Self::build_path("/api/v5/trade/orders-pending", &params)?;
797        self.send_request(Method::GET, &path, None, true).await
798    }
799
800    /// Requests pending algo orders.
801    ///
802    /// # Errors
803    ///
804    /// Returns an error if the operation fails.
805    pub async fn http_get_order_algo_pending(
806        &self,
807        params: GetAlgoOrdersParams,
808    ) -> Result<Vec<OKXOrderAlgo>, OKXHttpError> {
809        let path = Self::build_path("/api/v5/trade/order-algo-pending", &params)?;
810        self.send_request(Method::GET, &path, None, true).await
811    }
812
813    /// Requests historical algo orders.
814    ///
815    /// # Errors
816    ///
817    /// Returns an error if the operation fails.
818    pub async fn http_get_order_algo_history(
819        &self,
820        params: GetAlgoOrdersParams,
821    ) -> Result<Vec<OKXOrderAlgo>, OKXHttpError> {
822        let path = Self::build_path("/api/v5/trade/order-algo-history", &params)?;
823        self.send_request(Method::GET, &path, None, true).await
824    }
825
826    /// Requests information on your positions. When the account is in net mode, net positions will
827    /// be displayed, and when the account is in long/short mode, long or short positions will be
828    /// displayed. Returns in reverse chronological order using ctime.
829    ///
830    /// # Errors
831    ///
832    /// Returns an error if the operation fails.
833    ///
834    /// # References
835    ///
836    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-positions>
837    pub async fn http_get_positions(
838        &self,
839        params: GetPositionsParams,
840    ) -> Result<Vec<OKXPosition>, OKXHttpError> {
841        let path = Self::build_path("/api/v5/account/positions", &params)?;
842        self.send_request(Method::GET, &path, None, true).await
843    }
844
845    /// Requests closed or historical position data.
846    ///
847    /// # Errors
848    ///
849    /// Returns an error if the operation fails.
850    ///
851    /// # References
852    ///
853    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-positions-history>
854    pub async fn http_get_position_history(
855        &self,
856        params: GetPositionsHistoryParams,
857    ) -> Result<Vec<OKXPositionHistory>, OKXHttpError> {
858        let path = Self::build_path("/api/v5/account/positions-history", &params)?;
859        self.send_request(Method::GET, &path, None, true).await
860    }
861
862    /// Requests transaction details (fills) for the given parameters.
863    ///
864    /// # Errors
865    ///
866    /// Returns an error if the operation fails.
867    ///
868    /// # References
869    ///
870    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-transaction-details-last-3-days>
871    pub async fn http_get_transaction_details(
872        &self,
873        params: GetTransactionDetailsParams,
874    ) -> Result<Vec<OKXTransactionDetail>, OKXHttpError> {
875        let path = Self::build_path("/api/v5/trade/fills", &params)?;
876        self.send_request(Method::GET, &path, None, true).await
877    }
878}
879
880/// Provides a higher-level HTTP client for the [OKX](https://okx.com) REST API.
881///
882/// This client wraps the underlying `OKXHttpInnerClient` to handle conversions
883/// into the Nautilus domain model.
884#[derive(Clone, Debug)]
885#[cfg_attr(
886    feature = "python",
887    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
888)]
889pub struct OKXHttpClient {
890    pub(crate) inner: Arc<OKXHttpInnerClient>,
891    pub(crate) instruments_cache: Arc<Mutex<HashMap<Ustr, InstrumentAny>>>,
892    cache_initialized: bool,
893}
894
895impl Default for OKXHttpClient {
896    fn default() -> Self {
897        Self::new(None, Some(60), None, None, None, false)
898            .expect("Failed to create default OKXHttpClient")
899    }
900}
901
902impl OKXHttpClient {
903    /// Creates a new [`OKXHttpClient`] using the default OKX HTTP URL,
904    /// optionally overridden with a custom base url.
905    ///
906    /// This version of the client has **no credentials**, so it can only
907    /// call publicly accessible endpoints.
908    ///
909    /// # Errors
910    ///
911    /// Returns an error if the retry manager cannot be created.
912    pub fn new(
913        base_url: Option<String>,
914        timeout_secs: Option<u64>,
915        max_retries: Option<u32>,
916        retry_delay_ms: Option<u64>,
917        retry_delay_max_ms: Option<u64>,
918        is_demo: bool,
919    ) -> anyhow::Result<Self> {
920        Ok(Self {
921            inner: Arc::new(OKXHttpInnerClient::new(
922                base_url,
923                timeout_secs,
924                max_retries,
925                retry_delay_ms,
926                retry_delay_max_ms,
927                is_demo,
928            )?),
929            instruments_cache: Arc::new(Mutex::new(HashMap::new())),
930            cache_initialized: false,
931        })
932    }
933
934    /// Creates a new authenticated [`OKXHttpClient`] using environment variables and
935    /// the default OKX HTTP base url.
936    ///
937    /// # Errors
938    ///
939    /// Returns an error if the operation fails.
940    pub fn from_env() -> anyhow::Result<Self> {
941        Self::with_credentials(None, None, None, None, None, None, None, None, false)
942    }
943
944    /// Creates a new [`OKXHttpClient`] configured with credentials
945    /// for authenticated requests, optionally using a custom base url.
946    ///
947    /// # Errors
948    ///
949    /// Returns an error if the operation fails.
950    #[allow(clippy::too_many_arguments)]
951    pub fn with_credentials(
952        api_key: Option<String>,
953        api_secret: Option<String>,
954        api_passphrase: Option<String>,
955        base_url: Option<String>,
956        timeout_secs: Option<u64>,
957        max_retries: Option<u32>,
958        retry_delay_ms: Option<u64>,
959        retry_delay_max_ms: Option<u64>,
960        is_demo: bool,
961    ) -> anyhow::Result<Self> {
962        let api_key = get_or_env_var(api_key, "OKX_API_KEY")?;
963        let api_secret = get_or_env_var(api_secret, "OKX_API_SECRET")?;
964        let api_passphrase = get_or_env_var(api_passphrase, "OKX_API_PASSPHRASE")?;
965        let base_url = base_url.unwrap_or(OKX_HTTP_URL.to_string());
966
967        Ok(Self {
968            inner: Arc::new(OKXHttpInnerClient::with_credentials(
969                api_key,
970                api_secret,
971                api_passphrase,
972                base_url,
973                timeout_secs,
974                max_retries,
975                retry_delay_ms,
976                retry_delay_max_ms,
977                is_demo,
978            )?),
979            instruments_cache: Arc::new(Mutex::new(HashMap::new())),
980            cache_initialized: false,
981        })
982    }
983
984    /// Retrieves an instrument from the cache.
985    ///
986    /// # Errors
987    ///
988    /// Returns an error if the instrument is not found in the cache.
989    fn get_instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
990        self.instruments_cache
991            .lock()
992            .expect("`instruments_cache` lock poisoned")
993            .get(&symbol)
994            .cloned()
995            .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not in cache"))
996    }
997
998    async fn instrument_or_fetch(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
999        if let Ok(inst) = self.get_instrument_from_cache(symbol) {
1000            return Ok(inst);
1001        }
1002
1003        for group in [
1004            OKXInstrumentType::Spot,
1005            OKXInstrumentType::Margin,
1006            OKXInstrumentType::Futures,
1007        ] {
1008            if let Ok(instruments) = self.request_instruments(group, None).await {
1009                let mut guard = self.instruments_cache.lock().unwrap();
1010                for inst in instruments {
1011                    guard.insert(inst.raw_symbol().inner(), inst);
1012                }
1013                drop(guard);
1014
1015                if let Ok(inst) = self.get_instrument_from_cache(symbol) {
1016                    return Ok(inst);
1017                }
1018            }
1019        }
1020
1021        anyhow::bail!("Instrument {symbol} not in cache and fetch failed");
1022    }
1023
1024    /// Cancel all pending HTTP requests.
1025    pub fn cancel_all_requests(&self) {
1026        self.inner.cancel_all_requests();
1027    }
1028
1029    /// Get the cancellation token for this client.
1030    pub fn cancellation_token(&self) -> &CancellationToken {
1031        self.inner.cancellation_token()
1032    }
1033
1034    /// Returns the base url being used by the client.
1035    pub fn base_url(&self) -> &str {
1036        self.inner.base_url.as_str()
1037    }
1038
1039    /// Returns the public API key being used by the client.
1040    pub fn api_key(&self) -> Option<&str> {
1041        self.inner.credential.as_ref().map(|c| c.api_key.as_str())
1042    }
1043
1044    /// Returns whether the client is configured for demo trading.
1045    #[must_use]
1046    pub fn is_demo(&self) -> bool {
1047        self.inner.is_demo
1048    }
1049
1050    /// Requests the current server time from OKX.
1051    ///
1052    /// Returns the OKX system time as a Unix timestamp in milliseconds.
1053    ///
1054    /// # Errors
1055    ///
1056    /// Returns an error if the HTTP request fails or if the response cannot be parsed.
1057    pub async fn http_get_server_time(&self) -> Result<u64, OKXHttpError> {
1058        self.inner.http_get_server_time().await
1059    }
1060
1061    /// Checks if the client is initialized.
1062    ///
1063    /// The client is considered initialized if any instruments have been cached from the venue.
1064    #[must_use]
1065    pub const fn is_initialized(&self) -> bool {
1066        self.cache_initialized
1067    }
1068
1069    /// Generates a timestamp for initialization.
1070    fn generate_ts_init(&self) -> UnixNanos {
1071        get_atomic_clock_realtime().get_time_ns()
1072    }
1073
1074    /// Returns the cached instrument symbols.
1075    #[must_use]
1076    /// Returns a snapshot of all instrument symbols currently held in the
1077    /// internal cache.
1078    ///
1079    /// # Panics
1080    ///
1081    /// Panics if the internal mutex guarding the instrument cache is poisoned
1082    /// (which would indicate a previous panic while the lock was held).
1083    pub fn get_cached_symbols(&self) -> Vec<String> {
1084        self.instruments_cache
1085            .lock()
1086            .unwrap()
1087            .keys()
1088            .map(std::string::ToString::to_string)
1089            .collect()
1090    }
1091
1092    /// Adds the `instruments` to the clients instrument cache.
1093    ///
1094    /// Any existing instruments will be replaced.
1095    /// Inserts multiple instruments into the local cache.
1096    ///
1097    /// # Panics
1098    ///
1099    /// Panics if the instruments cache mutex is poisoned.
1100    pub fn add_instruments(&mut self, instruments: Vec<InstrumentAny>) {
1101        for inst in instruments {
1102            self.instruments_cache
1103                .lock()
1104                .unwrap()
1105                .insert(inst.raw_symbol().inner(), inst);
1106        }
1107        self.cache_initialized = true;
1108    }
1109
1110    /// Adds the `instrument` to the clients instrument cache.
1111    ///
1112    /// Any existing instrument will be replaced.
1113    /// Inserts a single instrument into the local cache.
1114    ///
1115    /// # Panics
1116    ///
1117    /// Panics if the instruments cache mutex is poisoned.
1118    pub fn add_instrument(&mut self, instrument: InstrumentAny) {
1119        self.instruments_cache
1120            .lock()
1121            .unwrap()
1122            .insert(instrument.raw_symbol().inner(), instrument);
1123        self.cache_initialized = true;
1124    }
1125
1126    /// Requests the account state for the `account_id` from OKX.
1127    ///
1128    /// # Errors
1129    ///
1130    /// Returns an error if the HTTP request fails or no account state is returned.
1131    pub async fn request_account_state(
1132        &self,
1133        account_id: AccountId,
1134    ) -> anyhow::Result<AccountState> {
1135        let resp = self
1136            .inner
1137            .http_get_balance()
1138            .await
1139            .map_err(|e| anyhow::anyhow!(e))?;
1140
1141        let ts_init = self.generate_ts_init();
1142        let raw = resp
1143            .first()
1144            .ok_or_else(|| anyhow::anyhow!("No account state returned from OKX"))?;
1145        let account_state = parse_account_state(raw, account_id, ts_init)?;
1146
1147        Ok(account_state)
1148    }
1149
1150    /// Requests the fee rates and VIP level from OKX.
1151    ///
1152    /// Returns the VIP level (0-9) from the fee rate response.
1153    /// Returns `None` if the fee rates cannot be retrieved.
1154    ///
1155    /// # Errors
1156    ///
1157    /// Returns an error if the HTTP request fails.
1158    pub async fn request_vip_level(&self) -> anyhow::Result<Option<OKXVipLevel>> {
1159        // Try different instrument types (VIP level is the same across all)
1160        // Try SPOT and MARGIN first (no inst_family required)
1161        let simple_types = [OKXInstrumentType::Spot, OKXInstrumentType::Margin];
1162
1163        for inst_type in simple_types {
1164            let params = GetTradeFeeParams {
1165                inst_type,
1166                inst_family: None,
1167                uly: None,
1168            };
1169
1170            match self.inner.http_get_trade_fee(params).await {
1171                Ok(resp) => {
1172                    if let Some(fee_rate) = resp.first() {
1173                        tracing::info!("Detected OKX VIP level: {}", fee_rate.level);
1174                        return Ok(Some(fee_rate.level));
1175                    }
1176                }
1177                Err(e) => {
1178                    tracing::debug!(
1179                        "Failed to query fee rates for {inst_type:?}: {e}, trying next type"
1180                    );
1181                    continue;
1182                }
1183            }
1184        }
1185
1186        // Try derivatives types with common instrument families
1187        let derivatives_types = [
1188            OKXInstrumentType::Swap,
1189            OKXInstrumentType::Futures,
1190            OKXInstrumentType::Option,
1191        ];
1192
1193        // Common instrument families to try
1194        let inst_families = ["BTC-USD", "ETH-USD", "BTC-USDT", "ETH-USDT"];
1195
1196        for inst_type in derivatives_types {
1197            for family in inst_families {
1198                let params = GetTradeFeeParams {
1199                    inst_type,
1200                    inst_family: Some(family.to_string()),
1201                    uly: None,
1202                };
1203
1204                match self.inner.http_get_trade_fee(params).await {
1205                    Ok(resp) => {
1206                        if let Some(fee_rate) = resp.first() {
1207                            tracing::info!("Detected OKX VIP level: {}", fee_rate.level);
1208                            return Ok(Some(fee_rate.level));
1209                        }
1210                    }
1211                    Err(e) => {
1212                        tracing::debug!(
1213                            "Failed to query fee rates for {inst_type:?} family {family}: {e}"
1214                        );
1215                        continue;
1216                    }
1217                }
1218            }
1219        }
1220
1221        tracing::warn!("Unable to query VIP level from any instrument type or family");
1222        Ok(None)
1223    }
1224
1225    /// Sets the position mode for the account.
1226    ///
1227    /// Defaults to NetMode if no position mode is provided.
1228    ///
1229    /// # Errors
1230    ///
1231    /// Returns an error if the HTTP request fails or the position mode cannot be set.
1232    ///
1233    /// # Note
1234    ///
1235    /// This endpoint only works for accounts with derivatives trading enabled.
1236    /// If the account only has spot trading, this will return an error.
1237    pub async fn set_position_mode(&self, position_mode: OKXPositionMode) -> anyhow::Result<()> {
1238        let mut params = SetPositionModeParamsBuilder::default();
1239        params.pos_mode(position_mode);
1240        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1241
1242        match self.inner.http_set_position_mode(params).await {
1243            Ok(_) => Ok(()),
1244            Err(e) => {
1245                if let OKXHttpError::OkxError {
1246                    error_code,
1247                    message,
1248                } = &e
1249                    && error_code == "50115"
1250                {
1251                    tracing::warn!(
1252                        "Account does not support position mode setting (derivatives trading not enabled): {message}"
1253                    );
1254                    return Ok(()); // Gracefully handle this case
1255                }
1256                anyhow::bail!(e)
1257            }
1258        }
1259    }
1260
1261    /// Requests all instruments for the `instrument_type` from OKX.
1262    ///
1263    /// # Errors
1264    ///
1265    /// Returns an error if the HTTP request fails or instrument parsing fails.
1266    pub async fn request_instruments(
1267        &self,
1268        instrument_type: OKXInstrumentType,
1269        instrument_family: Option<String>,
1270    ) -> anyhow::Result<Vec<InstrumentAny>> {
1271        let mut params = GetInstrumentsParamsBuilder::default();
1272        params.inst_type(instrument_type);
1273
1274        if let Some(family) = instrument_family {
1275            params.inst_family(family);
1276        }
1277
1278        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1279
1280        let resp = self
1281            .inner
1282            .http_get_instruments(params)
1283            .await
1284            .map_err(|e| anyhow::anyhow!(e))?;
1285
1286        let ts_init = self.generate_ts_init();
1287
1288        let mut instruments: Vec<InstrumentAny> = Vec::new();
1289        for inst in &resp {
1290            if let Some(instrument_any) = parse_instrument_any(inst, ts_init)? {
1291                instruments.push(instrument_any);
1292            }
1293        }
1294
1295        Ok(instruments)
1296    }
1297
1298    /// Requests the latest mark price for the `instrument_type` from OKX.
1299    ///
1300    /// # Errors
1301    ///
1302    /// Returns an error if the HTTP request fails or no mark price is returned.
1303    pub async fn request_mark_price(
1304        &self,
1305        instrument_id: InstrumentId,
1306    ) -> anyhow::Result<MarkPriceUpdate> {
1307        let mut params = GetMarkPriceParamsBuilder::default();
1308        params.inst_id(instrument_id.symbol.inner());
1309        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1310
1311        let resp = self
1312            .inner
1313            .http_get_mark_price(params)
1314            .await
1315            .map_err(|e| anyhow::anyhow!(e))?;
1316
1317        let raw = resp
1318            .first()
1319            .ok_or_else(|| anyhow::anyhow!("No mark price returned from OKX"))?;
1320        let inst = self
1321            .instrument_or_fetch(instrument_id.symbol.inner())
1322            .await?;
1323        let ts_init = self.generate_ts_init();
1324
1325        let mark_price =
1326            parse_mark_price_update(raw, instrument_id, inst.price_precision(), ts_init)
1327                .map_err(|e| anyhow::anyhow!(e))?;
1328        Ok(mark_price)
1329    }
1330
1331    /// Requests the latest index price for the `instrument_id` from OKX.
1332    ///
1333    /// # Errors
1334    ///
1335    /// Returns an error if the HTTP request fails or no index price is returned.
1336    pub async fn request_index_price(
1337        &self,
1338        instrument_id: InstrumentId,
1339    ) -> anyhow::Result<IndexPriceUpdate> {
1340        let mut params = GetIndexTickerParamsBuilder::default();
1341        params.inst_id(instrument_id.symbol.inner());
1342        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1343
1344        let resp = self
1345            .inner
1346            .http_get_index_ticker(params)
1347            .await
1348            .map_err(|e| anyhow::anyhow!(e))?;
1349
1350        let raw = resp
1351            .first()
1352            .ok_or_else(|| anyhow::anyhow!("No index price returned from OKX"))?;
1353        let inst = self
1354            .instrument_or_fetch(instrument_id.symbol.inner())
1355            .await?;
1356        let ts_init = self.generate_ts_init();
1357
1358        let index_price =
1359            parse_index_price_update(raw, instrument_id, inst.price_precision(), ts_init)
1360                .map_err(|e| anyhow::anyhow!(e))?;
1361        Ok(index_price)
1362    }
1363
1364    /// Requests trades for the `instrument_id` and `start` -> `end` time range.
1365    ///
1366    /// # Errors
1367    ///
1368    /// Returns an error if the HTTP request fails or trade parsing fails.
1369    pub async fn request_trades(
1370        &self,
1371        instrument_id: InstrumentId,
1372        start: Option<DateTime<Utc>>,
1373        end: Option<DateTime<Utc>>,
1374        limit: Option<u32>,
1375    ) -> anyhow::Result<Vec<TradeTick>> {
1376        let mut params = GetTradesParamsBuilder::default();
1377
1378        params.inst_id(instrument_id.symbol.inner());
1379        if let Some(s) = start {
1380            params.after(s.timestamp_millis().to_string());
1381        }
1382        if let Some(e) = end {
1383            params.before(e.timestamp_millis().to_string());
1384        }
1385        // OKX expects the optional `limit` parameter to be between 1 and 100 (default 100).
1386        // The request layer uses 0 to express "no explicit limit", so we omit the field in that case
1387        // and clamp any larger value to the documented maximum to avoid 51000 parameter errors.
1388        const OKX_TRADES_MAX_LIMIT: u32 = 100;
1389        if let Some(l) = limit
1390            && l > 0
1391        {
1392            params.limit(l.min(OKX_TRADES_MAX_LIMIT));
1393        }
1394
1395        let params = params.build().map_err(anyhow::Error::new)?;
1396
1397        // Fetch raw trades
1398        let raw_trades = self
1399            .inner
1400            .http_get_trades(params)
1401            .await
1402            .map_err(anyhow::Error::new)?;
1403
1404        let ts_init = self.generate_ts_init();
1405        let inst = self
1406            .instrument_or_fetch(instrument_id.symbol.inner())
1407            .await?;
1408
1409        let mut trades = Vec::with_capacity(raw_trades.len());
1410        for raw in raw_trades {
1411            match parse_trade_tick(
1412                &raw,
1413                instrument_id,
1414                inst.price_precision(),
1415                inst.size_precision(),
1416                ts_init,
1417            ) {
1418                Ok(trade) => trades.push(trade),
1419                Err(e) => tracing::error!("{e}"),
1420            }
1421        }
1422
1423        Ok(trades)
1424    }
1425
1426    /// Requests historical bars for the given bar type and time range.
1427    ///
1428    /// The aggregation source must be `EXTERNAL`. Time range validation ensures start < end.
1429    /// Returns bars sorted oldest to newest.
1430    ///
1431    /// # Errors
1432    ///
1433    /// Returns an error if the request fails.
1434    ///
1435    /// # Endpoint Selection
1436    ///
1437    /// The OKX API has different endpoints with different limits:
1438    /// - Regular endpoint (`/api/v5/market/candles`): ≤ 300 rows/call, ≤ 40 req/2s
1439    ///   - Used when: start is None OR age ≤ 100 days
1440    /// - History endpoint (`/api/v5/market/history-candles`): ≤ 100 rows/call, ≤ 20 req/2s
1441    ///   - Used when: start is Some AND age > 100 days
1442    ///
1443    /// Age is calculated as `Utc::now() - start` at the time of the first request.
1444    ///
1445    /// # Supported Aggregations
1446    ///
1447    /// Maps to OKX bar query parameter:
1448    /// - `Second` → `{n}s`
1449    /// - `Minute` → `{n}m`
1450    /// - `Hour` → `{n}H`
1451    /// - `Day` → `{n}D`
1452    /// - `Week` → `{n}W`
1453    /// - `Month` → `{n}M`
1454    ///
1455    /// # Pagination
1456    ///
1457    /// - Uses `before` parameter for backwards pagination
1458    /// - Pages backwards from end time (or now) to start time
1459    /// - Stops when: limit reached, time window covered, or API returns empty
1460    /// - Rate limit safety: ≥ 50ms between requests
1461    ///
1462    /// # Panics
1463    ///
1464    /// May panic if internal data structures are in an unexpected state.
1465    ///
1466    /// # References
1467    ///
1468    /// - <https://tr.okx.com/docs-v5/en/#order-book-trading-market-data-get-candlesticks>
1469    /// - <https://tr.okx.com/docs-v5/en/#order-book-trading-market-data-get-candlesticks-history>
1470    pub async fn request_bars(
1471        &self,
1472        bar_type: BarType,
1473        start: Option<DateTime<Utc>>,
1474        mut end: Option<DateTime<Utc>>,
1475        limit: Option<u32>,
1476    ) -> anyhow::Result<Vec<Bar>> {
1477        const HISTORY_SPLIT_DAYS: i64 = 100;
1478        const MAX_PAGES_SOFT: usize = 500;
1479
1480        let limit = if limit == Some(0) { None } else { limit };
1481
1482        anyhow::ensure!(
1483            bar_type.aggregation_source() == AggregationSource::External,
1484            "Only EXTERNAL aggregation is supported"
1485        );
1486        if let (Some(s), Some(e)) = (start, end) {
1487            anyhow::ensure!(s < e, "Invalid time range: start={s:?} end={e:?}");
1488        }
1489
1490        let now = Utc::now();
1491        if let Some(s) = start
1492            && s > now
1493        {
1494            return Ok(Vec::new());
1495        }
1496        if let Some(e) = end
1497            && e > now
1498        {
1499            end = Some(now);
1500        }
1501
1502        let spec = bar_type.spec();
1503        let step = spec.step.get();
1504        let bar_param = match spec.aggregation {
1505            BarAggregation::Second => format!("{step}s"),
1506            BarAggregation::Minute => format!("{step}m"),
1507            BarAggregation::Hour => format!("{step}H"),
1508            BarAggregation::Day => format!("{step}D"),
1509            BarAggregation::Week => format!("{step}W"),
1510            BarAggregation::Month => format!("{step}M"),
1511            a => anyhow::bail!("OKX does not support {a:?} aggregation"),
1512        };
1513
1514        let slot_ms: i64 = match spec.aggregation {
1515            BarAggregation::Second => (step as i64) * 1_000,
1516            BarAggregation::Minute => (step as i64) * 60_000,
1517            BarAggregation::Hour => (step as i64) * 3_600_000,
1518            BarAggregation::Day => (step as i64) * 86_400_000,
1519            BarAggregation::Week => (step as i64) * 7 * 86_400_000,
1520            BarAggregation::Month => (step as i64) * 30 * 86_400_000,
1521            _ => unreachable!("Unsupported aggregation should have been caught above"),
1522        };
1523        let slot_ns: i64 = slot_ms * 1_000_000;
1524
1525        #[derive(Clone, Copy, Debug, PartialEq, Eq)]
1526        enum Mode {
1527            Latest,
1528            Backward,
1529            Range,
1530        }
1531
1532        let mode = match (start, end) {
1533            (None, None) => Mode::Latest,
1534            (Some(_), None) => Mode::Backward, // Changed: when only start is provided, work backward from now
1535            (None, Some(_)) => Mode::Backward,
1536            (Some(_), Some(_)) => Mode::Range,
1537        };
1538
1539        let start_ns = start.and_then(|s| s.timestamp_nanos_opt());
1540        let end_ns = end.and_then(|e| e.timestamp_nanos_opt());
1541
1542        // Floor start and ceiling end to bar boundaries for cleaner API requests
1543        let start_ms = start.map(|s| {
1544            let ms = s.timestamp_millis();
1545            if slot_ms > 0 {
1546                (ms / slot_ms) * slot_ms // Floor to nearest bar boundary
1547            } else {
1548                ms
1549            }
1550        });
1551        let end_ms = end.map(|e| {
1552            let ms = e.timestamp_millis();
1553            if slot_ms > 0 {
1554                ((ms + slot_ms - 1) / slot_ms) * slot_ms // Ceiling to nearest bar boundary
1555            } else {
1556                ms
1557            }
1558        });
1559        let now_ms = now.timestamp_millis();
1560
1561        let symbol = bar_type.instrument_id().symbol;
1562        let inst = self.instrument_or_fetch(symbol.inner()).await?;
1563
1564        let mut out: Vec<Bar> = Vec::new();
1565        let mut pages = 0usize;
1566
1567        // IMPORTANT: OKX API behavior:
1568        // - With 'after' parameter: returns bars with timestamp > after (going forward)
1569        // - With 'before' parameter: returns bars with timestamp < before (going backward)
1570        // For Range mode, we use 'before' starting from the end time to get bars before it
1571        let mut after_ms: Option<i64> = None;
1572        let mut before_ms: Option<i64> = match mode {
1573            Mode::Backward => end_ms.map(|v| v.saturating_sub(1)),
1574            Mode::Range => {
1575                // For Range, start from the end time (or current time if no end specified)
1576                // The API will return bars with timestamp < before_ms
1577                Some(end_ms.unwrap_or(now_ms))
1578            }
1579            Mode::Latest => None,
1580        };
1581
1582        // For Range mode, we'll paginate backwards like Backward mode
1583        let mut forward_prepend_mode = matches!(mode, Mode::Range);
1584
1585        // Adjust before_ms to ensure we get data from the API
1586        // OKX API might not have bars for the very recent past
1587        // This handles both explicit end=now and the actor layer setting end=now when it's None
1588        if matches!(mode, Mode::Backward | Mode::Range)
1589            && let Some(b) = before_ms
1590        {
1591            // OKX endpoints have different data availability windows:
1592            // - Regular endpoint: has most recent data but limited depth
1593            // - History endpoint: has deep history but lags behind current time
1594            // Use a small buffer to avoid the "dead zone"
1595            let buffer_ms = slot_ms.max(60_000); // At least 1 minute or 1 bar
1596            if b >= now_ms.saturating_sub(buffer_ms) {
1597                before_ms = Some(now_ms.saturating_sub(buffer_ms));
1598            }
1599        }
1600
1601        let mut have_latest_first_page = false;
1602        let mut progressless_loops = 0u8;
1603
1604        loop {
1605            if let Some(lim) = limit
1606                && lim > 0
1607                && out.len() >= lim as usize
1608            {
1609                break;
1610            }
1611            if pages >= MAX_PAGES_SOFT {
1612                break;
1613            }
1614
1615            let pivot_ms = if let Some(a) = after_ms {
1616                a
1617            } else if let Some(b) = before_ms {
1618                b
1619            } else {
1620                now_ms
1621            };
1622            // Choose endpoint based on how old the data is:
1623            // - Use regular endpoint for recent data (< 1 hour old)
1624            // - Use history endpoint for older data (> 1 hour old)
1625            // This avoids the "gap" where history endpoint has no recent data
1626            // and regular endpoint has limited depth
1627            let age_ms = now_ms.saturating_sub(pivot_ms);
1628            let age_hours = age_ms / (60 * 60 * 1000);
1629            let using_history = age_hours > 1; // Use history if data is > 1 hour old
1630
1631            let page_ceiling = if using_history { 100 } else { 300 };
1632            let remaining = limit
1633                .filter(|&l| l > 0) // Treat limit=0 as no limit
1634                .map(|l| (l as usize).saturating_sub(out.len()))
1635                .unwrap_or(page_ceiling);
1636            let page_cap = remaining.min(page_ceiling);
1637
1638            let mut p = GetCandlesticksParamsBuilder::default();
1639            p.inst_id(symbol.as_str())
1640                .bar(&bar_param)
1641                .limit(page_cap as u32);
1642
1643            // Track whether this planned request uses BEFORE or AFTER.
1644            let mut req_used_before = false;
1645
1646            match mode {
1647                Mode::Latest => {
1648                    if have_latest_first_page && let Some(b) = before_ms {
1649                        p.before_ms(b);
1650                        req_used_before = true;
1651                    }
1652                }
1653                Mode::Backward => {
1654                    if let Some(b) = before_ms {
1655                        p.before_ms(b);
1656                        req_used_before = true;
1657                    }
1658                }
1659                Mode::Range => {
1660                    // For first request with regular endpoint, try without parameters
1661                    // to get the most recent bars, then filter
1662                    if pages == 0 && !using_history {
1663                        // Don't set any time parameters on first request
1664                        // This gets the most recent bars available
1665                    } else if forward_prepend_mode {
1666                        if let Some(b) = before_ms {
1667                            p.before_ms(b);
1668                            req_used_before = true;
1669                        }
1670                    } else if let Some(a) = after_ms {
1671                        p.after_ms(a);
1672                    }
1673                }
1674            }
1675
1676            let params = p.build().map_err(anyhow::Error::new)?;
1677
1678            let mut raw = if using_history {
1679                self.inner
1680                    .http_get_candlesticks_history(params.clone())
1681                    .await
1682                    .map_err(anyhow::Error::new)?
1683            } else {
1684                self.inner
1685                    .http_get_candlesticks(params.clone())
1686                    .await
1687                    .map_err(anyhow::Error::new)?
1688            };
1689
1690            // --- Fallbacks on empty page ---
1691            if raw.is_empty() {
1692                // LATEST: retry same cursor via history, then step back a page-interval before giving up
1693                if matches!(mode, Mode::Latest)
1694                    && have_latest_first_page
1695                    && !using_history
1696                    && let Some(b) = before_ms
1697                {
1698                    let mut p2 = GetCandlesticksParamsBuilder::default();
1699                    p2.inst_id(symbol.as_str())
1700                        .bar(&bar_param)
1701                        .limit(page_cap as u32);
1702                    p2.before_ms(b);
1703                    let params2 = p2.build().map_err(anyhow::Error::new)?;
1704                    let raw2 = self
1705                        .inner
1706                        .http_get_candlesticks_history(params2)
1707                        .await
1708                        .map_err(anyhow::Error::new)?;
1709                    if !raw2.is_empty() {
1710                        raw = raw2;
1711                    } else {
1712                        // Step back one page interval and retry loop
1713                        let jump = (page_cap as i64).saturating_mul(slot_ms.max(1));
1714                        before_ms = Some(b.saturating_sub(jump));
1715                        progressless_loops = progressless_loops.saturating_add(1);
1716                        if progressless_loops >= 3 {
1717                            break;
1718                        }
1719                        continue;
1720                    }
1721                }
1722
1723                // Range mode doesn't need special bootstrap - it uses the normal flow with before_ms set
1724
1725                // If still empty: for Range after first page, try a single backstep window using BEFORE
1726                if raw.is_empty() && matches!(mode, Mode::Range) && pages > 0 {
1727                    let backstep_ms = (page_cap as i64).saturating_mul(slot_ms.max(1));
1728                    let pivot_back = after_ms.unwrap_or(now_ms).saturating_sub(backstep_ms);
1729
1730                    let mut p2 = GetCandlesticksParamsBuilder::default();
1731                    p2.inst_id(symbol.as_str())
1732                        .bar(&bar_param)
1733                        .limit(page_cap as u32)
1734                        .before_ms(pivot_back);
1735                    let params2 = p2.build().map_err(anyhow::Error::new)?;
1736                    let raw2 = if (now_ms.saturating_sub(pivot_back)) / (24 * 60 * 60 * 1000)
1737                        > HISTORY_SPLIT_DAYS
1738                    {
1739                        self.inner.http_get_candlesticks_history(params2).await
1740                    } else {
1741                        self.inner.http_get_candlesticks(params2).await
1742                    }
1743                    .map_err(anyhow::Error::new)?;
1744                    if raw2.is_empty() {
1745                        break;
1746                    } else {
1747                        raw = raw2;
1748                        forward_prepend_mode = true;
1749                        req_used_before = true;
1750                    }
1751                }
1752
1753                // First LATEST page empty: jump back >100d to force history, then continue loop
1754                if raw.is_empty()
1755                    && matches!(mode, Mode::Latest)
1756                    && !have_latest_first_page
1757                    && !using_history
1758                {
1759                    let jump_days_ms = (HISTORY_SPLIT_DAYS + 1) * 86_400_000;
1760                    before_ms = Some(now_ms.saturating_sub(jump_days_ms));
1761                    have_latest_first_page = true;
1762                    continue;
1763                }
1764
1765                // Still empty for any other case? Just break.
1766                if raw.is_empty() {
1767                    break;
1768                }
1769            }
1770            // --- end fallbacks ---
1771
1772            pages += 1;
1773
1774            // Parse, oldest → newest
1775            let ts_init = self.generate_ts_init();
1776            let mut page: Vec<Bar> = Vec::with_capacity(raw.len());
1777            for r in &raw {
1778                page.push(parse_candlestick(
1779                    r,
1780                    bar_type,
1781                    inst.price_precision(),
1782                    inst.size_precision(),
1783                    ts_init,
1784                )?);
1785            }
1786            page.reverse();
1787
1788            let page_oldest_ms = page.first().map(|b| b.ts_event.as_i64() / 1_000_000);
1789            let page_newest_ms = page.last().map(|b| b.ts_event.as_i64() / 1_000_000);
1790
1791            // Range filter (inclusive)
1792            // For Range mode, if we have no bars yet and this is an early page,
1793            // be more tolerant with the start boundary to handle gaps in data
1794            let mut filtered: Vec<Bar> = if matches!(mode, Mode::Range)
1795                && out.is_empty()
1796                && pages < 2
1797            {
1798                // On first pages of Range mode with no data yet, include the most recent bar
1799                // even if it's slightly before our start time (within 2 bar periods)
1800                // BUT we want ALL bars in the page that are within our range
1801                let tolerance_ns = slot_ns * 2; // Allow up to 2 bar periods before start
1802
1803                // Debug: log the page range
1804                if !page.is_empty() {
1805                    tracing::debug!(
1806                        "Range mode bootstrap page: {} bars from {} to {}, filtering with start={:?} end={:?}",
1807                        page.len(),
1808                        page.first().unwrap().ts_event.as_i64() / 1_000_000,
1809                        page.last().unwrap().ts_event.as_i64() / 1_000_000,
1810                        start_ms,
1811                        end_ms
1812                    );
1813                }
1814
1815                let result: Vec<Bar> = page
1816                    .clone()
1817                    .into_iter()
1818                    .filter(|b| {
1819                        let ts = b.ts_event.as_i64();
1820                        // Accept bars from (start - tolerance) to end
1821                        let ok_after =
1822                            start_ns.is_none_or(|sns| ts >= sns.saturating_sub(tolerance_ns));
1823                        let ok_before = end_ns.is_none_or(|ens| ts <= ens);
1824                        ok_after && ok_before
1825                    })
1826                    .collect();
1827
1828                result
1829            } else {
1830                // Normal filtering
1831                page.clone()
1832                    .into_iter()
1833                    .filter(|b| {
1834                        let ts = b.ts_event.as_i64();
1835                        let ok_after = start_ns.is_none_or(|sns| ts >= sns);
1836                        let ok_before = end_ns.is_none_or(|ens| ts <= ens);
1837                        ok_after && ok_before
1838                    })
1839                    .collect()
1840            };
1841
1842            if !page.is_empty() && filtered.is_empty() {
1843                // For Range mode, if all bars are before our start time, there's no point continuing
1844                if matches!(mode, Mode::Range)
1845                    && !forward_prepend_mode
1846                    && let (Some(newest_ms), Some(start_ms)) = (page_newest_ms, start_ms)
1847                    && newest_ms < start_ms.saturating_sub(slot_ms * 2)
1848                {
1849                    // Bars are too old (more than 2 bar periods before start), stop
1850                    break;
1851                }
1852            }
1853
1854            // Track contribution for progress guard
1855            let contribution;
1856
1857            if out.is_empty() {
1858                contribution = filtered.len();
1859                out = filtered;
1860            } else {
1861                match mode {
1862                    Mode::Backward | Mode::Latest => {
1863                        if let Some(first) = out.first() {
1864                            filtered.retain(|b| b.ts_event < first.ts_event);
1865                        }
1866                        contribution = filtered.len();
1867                        if contribution != 0 {
1868                            let mut new_out = Vec::with_capacity(out.len() + filtered.len());
1869                            new_out.extend_from_slice(&filtered);
1870                            new_out.extend_from_slice(&out);
1871                            out = new_out;
1872                        }
1873                    }
1874                    Mode::Range => {
1875                        if forward_prepend_mode || req_used_before {
1876                            // We are backfilling older pages: prepend them.
1877                            if let Some(first) = out.first() {
1878                                filtered.retain(|b| b.ts_event < first.ts_event);
1879                            }
1880                            contribution = filtered.len();
1881                            if contribution != 0 {
1882                                let mut new_out = Vec::with_capacity(out.len() + filtered.len());
1883                                new_out.extend_from_slice(&filtered);
1884                                new_out.extend_from_slice(&out);
1885                                out = new_out;
1886                            }
1887                        } else {
1888                            // Normal forward: append newer pages.
1889                            if let Some(last) = out.last() {
1890                                filtered.retain(|b| b.ts_event > last.ts_event);
1891                            }
1892                            contribution = filtered.len();
1893                            out.extend(filtered);
1894                        }
1895                    }
1896                }
1897            }
1898
1899            // Duplicate-window mitigation for Latest/Backward
1900            if contribution == 0
1901                && matches!(mode, Mode::Latest | Mode::Backward)
1902                && let Some(b) = before_ms
1903            {
1904                let jump = (page_cap as i64).saturating_mul(slot_ms.max(1));
1905                let new_b = b.saturating_sub(jump);
1906                if new_b != b {
1907                    before_ms = Some(new_b);
1908                }
1909            }
1910
1911            if contribution == 0 {
1912                progressless_loops = progressless_loops.saturating_add(1);
1913                if progressless_loops >= 3 {
1914                    break;
1915                }
1916            } else {
1917                progressless_loops = 0;
1918
1919                // Advance cursors only when we made progress
1920                match mode {
1921                    Mode::Latest | Mode::Backward => {
1922                        if let Some(oldest) = page_oldest_ms {
1923                            before_ms = Some(oldest.saturating_sub(1));
1924                            have_latest_first_page = true;
1925                        } else {
1926                            break;
1927                        }
1928                    }
1929                    Mode::Range => {
1930                        if forward_prepend_mode || req_used_before {
1931                            if let Some(oldest) = page_oldest_ms {
1932                                // Move back by at least one bar period to avoid getting the same data
1933                                let jump_back = slot_ms.max(60_000); // At least 1 minute
1934                                before_ms = Some(oldest.saturating_sub(jump_back));
1935                                after_ms = None;
1936                            } else {
1937                                break;
1938                            }
1939                        } else if let Some(newest) = page_newest_ms {
1940                            after_ms = Some(newest.saturating_add(1));
1941                            before_ms = None;
1942                        } else {
1943                            break;
1944                        }
1945                    }
1946                }
1947            }
1948
1949            // Stop conditions
1950            if let Some(lim) = limit
1951                && lim > 0
1952                && out.len() >= lim as usize
1953            {
1954                break;
1955            }
1956            if let Some(ens) = end_ns
1957                && let Some(last) = out.last()
1958                && last.ts_event.as_i64() >= ens
1959            {
1960                break;
1961            }
1962            if let Some(sns) = start_ns
1963                && let Some(first) = out.first()
1964                && (matches!(mode, Mode::Backward) || forward_prepend_mode)
1965                && first.ts_event.as_i64() <= sns
1966            {
1967                // For Range mode, check if we have all bars up to the end time
1968                if matches!(mode, Mode::Range) {
1969                    // Don't stop if we haven't reached the end time yet
1970                    if let Some(ens) = end_ns
1971                        && let Some(last) = out.last()
1972                    {
1973                        let last_ts = last.ts_event.as_i64();
1974                        if last_ts < ens {
1975                            // We have bars before start but haven't reached end, need to continue forward
1976                            // Switch from backward to forward pagination
1977                            forward_prepend_mode = false;
1978                            after_ms = Some((last_ts / 1_000_000).saturating_add(1));
1979                            before_ms = None;
1980                            continue;
1981                        }
1982                    }
1983                }
1984                break;
1985            }
1986
1987            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1988        }
1989
1990        // Final rescue for FORWARD/RANGE when nothing gathered
1991        if out.is_empty() && matches!(mode, Mode::Range) {
1992            let pivot = end_ms.unwrap_or(now_ms.saturating_sub(1));
1993            let hist = (now_ms.saturating_sub(pivot)) / (24 * 60 * 60 * 1000) > HISTORY_SPLIT_DAYS;
1994            let mut p = GetCandlesticksParamsBuilder::default();
1995            p.inst_id(symbol.as_str())
1996                .bar(&bar_param)
1997                .limit(300)
1998                .before_ms(pivot);
1999            let params = p.build().map_err(anyhow::Error::new)?;
2000            let raw = if hist {
2001                self.inner.http_get_candlesticks_history(params).await
2002            } else {
2003                self.inner.http_get_candlesticks(params).await
2004            }
2005            .map_err(anyhow::Error::new)?;
2006            if !raw.is_empty() {
2007                let ts_init = self.generate_ts_init();
2008                let mut page: Vec<Bar> = Vec::with_capacity(raw.len());
2009                for r in &raw {
2010                    page.push(parse_candlestick(
2011                        r,
2012                        bar_type,
2013                        inst.price_precision(),
2014                        inst.size_precision(),
2015                        ts_init,
2016                    )?);
2017                }
2018                page.reverse();
2019                out = page
2020                    .into_iter()
2021                    .filter(|b| {
2022                        let ts = b.ts_event.as_i64();
2023                        let ok_after = start_ns.is_none_or(|sns| ts >= sns);
2024                        let ok_before = end_ns.is_none_or(|ens| ts <= ens);
2025                        ok_after && ok_before
2026                    })
2027                    .collect();
2028            }
2029        }
2030
2031        // Trim against end bound if needed (keep ≤ end)
2032        if let Some(ens) = end_ns {
2033            while out.last().is_some_and(|b| b.ts_event.as_i64() > ens) {
2034                out.pop();
2035            }
2036        }
2037
2038        // Clamp first bar for Range when using forward pagination
2039        if matches!(mode, Mode::Range)
2040            && !forward_prepend_mode
2041            && let Some(sns) = start_ns
2042        {
2043            let lower = sns.saturating_sub(slot_ns);
2044            while out.first().is_some_and(|b| b.ts_event.as_i64() < lower) {
2045                out.remove(0);
2046            }
2047        }
2048
2049        if let Some(lim) = limit
2050            && lim > 0
2051            && out.len() > lim as usize
2052        {
2053            out.truncate(lim as usize);
2054        }
2055
2056        Ok(out)
2057    }
2058
2059    /// Requests historical order status reports for the given parameters.
2060    ///
2061    /// # Errors
2062    ///
2063    /// Returns an error if the request fails.
2064    ///
2065    /// # References
2066    ///
2067    /// - <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-order-history-last-7-days>.
2068    /// - <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-order-history-last-3-months>.
2069    #[allow(clippy::too_many_arguments)]
2070    pub async fn request_order_status_reports(
2071        &self,
2072        account_id: AccountId,
2073        instrument_type: Option<OKXInstrumentType>,
2074        instrument_id: Option<InstrumentId>,
2075        start: Option<DateTime<Utc>>,
2076        end: Option<DateTime<Utc>>,
2077        open_only: bool,
2078        limit: Option<u32>,
2079    ) -> anyhow::Result<Vec<OrderStatusReport>> {
2080        let mut history_params = GetOrderHistoryParamsBuilder::default();
2081
2082        let instrument_type = if let Some(instrument_type) = instrument_type {
2083            instrument_type
2084        } else {
2085            let instrument_id = instrument_id.ok_or_else(|| {
2086                anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
2087            })?;
2088            let instrument = self
2089                .instrument_or_fetch(instrument_id.symbol.inner())
2090                .await?;
2091            okx_instrument_type(&instrument)?
2092        };
2093
2094        history_params.inst_type(instrument_type);
2095
2096        if let Some(instrument_id) = instrument_id.as_ref() {
2097            history_params.inst_id(instrument_id.symbol.inner().to_string());
2098        }
2099
2100        if let Some(limit) = limit {
2101            history_params.limit(limit);
2102        }
2103
2104        let history_params = history_params.build().map_err(|e| anyhow::anyhow!(e))?;
2105
2106        let mut pending_params = GetOrderListParamsBuilder::default();
2107        pending_params.inst_type(instrument_type);
2108
2109        if let Some(instrument_id) = instrument_id.as_ref() {
2110            pending_params.inst_id(instrument_id.symbol.inner().to_string());
2111        }
2112
2113        if let Some(limit) = limit {
2114            pending_params.limit(limit);
2115        }
2116
2117        let pending_params = pending_params.build().map_err(|e| anyhow::anyhow!(e))?;
2118
2119        let combined_resp = if open_only {
2120            // Only request pending/open orders
2121            self.inner
2122                .http_get_order_list(pending_params)
2123                .await
2124                .map_err(|e| anyhow::anyhow!(e))?
2125        } else {
2126            // Make both requests concurrently
2127            let (history_resp, pending_resp) = tokio::try_join!(
2128                self.inner.http_get_order_history(history_params),
2129                self.inner.http_get_order_list(pending_params)
2130            )
2131            .map_err(|e| anyhow::anyhow!(e))?;
2132
2133            // Combine both responses
2134            let mut combined_resp = history_resp;
2135            combined_resp.extend(pending_resp);
2136            combined_resp
2137        };
2138
2139        // Prepare time range filter
2140        let start_ns = start.map(UnixNanos::from);
2141        let end_ns = end.map(UnixNanos::from);
2142
2143        let ts_init = self.generate_ts_init();
2144        let mut reports = Vec::with_capacity(combined_resp.len());
2145
2146        // Use a seen filter in case pending orders are within the histories "2hr reserve window"
2147        let mut seen: AHashSet<String> = AHashSet::new();
2148
2149        for order in combined_resp {
2150            let seen_key = if !order.cl_ord_id.is_empty() {
2151                order.cl_ord_id.as_str().to_string()
2152            } else if let Some(algo_cl_ord_id) = order
2153                .algo_cl_ord_id
2154                .as_ref()
2155                .filter(|value| !value.as_str().is_empty())
2156            {
2157                algo_cl_ord_id.as_str().to_string()
2158            } else if let Some(algo_id) = order
2159                .algo_id
2160                .as_ref()
2161                .filter(|value| !value.as_str().is_empty())
2162            {
2163                algo_id.as_str().to_string()
2164            } else {
2165                order.ord_id.as_str().to_string()
2166            };
2167
2168            if !seen.insert(seen_key) {
2169                continue; // Reserved pending already reported
2170            }
2171
2172            let inst = self.instrument_or_fetch(order.inst_id).await?;
2173
2174            let report = parse_order_status_report(
2175                &order,
2176                account_id,
2177                inst.id(),
2178                inst.price_precision(),
2179                inst.size_precision(),
2180                ts_init,
2181            );
2182
2183            if let Some(start_ns) = start_ns
2184                && report.ts_last < start_ns
2185            {
2186                continue;
2187            }
2188            if let Some(end_ns) = end_ns
2189                && report.ts_last > end_ns
2190            {
2191                continue;
2192            }
2193
2194            reports.push(report);
2195        }
2196
2197        Ok(reports)
2198    }
2199
2200    /// Requests fill reports (transaction details) for the given parameters.
2201    ///
2202    /// # Errors
2203    ///
2204    /// Returns an error if the request fails.
2205    ///
2206    /// # References
2207    ///
2208    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-transaction-details-last-3-days>.
2209    pub async fn request_fill_reports(
2210        &self,
2211        account_id: AccountId,
2212        instrument_type: Option<OKXInstrumentType>,
2213        instrument_id: Option<InstrumentId>,
2214        start: Option<DateTime<Utc>>,
2215        end: Option<DateTime<Utc>>,
2216        limit: Option<u32>,
2217    ) -> anyhow::Result<Vec<FillReport>> {
2218        let mut params = GetTransactionDetailsParamsBuilder::default();
2219
2220        let instrument_type = if let Some(instrument_type) = instrument_type {
2221            instrument_type
2222        } else {
2223            let instrument_id = instrument_id.ok_or_else(|| {
2224                anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
2225            })?;
2226            let instrument = self
2227                .instrument_or_fetch(instrument_id.symbol.inner())
2228                .await?;
2229            okx_instrument_type(&instrument)?
2230        };
2231
2232        params.inst_type(instrument_type);
2233
2234        if let Some(instrument_id) = instrument_id {
2235            let instrument = self
2236                .instrument_or_fetch(instrument_id.symbol.inner())
2237                .await?;
2238            let instrument_type = okx_instrument_type(&instrument)?;
2239            params.inst_type(instrument_type);
2240            params.inst_id(instrument_id.symbol.inner().to_string());
2241        }
2242
2243        if let Some(limit) = limit {
2244            params.limit(limit);
2245        }
2246
2247        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2248
2249        let resp = self
2250            .inner
2251            .http_get_transaction_details(params)
2252            .await
2253            .map_err(|e| anyhow::anyhow!(e))?;
2254
2255        // Prepare time range filter
2256        let start_ns = start.map(UnixNanos::from);
2257        let end_ns = end.map(UnixNanos::from);
2258
2259        let ts_init = self.generate_ts_init();
2260        let mut reports = Vec::with_capacity(resp.len());
2261
2262        for detail in resp {
2263            let inst = self.instrument_or_fetch(detail.inst_id).await?;
2264
2265            let report = parse_fill_report(
2266                detail,
2267                account_id,
2268                inst.id(),
2269                inst.price_precision(),
2270                inst.size_precision(),
2271                ts_init,
2272            )?;
2273
2274            if let Some(start_ns) = start_ns
2275                && report.ts_event < start_ns
2276            {
2277                continue;
2278            }
2279
2280            if let Some(end_ns) = end_ns
2281                && report.ts_event > end_ns
2282            {
2283                continue;
2284            }
2285
2286            reports.push(report);
2287        }
2288
2289        Ok(reports)
2290    }
2291
2292    /// Requests current position status reports for the given parameters.
2293    ///
2294    /// # Errors
2295    ///
2296    /// Returns an error if the request fails.
2297    ///
2298    /// # References
2299    ///
2300    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-positions>.
2301    pub async fn request_position_status_reports(
2302        &self,
2303        account_id: AccountId,
2304        instrument_type: Option<OKXInstrumentType>,
2305        instrument_id: Option<InstrumentId>,
2306    ) -> anyhow::Result<Vec<PositionStatusReport>> {
2307        let mut params = GetPositionsParamsBuilder::default();
2308
2309        let instrument_type = if let Some(instrument_type) = instrument_type {
2310            instrument_type
2311        } else {
2312            let instrument_id = instrument_id.ok_or_else(|| {
2313                anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
2314            })?;
2315            let instrument = self
2316                .instrument_or_fetch(instrument_id.symbol.inner())
2317                .await?;
2318            okx_instrument_type(&instrument)?
2319        };
2320
2321        params.inst_type(instrument_type);
2322
2323        instrument_id
2324            .as_ref()
2325            .map(|i| params.inst_id(i.symbol.inner()));
2326
2327        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2328
2329        let resp = self
2330            .inner
2331            .http_get_positions(params)
2332            .await
2333            .map_err(|e| anyhow::anyhow!(e))?;
2334
2335        let ts_init = self.generate_ts_init();
2336        let mut reports = Vec::with_capacity(resp.len());
2337
2338        for position in resp {
2339            let inst = self.instrument_or_fetch(position.inst_id).await?;
2340
2341            let report = parse_position_status_report(
2342                position,
2343                account_id,
2344                inst.id(),
2345                inst.size_precision(),
2346                ts_init,
2347            )?;
2348            reports.push(report);
2349        }
2350
2351        Ok(reports)
2352    }
2353
2354    /// Places an algo order via HTTP.
2355    ///
2356    /// # Errors
2357    ///
2358    /// Returns an error if the request fails.
2359    ///
2360    /// # References
2361    ///
2362    /// <https://www.okx.com/docs-v5/en/#order-book-trading-algo-trading-post-place-algo-order>
2363    pub async fn place_algo_order(
2364        &self,
2365        request: OKXPlaceAlgoOrderRequest,
2366    ) -> Result<OKXPlaceAlgoOrderResponse, OKXHttpError> {
2367        let body =
2368            serde_json::to_vec(&request).map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
2369
2370        let resp: Vec<OKXPlaceAlgoOrderResponse> = self
2371            .inner
2372            .send_request(Method::POST, "/api/v5/trade/order-algo", Some(body), true)
2373            .await?;
2374
2375        resp.into_iter()
2376            .next()
2377            .ok_or_else(|| OKXHttpError::ValidationError("Empty response".to_string()))
2378    }
2379
2380    /// Cancels an algo order via HTTP.
2381    ///
2382    /// # Errors
2383    ///
2384    /// Returns an error if the request fails.
2385    ///
2386    /// # References
2387    ///
2388    /// <https://www.okx.com/docs-v5/en/#order-book-trading-algo-trading-post-cancel-algo-order>
2389    pub async fn cancel_algo_order(
2390        &self,
2391        request: OKXCancelAlgoOrderRequest,
2392    ) -> Result<OKXCancelAlgoOrderResponse, OKXHttpError> {
2393        // OKX expects an array for cancel-algos endpoint
2394        // Serialize once to bytes to keep signing and sending identical
2395        let body =
2396            serde_json::to_vec(&[request]).map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
2397
2398        let resp: Vec<OKXCancelAlgoOrderResponse> = self
2399            .inner
2400            .send_request(Method::POST, "/api/v5/trade/cancel-algos", Some(body), true)
2401            .await?;
2402
2403        resp.into_iter()
2404            .next()
2405            .ok_or_else(|| OKXHttpError::ValidationError("Empty response".to_string()))
2406    }
2407
2408    /// Places an algo order using domain types.
2409    ///
2410    /// This is a convenience method that accepts Nautilus domain types
2411    /// and builds the appropriate OKX request structure internally.
2412    ///
2413    /// # Errors
2414    ///
2415    /// Returns an error if the request fails.
2416    #[allow(clippy::too_many_arguments)]
2417    pub async fn place_algo_order_with_domain_types(
2418        &self,
2419        instrument_id: InstrumentId,
2420        td_mode: OKXTradeMode,
2421        client_order_id: ClientOrderId,
2422        order_side: OrderSide,
2423        order_type: OrderType,
2424        quantity: Quantity,
2425        trigger_price: Price,
2426        trigger_type: Option<TriggerType>,
2427        limit_price: Option<Price>,
2428        reduce_only: Option<bool>,
2429    ) -> Result<OKXPlaceAlgoOrderResponse, OKXHttpError> {
2430        if !matches!(order_side, OrderSide::Buy | OrderSide::Sell) {
2431            return Err(OKXHttpError::ValidationError(
2432                "Invalid order side".to_string(),
2433            ));
2434        }
2435        let okx_side: OKXSide = order_side.into();
2436
2437        // Map trigger type to OKX format
2438        let trigger_px_type_enum = trigger_type.map(Into::into).unwrap_or(OKXTriggerType::Last);
2439
2440        // Determine order price based on order type
2441        let order_px = if matches!(order_type, OrderType::StopLimit | OrderType::LimitIfTouched) {
2442            limit_price.map(|p| p.to_string())
2443        } else {
2444            // Market orders use -1 to indicate market execution
2445            Some("-1".to_string())
2446        };
2447
2448        let request = OKXPlaceAlgoOrderRequest {
2449            inst_id: instrument_id.symbol.as_str().to_string(),
2450            td_mode,
2451            side: okx_side,
2452            ord_type: OKXAlgoOrderType::Trigger, // All conditional orders use 'trigger' type
2453            sz: quantity.to_string(),
2454            algo_cl_ord_id: Some(client_order_id.as_str().to_string()),
2455            trigger_px: Some(trigger_price.to_string()),
2456            order_px,
2457            trigger_px_type: Some(trigger_px_type_enum),
2458            tgt_ccy: None,  // Let OKX determine based on instrument
2459            pos_side: None, // Use default position side
2460            close_position: None,
2461            tag: Some(OKX_NAUTILUS_BROKER_ID.to_string()),
2462            reduce_only,
2463        };
2464
2465        self.place_algo_order(request).await
2466    }
2467
2468    /// Cancels an algo order using domain types.
2469    ///
2470    /// This is a convenience method that accepts Nautilus domain types
2471    /// and builds the appropriate OKX request structure internally.
2472    ///
2473    /// # Errors
2474    ///
2475    /// Returns an error if the request fails.
2476    pub async fn cancel_algo_order_with_domain_types(
2477        &self,
2478        instrument_id: InstrumentId,
2479        algo_id: String,
2480    ) -> Result<OKXCancelAlgoOrderResponse, OKXHttpError> {
2481        let request = OKXCancelAlgoOrderRequest {
2482            inst_id: instrument_id.symbol.to_string(),
2483            algo_id: Some(algo_id),
2484            algo_cl_ord_id: None,
2485        };
2486
2487        self.cancel_algo_order(request).await
2488    }
2489
2490    /// Requests algo order status reports.
2491    ///
2492    /// # Errors
2493    ///
2494    /// Returns an error if the request fails.
2495    #[allow(clippy::too_many_arguments)]
2496    pub async fn request_algo_order_status_reports(
2497        &self,
2498        account_id: AccountId,
2499        instrument_type: Option<OKXInstrumentType>,
2500        instrument_id: Option<InstrumentId>,
2501        algo_id: Option<String>,
2502        algo_client_order_id: Option<ClientOrderId>,
2503        state: Option<OKXOrderStatus>,
2504        limit: Option<u32>,
2505    ) -> anyhow::Result<Vec<OrderStatusReport>> {
2506        let mut instruments_cache: AHashMap<Ustr, InstrumentAny> = AHashMap::new();
2507
2508        let inst_type = if let Some(inst_type) = instrument_type {
2509            inst_type
2510        } else if let Some(inst_id) = instrument_id {
2511            let instrument = self.instrument_or_fetch(inst_id.symbol.inner()).await?;
2512            let inst_type = okx_instrument_type(&instrument)?;
2513            instruments_cache.insert(inst_id.symbol.inner(), instrument);
2514            inst_type
2515        } else {
2516            anyhow::bail!("instrument_type or instrument_id required for algo order query")
2517        };
2518
2519        let mut params_builder = GetAlgoOrdersParamsBuilder::default();
2520        params_builder.inst_type(inst_type);
2521        if let Some(inst_id) = instrument_id {
2522            params_builder.inst_id(inst_id.symbol.inner().to_string());
2523        }
2524        if let Some(algo_id) = algo_id.as_ref() {
2525            params_builder.algo_id(algo_id.clone());
2526        }
2527        if let Some(client_order_id) = algo_client_order_id.as_ref() {
2528            params_builder.algo_cl_ord_id(client_order_id.as_str().to_string());
2529        }
2530        if let Some(state) = state {
2531            params_builder.state(state);
2532        }
2533        if let Some(limit) = limit {
2534            params_builder.limit(limit);
2535        }
2536
2537        let params = params_builder
2538            .build()
2539            .map_err(|e| anyhow::anyhow!(format!("Failed to build algo order params: {e}")))?;
2540
2541        let ts_init = self.generate_ts_init();
2542        let mut reports = Vec::new();
2543        let mut seen: AHashSet<(String, String)> = AHashSet::new();
2544
2545        let pending = match self.inner.http_get_order_algo_pending(params.clone()).await {
2546            Ok(result) => result,
2547            Err(OKXHttpError::UnexpectedStatus { status, .. })
2548                if status == StatusCode::NOT_FOUND =>
2549            {
2550                Vec::new()
2551            }
2552            Err(error) => return Err(error.into()),
2553        };
2554        self.collect_algo_reports(
2555            account_id,
2556            &pending,
2557            &mut instruments_cache,
2558            ts_init,
2559            &mut seen,
2560            &mut reports,
2561        )
2562        .await?;
2563
2564        let history = match self.inner.http_get_order_algo_history(params).await {
2565            Ok(result) => result,
2566            Err(OKXHttpError::UnexpectedStatus { status, .. })
2567                if status == StatusCode::NOT_FOUND =>
2568            {
2569                Vec::new()
2570            }
2571            Err(error) => return Err(error.into()),
2572        };
2573        self.collect_algo_reports(
2574            account_id,
2575            &history,
2576            &mut instruments_cache,
2577            ts_init,
2578            &mut seen,
2579            &mut reports,
2580        )
2581        .await?;
2582
2583        Ok(reports)
2584    }
2585
2586    /// Requests an algo order status report by client order identifier.
2587    ///
2588    /// # Errors
2589    ///
2590    /// Returns an error if the request fails.
2591    pub async fn request_algo_order_status_report(
2592        &self,
2593        account_id: AccountId,
2594        instrument_id: InstrumentId,
2595        algo_client_order_id: ClientOrderId,
2596    ) -> anyhow::Result<Option<OrderStatusReport>> {
2597        let reports = self
2598            .request_algo_order_status_reports(
2599                account_id,
2600                None,
2601                Some(instrument_id),
2602                None,
2603                Some(algo_client_order_id),
2604                None,
2605                Some(50_u32),
2606            )
2607            .await?;
2608
2609        Ok(reports.into_iter().next())
2610    }
2611    async fn collect_algo_reports(
2612        &self,
2613        account_id: AccountId,
2614        orders: &[OKXOrderAlgo],
2615        instruments_cache: &mut AHashMap<Ustr, InstrumentAny>,
2616        ts_init: UnixNanos,
2617        seen: &mut AHashSet<(String, String)>,
2618        reports: &mut Vec<OrderStatusReport>,
2619    ) -> anyhow::Result<()> {
2620        for order in orders {
2621            let key = (order.algo_id.clone(), order.algo_cl_ord_id.clone());
2622            if !seen.insert(key) {
2623                continue;
2624            }
2625
2626            let instrument = if let Some(instrument) = instruments_cache.get(&order.inst_id) {
2627                instrument.clone()
2628            } else {
2629                let instrument = self.instrument_or_fetch(order.inst_id).await?;
2630                instruments_cache.insert(order.inst_id, instrument.clone());
2631                instrument
2632            };
2633
2634            let report = parse_http_algo_order(order, account_id, &instrument, ts_init)?;
2635            reports.push(report);
2636        }
2637
2638        Ok(())
2639    }
2640}
2641
2642fn parse_http_algo_order(
2643    order: &OKXOrderAlgo,
2644    account_id: AccountId,
2645    instrument: &InstrumentAny,
2646    ts_init: UnixNanos,
2647) -> anyhow::Result<OrderStatusReport> {
2648    let ord_px = if order.ord_px.is_empty() {
2649        "-1".to_string()
2650    } else {
2651        order.ord_px.clone()
2652    };
2653
2654    let reduce_only = if order.reduce_only.is_empty() {
2655        "false".to_string()
2656    } else {
2657        order.reduce_only.clone()
2658    };
2659
2660    let msg = OKXAlgoOrderMsg {
2661        algo_id: order.algo_id.clone(),
2662        algo_cl_ord_id: order.algo_cl_ord_id.clone(),
2663        cl_ord_id: order.cl_ord_id.clone(),
2664        ord_id: order.ord_id.clone(),
2665        inst_id: order.inst_id,
2666        inst_type: order.inst_type,
2667        ord_type: order.ord_type,
2668        state: order.state,
2669        side: order.side,
2670        pos_side: order.pos_side,
2671        sz: order.sz.clone(),
2672        trigger_px: order.trigger_px.clone(),
2673        trigger_px_type: order.trigger_px_type.unwrap_or(OKXTriggerType::None),
2674        ord_px,
2675        td_mode: order.td_mode,
2676        lever: order.lever.clone(),
2677        reduce_only,
2678        actual_px: order.actual_px.clone(),
2679        actual_sz: order.actual_sz.clone(),
2680        notional_usd: order.notional_usd.clone(),
2681        c_time: order.c_time,
2682        u_time: order.u_time,
2683        trigger_time: order.trigger_time.clone(),
2684        tag: order.tag.clone(),
2685    };
2686
2687    parse_algo_order_status_report(&msg, instrument, account_id, ts_init)
2688}