nautilus_okx/http/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides an ergonomic wrapper around the **OKX v5 REST API** –
17//! <https://www.okx.com/docs-v5/en/>.
18//!
19//! The core type exported by this module is [`OKXHttpClient`].  It offers an
20//! interface to all exchange endpoints currently required by NautilusTrader.
21//!
22//! Key responsibilities handled internally:
23//! • Request signing and header composition for private routes (HMAC-SHA256).
24//! • Rate-limiting based on the public OKX specification.
25//! • Zero-copy deserialization of large JSON payloads into domain models.
26//! • Conversion of raw exchange errors into the rich [`OKXHttpError`] enum.
27//!
28//! # Quick links to official docs
29//! | Domain                               | OKX reference                                          |
30//! |--------------------------------------|--------------------------------------------------------|
31//! | Market data                          | <https://www.okx.com/docs-v5/en/#rest-api-market-data> |
32//! | Account & positions                  | <https://www.okx.com/docs-v5/en/#rest-api-account>     |
33//! | Funding & asset balances             | <https://www.okx.com/docs-v5/en/#rest-api-funding>     |
34
35use std::{
36    collections::HashMap,
37    fmt::Debug,
38    num::NonZeroU32,
39    str::FromStr,
40    sync::{Arc, LazyLock, Mutex},
41};
42
43use ahash::{AHashMap, AHashSet};
44use chrono::{DateTime, Utc};
45use nautilus_core::{
46    MUTEX_POISONED, UnixNanos, consts::NAUTILUS_USER_AGENT, env::get_or_env_var,
47    time::get_atomic_clock_realtime,
48};
49use nautilus_model::{
50    data::{Bar, BarType, IndexPriceUpdate, MarkPriceUpdate, TradeTick},
51    enums::{AggregationSource, BarAggregation, OrderSide, OrderType, TriggerType},
52    events::AccountState,
53    identifiers::{AccountId, ClientOrderId, InstrumentId},
54    instruments::{Instrument, InstrumentAny},
55    reports::{FillReport, OrderStatusReport, PositionStatusReport},
56    types::{Price, Quantity},
57};
58use nautilus_network::{
59    http::HttpClient,
60    ratelimiter::quota::Quota,
61    retry::{RetryConfig, RetryManager},
62};
63use reqwest::{Method, StatusCode, header::USER_AGENT};
64use rust_decimal::Decimal;
65use serde::{Deserialize, Serialize, de::DeserializeOwned};
66use tokio_util::sync::CancellationToken;
67use ustr::Ustr;
68
69use super::{
70    error::OKXHttpError,
71    models::{
72        OKXAccount, OKXCancelAlgoOrderRequest, OKXCancelAlgoOrderResponse, OKXFeeRate,
73        OKXIndexTicker, OKXMarkPrice, OKXOrderAlgo, OKXOrderHistory, OKXPlaceAlgoOrderRequest,
74        OKXPlaceAlgoOrderResponse, OKXPosition, OKXPositionHistory, OKXPositionTier, OKXServerTime,
75        OKXTransactionDetail,
76    },
77    query::{
78        GetAlgoOrdersParams, GetAlgoOrdersParamsBuilder, GetCandlesticksParams,
79        GetCandlesticksParamsBuilder, GetIndexTickerParams, GetIndexTickerParamsBuilder,
80        GetInstrumentsParams, GetInstrumentsParamsBuilder, GetMarkPriceParams,
81        GetMarkPriceParamsBuilder, GetOrderHistoryParams, GetOrderHistoryParamsBuilder,
82        GetOrderListParams, GetOrderListParamsBuilder, GetPositionTiersParams,
83        GetPositionsHistoryParams, GetPositionsParams, GetPositionsParamsBuilder,
84        GetTradeFeeParams, GetTradesParams, GetTradesParamsBuilder, GetTransactionDetailsParams,
85        GetTransactionDetailsParamsBuilder, SetPositionModeParams, SetPositionModeParamsBuilder,
86    },
87};
88use crate::{
89    common::{
90        consts::{OKX_HTTP_URL, OKX_NAUTILUS_BROKER_ID, should_retry_error_code},
91        credential::Credential,
92        enums::{
93            OKXAlgoOrderType, OKXInstrumentType, OKXOrderStatus, OKXPositionMode, OKXSide,
94            OKXTradeMode, OKXTriggerType,
95        },
96        models::OKXInstrument,
97        parse::{
98            okx_instrument_type, parse_account_state, parse_candlestick, parse_fill_report,
99            parse_index_price_update, parse_instrument_any, parse_mark_price_update,
100            parse_order_status_report, parse_position_status_report, parse_trade_tick,
101        },
102    },
103    http::{
104        models::{OKXCandlestick, OKXTrade},
105        query::{GetOrderParams, GetPendingOrdersParams},
106    },
107    websocket::{messages::OKXAlgoOrderMsg, parse::parse_algo_order_status_report},
108};
109
110const OKX_SUCCESS_CODE: &str = "0";
111
112/// Default OKX REST API rate limit: 500 requests per 2 seconds.
113///
114/// - Sub-account order limit: 1000 requests per 2 seconds.
115/// - Account balance: 10 requests per 2 seconds.
116/// - Account instruments: 20 requests per 2 seconds.
117///
118/// We use a conservative 250 requests per second (500 per 2 seconds) as a general limit
119/// that should accommodate most use cases while respecting OKX's documented limits.
120pub static OKX_REST_QUOTA: LazyLock<Quota> =
121    LazyLock::new(|| Quota::per_second(NonZeroU32::new(250).unwrap()));
122
123const OKX_GLOBAL_RATE_KEY: &str = "okx:global";
124
/// Represents an OKX HTTP response.
///
/// This is the envelope in which OKX returns its payload: a status `code`, a
/// human-readable `msg`, and a `data` array whose element type `T` depends on
/// the endpoint that was called.
#[derive(Debug, Serialize, Deserialize)]
pub struct OKXResponse<T> {
    /// The OKX response code, which is `"0"` for success.
    pub code: String,
    /// A message string which can be informational or describe an error cause.
    pub msg: String,
    /// The typed data returned by the OKX endpoint.
    pub data: Vec<T>,
}
135
/// Provides a HTTP client for connecting to the [OKX](https://okx.com) REST API.
///
/// This client wraps the underlying [`HttpClient`] to handle functionality
/// specific to OKX, such as request signing (for authenticated endpoints),
/// forming request URLs, and deserializing responses into specific data models.
pub struct OKXHttpInnerClient {
    /// URL prefix that is prepended to every request path.
    base_url: String,
    /// Underlying HTTP client, built with the default headers and rate limiter quotas.
    client: HttpClient,
    /// API credentials used to sign requests; `None` when the client was built without them.
    credential: Option<Credential>,
    /// Executes requests with the configured retry policy.
    retry_manager: RetryManager<OKXHttpError>,
    /// Token handed to the retry manager so that requests can be cancelled.
    cancellation_token: CancellationToken,
    /// Whether the client was created for OKX demo (simulated) trading.
    is_demo: bool,
}
149
150impl Default for OKXHttpInnerClient {
151    fn default() -> Self {
152        Self::new(None, Some(60), None, None, None, false)
153            .expect("Failed to create default OKXHttpInnerClient")
154    }
155}
156
157impl Debug for OKXHttpInnerClient {
158    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159        let credential = self.credential.as_ref().map(|_| "<redacted>");
160        f.debug_struct(stringify!(OKXHttpInnerClient))
161            .field("base_url", &self.base_url)
162            .field("credential", &credential)
163            .finish_non_exhaustive()
164    }
165}
166
167impl OKXHttpInnerClient {
168    fn rate_limiter_quotas() -> Vec<(String, Quota)> {
169        vec![
170            (OKX_GLOBAL_RATE_KEY.to_string(), *OKX_REST_QUOTA),
171            (
172                "okx:/api/v5/account/balance".to_string(),
173                Quota::per_second(NonZeroU32::new(5).unwrap()),
174            ),
175            (
176                "okx:/api/v5/public/instruments".to_string(),
177                Quota::per_second(NonZeroU32::new(10).unwrap()),
178            ),
179            (
180                "okx:/api/v5/market/candles".to_string(),
181                Quota::per_second(NonZeroU32::new(50).unwrap()),
182            ),
183            (
184                "okx:/api/v5/market/history-candles".to_string(),
185                Quota::per_second(NonZeroU32::new(20).unwrap()),
186            ),
187            (
188                "okx:/api/v5/market/history-trades".to_string(),
189                Quota::per_second(NonZeroU32::new(30).unwrap()),
190            ),
191            (
192                "okx:/api/v5/trade/order".to_string(),
193                Quota::per_second(NonZeroU32::new(30).unwrap()), // 60 requests / 2 seconds (per instrument)
194            ),
195            (
196                "okx:/api/v5/trade/orders-pending".to_string(),
197                Quota::per_second(NonZeroU32::new(20).unwrap()),
198            ),
199            (
200                "okx:/api/v5/trade/orders-history".to_string(),
201                Quota::per_second(NonZeroU32::new(20).unwrap()),
202            ),
203            (
204                "okx:/api/v5/trade/fills".to_string(),
205                Quota::per_second(NonZeroU32::new(30).unwrap()),
206            ),
207            (
208                "okx:/api/v5/trade/order-algo".to_string(),
209                Quota::per_second(NonZeroU32::new(10).unwrap()),
210            ),
211            (
212                "okx:/api/v5/trade/cancel-algos".to_string(),
213                Quota::per_second(NonZeroU32::new(10).unwrap()),
214            ),
215        ]
216    }
217
218    fn rate_limit_keys(endpoint: &str) -> Vec<Ustr> {
219        let normalized = endpoint.split('?').next().unwrap_or(endpoint);
220        let route = format!("okx:{normalized}");
221
222        vec![Ustr::from(OKX_GLOBAL_RATE_KEY), Ustr::from(route.as_str())]
223    }
224
    /// Cancel all pending HTTP requests.
    ///
    /// This triggers the client's cancellation token, which is the token handed
    /// to the retry manager for every request sent by this client.
    ///
    /// NOTE(review): the token is never replaced in the visible code, so requests
    /// issued after this call presumably observe the already-cancelled token as
    /// well — confirm whether that is intended.
    pub fn cancel_all_requests(&self) {
        self.cancellation_token.cancel();
    }
229
    /// Get the cancellation token for this client.
    ///
    /// Returns a reference to the same token that `cancel_all_requests` triggers.
    pub fn cancellation_token(&self) -> &CancellationToken {
        &self.cancellation_token
    }
234
235    /// Creates a new [`OKXHttpClient`] using the default OKX HTTP URL,
236    /// optionally overridden with a custom base URL.
237    ///
238    /// This version of the client has **no credentials**, so it can only
239    /// call publicly accessible endpoints.
240    ///
241    /// # Errors
242    ///
243    /// Returns an error if the retry manager cannot be created.
244    pub fn new(
245        base_url: Option<String>,
246        timeout_secs: Option<u64>,
247        max_retries: Option<u32>,
248        retry_delay_ms: Option<u64>,
249        retry_delay_max_ms: Option<u64>,
250        is_demo: bool,
251    ) -> Result<Self, OKXHttpError> {
252        let retry_config = RetryConfig {
253            max_retries: max_retries.unwrap_or(3),
254            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
255            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
256            backoff_factor: 2.0,
257            jitter_ms: 1000,
258            operation_timeout_ms: Some(60_000),
259            immediate_first: false,
260            max_elapsed_ms: Some(180_000),
261        };
262
263        let retry_manager = RetryManager::new(retry_config).map_err(|e| {
264            OKXHttpError::ValidationError(format!("Failed to create retry manager: {e}"))
265        })?;
266
267        Ok(Self {
268            base_url: base_url.unwrap_or(OKX_HTTP_URL.to_string()),
269            client: HttpClient::new(
270                Self::default_headers(is_demo),
271                vec![],
272                Self::rate_limiter_quotas(),
273                Some(*OKX_REST_QUOTA),
274                timeout_secs,
275            ),
276            credential: None,
277            retry_manager,
278            cancellation_token: CancellationToken::new(),
279            is_demo,
280        })
281    }
282
283    /// Creates a new [`OKXHttpClient`] configured with credentials
284    /// for authenticated requests, optionally using a custom base URL.
285    ///
286    /// # Errors
287    ///
288    /// Returns an error if the retry manager cannot be created.
289    #[allow(clippy::too_many_arguments)]
290    pub fn with_credentials(
291        api_key: String,
292        api_secret: String,
293        api_passphrase: String,
294        base_url: String,
295        timeout_secs: Option<u64>,
296        max_retries: Option<u32>,
297        retry_delay_ms: Option<u64>,
298        retry_delay_max_ms: Option<u64>,
299        is_demo: bool,
300    ) -> Result<Self, OKXHttpError> {
301        let retry_config = RetryConfig {
302            max_retries: max_retries.unwrap_or(3),
303            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
304            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
305            backoff_factor: 2.0,
306            jitter_ms: 1000,
307            operation_timeout_ms: Some(60_000),
308            immediate_first: false,
309            max_elapsed_ms: Some(180_000),
310        };
311
312        let retry_manager = RetryManager::new(retry_config).map_err(|e| {
313            OKXHttpError::ValidationError(format!("Failed to create retry manager: {e}"))
314        })?;
315
316        Ok(Self {
317            base_url,
318            client: HttpClient::new(
319                Self::default_headers(is_demo),
320                vec![],
321                Self::rate_limiter_quotas(),
322                Some(*OKX_REST_QUOTA),
323                timeout_secs,
324            ),
325            credential: Some(Credential::new(api_key, api_secret, api_passphrase)),
326            retry_manager,
327            cancellation_token: CancellationToken::new(),
328            is_demo,
329        })
330    }
331
332    /// Builds the default headers to include with each request (e.g., `User-Agent`).
333    fn default_headers(is_demo: bool) -> HashMap<String, String> {
334        let mut headers =
335            HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())]);
336
337        if is_demo {
338            headers.insert("x-simulated-trading".to_string(), "1".to_string());
339        }
340
341        headers
342    }
343
344    /// Combine a base path with a `serde_urlencoded` query string if one exists.
345    ///
346    /// # Errors
347    ///
348    /// Returns an error if the query string serialization fails.
349    fn build_path<S: Serialize>(base: &str, params: &S) -> Result<String, OKXHttpError> {
350        let query = serde_urlencoded::to_string(params)
351            .map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
352        if query.is_empty() {
353            Ok(base.to_owned())
354        } else {
355            Ok(format!("{base}?{query}"))
356        }
357    }
358
359    /// Signs an OKX request with timestamp, API key, passphrase, and signature.
360    ///
361    /// # Errors
362    ///
363    /// Returns [`OKXHttpError::MissingCredentials`] if no credentials are set
364    /// but the request requires authentication.
365    fn sign_request(
366        &self,
367        method: &Method,
368        path: &str,
369        body: Option<&[u8]>,
370    ) -> Result<HashMap<String, String>, OKXHttpError> {
371        let credential = match self.credential.as_ref() {
372            Some(c) => c,
373            None => return Err(OKXHttpError::MissingCredentials),
374        };
375
376        let api_key = credential.api_key.to_string();
377        let api_passphrase = credential.api_passphrase.clone();
378
379        // OKX requires milliseconds in the timestamp (ISO 8601 with milliseconds)
380        let now = Utc::now();
381        let millis = now.timestamp_subsec_millis();
382        let timestamp = now.format("%Y-%m-%dT%H:%M:%S").to_string() + &format!(".{:03}Z", millis);
383        let signature = credential.sign_bytes(&timestamp, method.as_str(), path, body);
384
385        let mut headers = HashMap::new();
386        headers.insert("OK-ACCESS-KEY".to_string(), api_key);
387        headers.insert("OK-ACCESS-PASSPHRASE".to_string(), api_passphrase);
388        headers.insert("OK-ACCESS-TIMESTAMP".to_string(), timestamp);
389        headers.insert("OK-ACCESS-SIGN".to_string(), signature);
390
391        Ok(headers)
392    }
393
394    /// Sends an HTTP request to OKX and parses the response into `Vec<T>`.
395    ///
396    /// Internally, this method handles:
397    /// - Building the URL from `base_url` + `path`.
398    /// - Optionally signing the request.
399    /// - Deserializing JSON responses into typed models, or returning a [`OKXHttpError`].
400    /// - Retrying with exponential backoff on transient errors.
401    ///
402    /// # Errors
403    ///
404    /// Returns an error if:
405    /// - The HTTP request fails.
406    /// - Authentication is required but credentials are missing.
407    /// - The response cannot be deserialized into the expected type.
408    /// - The OKX API returns an error response.
409    async fn send_request<T: DeserializeOwned>(
410        &self,
411        method: Method,
412        path: &str,
413        body: Option<Vec<u8>>,
414        authenticate: bool,
415    ) -> Result<Vec<T>, OKXHttpError> {
416        let url = format!("{}{path}", self.base_url);
417        let endpoint = path.to_string();
418        let method_clone = method.clone();
419        let body_clone = body.clone();
420
421        let operation = || {
422            let url = url.clone();
423            let method = method_clone.clone();
424            let body = body_clone.clone();
425            let endpoint = endpoint.clone();
426
427            async move {
428                let mut headers = if authenticate {
429                    self.sign_request(&method, endpoint.as_str(), body.as_deref())?
430                } else {
431                    HashMap::new()
432                };
433
434                // Always set Content-Type header when body is present
435                if body.is_some() {
436                    headers.insert("Content-Type".to_string(), "application/json".to_string());
437                }
438
439                let rate_keys = Self::rate_limit_keys(endpoint.as_str());
440                let resp = self
441                    .client
442                    .request_with_ustr_keys(
443                        method.clone(),
444                        url,
445                        Some(headers),
446                        body,
447                        None,
448                        Some(rate_keys),
449                    )
450                    .await?;
451
452                tracing::trace!("Response: {resp:?}");
453
454                if resp.status.is_success() {
455                    let okx_response: OKXResponse<T> =
456                        serde_json::from_slice(&resp.body).map_err(|e| {
457                            tracing::error!("Failed to deserialize OKXResponse: {e}");
458                            OKXHttpError::JsonError(e.to_string())
459                        })?;
460
461                    if okx_response.code != OKX_SUCCESS_CODE {
462                        return Err(OKXHttpError::OkxError {
463                            error_code: okx_response.code,
464                            message: okx_response.msg,
465                        });
466                    }
467
468                    Ok(okx_response.data)
469                } else {
470                    let error_body = String::from_utf8_lossy(&resp.body);
471                    if resp.status.as_u16() == StatusCode::NOT_FOUND.as_u16() {
472                        tracing::debug!("HTTP 404 with body: {error_body}");
473                    } else {
474                        tracing::error!(
475                            "HTTP error {} with body: {error_body}",
476                            resp.status.as_str()
477                        );
478                    }
479
480                    if let Ok(parsed_error) = serde_json::from_slice::<OKXResponse<T>>(&resp.body) {
481                        return Err(OKXHttpError::OkxError {
482                            error_code: parsed_error.code,
483                            message: parsed_error.msg,
484                        });
485                    }
486
487                    Err(OKXHttpError::UnexpectedStatus {
488                        status: StatusCode::from_u16(resp.status.as_u16()).unwrap(),
489                        body: error_body.to_string(),
490                    })
491                }
492            }
493        };
494
495        // Retry strategy based on OKX error responses and HTTP status codes:
496        //
497        // 1. Network errors: always retry (transient connection issues)
498        // 2. HTTP 5xx/429: server errors and rate limiting should be retried
499        // 3. OKX specific retryable error codes (defined in common::consts)
500        //
501        // Note: OKX returns many permanent errors which should NOT be retried
502        // (e.g., "Invalid instrument", "Insufficient balance", "Invalid API Key")
503        let should_retry = |error: &OKXHttpError| -> bool {
504            match error {
505                OKXHttpError::HttpClientError(_) => true,
506                OKXHttpError::UnexpectedStatus { status, .. } => {
507                    status.as_u16() >= 500 || status.as_u16() == 429
508                }
509                OKXHttpError::OkxError { error_code, .. } => should_retry_error_code(error_code),
510                _ => false,
511            }
512        };
513
514        let create_error = |msg: String| -> OKXHttpError {
515            if msg == "canceled" {
516                OKXHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
517            } else {
518                OKXHttpError::ValidationError(msg)
519            }
520        };
521
522        self.retry_manager
523            .execute_with_retry_with_cancel(
524                endpoint.as_str(),
525                operation,
526                should_retry,
527                create_error,
528                &self.cancellation_token,
529            )
530            .await
531    }
532
533    /// Sets the position mode for an account.
534    ///
535    /// # Errors
536    ///
537    /// Returns an error if JSON serialization of `params` fails, if the HTTP
538    /// request fails, or if the response body cannot be deserialized.
539    ///
540    /// # References
541    ///
542    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-set-position-mode>
543    pub async fn http_set_position_mode(
544        &self,
545        params: SetPositionModeParams,
546    ) -> Result<Vec<serde_json::Value>, OKXHttpError> {
547        let path = "/api/v5/account/set-position-mode";
548        let body = serde_json::to_vec(&params)?;
549        self.send_request(Method::POST, path, Some(body), true)
550            .await
551    }
552
553    /// Requests position tiers information, maximum leverage depends on your borrowings and margin ratio.
554    ///
555    /// # Errors
556    ///
557    /// Returns an error if the HTTP request fails, authentication is rejected
558    /// or the response cannot be deserialized.
559    ///
560    /// # References
561    ///
562    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-position-tiers>
563    pub async fn http_get_position_tiers(
564        &self,
565        params: GetPositionTiersParams,
566    ) -> Result<Vec<OKXPositionTier>, OKXHttpError> {
567        let path = Self::build_path("/api/v5/public/position-tiers", &params)?;
568        self.send_request(Method::GET, &path, None, false).await
569    }
570
571    /// Requests a list of instruments with open contracts.
572    ///
573    /// # Errors
574    ///
575    /// Returns an error if JSON serialization of `params` fails, if the HTTP
576    /// request fails, or if the response body cannot be deserialized.
577    ///
578    /// # References
579    ///
580    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-instruments>
581    pub async fn http_get_instruments(
582        &self,
583        params: GetInstrumentsParams,
584    ) -> Result<Vec<OKXInstrument>, OKXHttpError> {
585        let path = Self::build_path("/api/v5/public/instruments", &params)?;
586        self.send_request(Method::GET, &path, None, false).await
587    }
588
589    /// Requests the current server time from OKX.
590    ///
591    /// Retrieves the OKX system time in Unix timestamp (milliseconds). This is useful for
592    /// synchronizing local clocks with the exchange server and logging time drift.
593    ///
594    /// # Errors
595    ///
596    /// Returns an error if the HTTP request fails or if the response body
597    /// cannot be parsed into [`OKXServerTime`].
598    ///
599    /// # References
600    ///
601    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-system-time>
602    pub async fn http_get_server_time(&self) -> Result<u64, OKXHttpError> {
603        let response: Vec<OKXServerTime> = self
604            .send_request(Method::GET, "/api/v5/public/time", None, false)
605            .await?;
606        response
607            .first()
608            .map(|t| t.ts)
609            .ok_or_else(|| OKXHttpError::JsonError("Empty server time response".to_string()))
610    }
611
612    /// Requests a mark price.
613    ///
614    /// We set the mark price based on the SPOT index and at a reasonable basis to prevent individual
615    /// users from manipulating the market and causing the contract price to fluctuate.
616    ///
617    /// # Errors
618    ///
619    /// Returns an error if the HTTP request fails or if the response body
620    /// cannot be parsed into [`OKXMarkPrice`].
621    ///
622    /// # References
623    ///
624    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-mark-price>
625    pub async fn http_get_mark_price(
626        &self,
627        params: GetMarkPriceParams,
628    ) -> Result<Vec<OKXMarkPrice>, OKXHttpError> {
629        let path = Self::build_path("/api/v5/public/mark-price", &params)?;
630        self.send_request(Method::GET, &path, None, false).await
631    }
632
633    /// Requests the latest index price.
634    ///
635    /// # Errors
636    ///
637    /// Returns an error if the operation fails.
638    ///
639    /// # References
640    ///
641    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-index-tickers>
642    pub async fn http_get_index_ticker(
643        &self,
644        params: GetIndexTickerParams,
645    ) -> Result<Vec<OKXIndexTicker>, OKXHttpError> {
646        let path = Self::build_path("/api/v5/market/index-tickers", &params)?;
647        self.send_request(Method::GET, &path, None, false).await
648    }
649
650    /// Requests trades history.
651    ///
652    /// # Errors
653    ///
654    /// Returns an error if the operation fails.
655    ///
656    /// # References
657    ///
658    /// <https://www.okx.com/docs-v5/en/#order-book-trading-market-data-get-trades-history>
659    pub async fn http_get_trades(
660        &self,
661        params: GetTradesParams,
662    ) -> Result<Vec<OKXTrade>, OKXHttpError> {
663        let path = Self::build_path("/api/v5/market/history-trades", &params)?;
664        self.send_request(Method::GET, &path, None, false).await
665    }
666
667    /// Requests recent candlestick data.
668    ///
669    /// # Errors
670    ///
671    /// Returns an error if the operation fails.
672    ///
673    /// # References
674    ///
675    /// <https://www.okx.com/docs-v5/en/#order-book-trading-market-data-get-candlesticks>
676    pub async fn http_get_candlesticks(
677        &self,
678        params: GetCandlesticksParams,
679    ) -> Result<Vec<OKXCandlestick>, OKXHttpError> {
680        let path = Self::build_path("/api/v5/market/candles", &params)?;
681        self.send_request(Method::GET, &path, None, false).await
682    }
683
684    /// Requests historical candlestick data.
685    ///
686    /// # Errors
687    ///
688    /// Returns an error if the operation fails.
689    ///
690    /// # References
691    ///
692    /// <https://www.okx.com/docs-v5/en/#order-book-trading-market-data-get-candlesticks-history>
693    pub async fn http_get_candlesticks_history(
694        &self,
695        params: GetCandlesticksParams,
696    ) -> Result<Vec<OKXCandlestick>, OKXHttpError> {
697        let path = Self::build_path("/api/v5/market/history-candles", &params)?;
698        self.send_request(Method::GET, &path, None, false).await
699    }
700
701    /// Lists current open orders.
702    ///
703    /// # Errors
704    ///
705    /// Returns an error if the operation fails.
706    ///
707    /// # References
708    ///
709    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-orders-pending>
710    pub async fn http_get_pending_orders(
711        &self,
712        params: GetPendingOrdersParams,
713    ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
714        let path = Self::build_path("/api/v5/trade/orders-pending", &params)?;
715        self.send_request(Method::GET, &path, None, true).await
716    }
717
718    /// Retrieves a single order’s details.
719    ///
720    /// # Errors
721    ///
722    /// Returns an error if the operation fails.
723    ///
724    /// # References
725    ///
726    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-order>
727    pub async fn http_get_order(
728        &self,
729        params: GetOrderParams,
730    ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
731        let path = Self::build_path("/api/v5/trade/order", &params)?;
732        self.send_request(Method::GET, &path, None, true).await
733    }
734
735    /// Requests a list of assets (with non-zero balance), remaining balance, and available amount
736    /// in the trading account.
737    ///
738    /// # Errors
739    ///
740    /// Returns an error if the operation fails.
741    ///
742    /// # References
743    ///
744    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-balance>
745    pub async fn http_get_balance(&self) -> Result<Vec<OKXAccount>, OKXHttpError> {
746        let path = "/api/v5/account/balance";
747        self.send_request(Method::GET, path, None, true).await
748    }
749
750    /// Requests fee rates for the account.
751    ///
752    /// Returns fee rates for the specified instrument type and the user's VIP level.
753    ///
754    /// # Errors
755    ///
756    /// Returns an error if the operation fails.
757    ///
758    /// # References
759    ///
760    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-fee-rates>
761    pub async fn http_get_trade_fee(
762        &self,
763        params: GetTradeFeeParams,
764    ) -> Result<Vec<OKXFeeRate>, OKXHttpError> {
765        let path = Self::build_path("/api/v5/account/trade-fee", &params)?;
766        self.send_request(Method::GET, &path, None, true).await
767    }
768
769    /// Requests historical order records.
770    ///
771    /// # Errors
772    ///
773    /// Returns an error if the operation fails.
774    ///
775    /// # References
776    ///
777    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-orders-history>
778    pub async fn http_get_order_history(
779        &self,
780        params: GetOrderHistoryParams,
781    ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
782        let path = Self::build_path("/api/v5/trade/orders-history", &params)?;
783        self.send_request(Method::GET, &path, None, true).await
784    }
785
786    /// Requests order list (pending orders).
787    ///
788    /// # Errors
789    ///
790    /// Returns an error if the operation fails.
791    ///
792    /// # References
793    ///
794    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-order-list>
795    pub async fn http_get_order_list(
796        &self,
797        params: GetOrderListParams,
798    ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
799        let path = Self::build_path("/api/v5/trade/orders-pending", &params)?;
800        self.send_request(Method::GET, &path, None, true).await
801    }
802
803    /// Requests pending algo orders.
804    ///
805    /// # Errors
806    ///
807    /// Returns an error if the operation fails.
808    pub async fn http_get_order_algo_pending(
809        &self,
810        params: GetAlgoOrdersParams,
811    ) -> Result<Vec<OKXOrderAlgo>, OKXHttpError> {
812        let path = Self::build_path("/api/v5/trade/order-algo-pending", &params)?;
813        self.send_request(Method::GET, &path, None, true).await
814    }
815
816    /// Requests historical algo orders.
817    ///
818    /// # Errors
819    ///
820    /// Returns an error if the operation fails.
821    pub async fn http_get_order_algo_history(
822        &self,
823        params: GetAlgoOrdersParams,
824    ) -> Result<Vec<OKXOrderAlgo>, OKXHttpError> {
825        let path = Self::build_path("/api/v5/trade/order-algo-history", &params)?;
826        self.send_request(Method::GET, &path, None, true).await
827    }
828
829    /// Requests information on your positions. When the account is in net mode, net positions will
830    /// be displayed, and when the account is in long/short mode, long or short positions will be
831    /// displayed. Returns in reverse chronological order using ctime.
832    ///
833    /// # Errors
834    ///
835    /// Returns an error if the operation fails.
836    ///
837    /// # References
838    ///
839    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-positions>
840    pub async fn http_get_positions(
841        &self,
842        params: GetPositionsParams,
843    ) -> Result<Vec<OKXPosition>, OKXHttpError> {
844        let path = Self::build_path("/api/v5/account/positions", &params)?;
845        self.send_request(Method::GET, &path, None, true).await
846    }
847
848    /// Requests closed or historical position data.
849    ///
850    /// # Errors
851    ///
852    /// Returns an error if the operation fails.
853    ///
854    /// # References
855    ///
856    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-positions-history>
857    pub async fn http_get_position_history(
858        &self,
859        params: GetPositionsHistoryParams,
860    ) -> Result<Vec<OKXPositionHistory>, OKXHttpError> {
861        let path = Self::build_path("/api/v5/account/positions-history", &params)?;
862        self.send_request(Method::GET, &path, None, true).await
863    }
864
865    /// Requests transaction details (fills) for the given parameters.
866    ///
867    /// # Errors
868    ///
869    /// Returns an error if the operation fails.
870    ///
871    /// # References
872    ///
873    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-transaction-details-last-3-days>
874    pub async fn http_get_transaction_details(
875        &self,
876        params: GetTransactionDetailsParams,
877    ) -> Result<Vec<OKXTransactionDetail>, OKXHttpError> {
878        let path = Self::build_path("/api/v5/trade/fills", &params)?;
879        self.send_request(Method::GET, &path, None, true).await
880    }
881}
882
/// Provides a higher-level HTTP client for the [OKX](https://okx.com) REST API.
///
/// This client wraps the underlying `OKXHttpInnerClient` to handle conversions
/// into the Nautilus domain model.
///
/// Cloning is cheap: the inner client and the instrument cache live behind
/// `Arc`s and are therefore shared between clones.
#[derive(Clone, Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
)]
pub struct OKXHttpClient {
    /// Shared low-level client that performs the raw HTTP requests.
    pub(crate) inner: Arc<OKXHttpInnerClient>,
    /// Instruments keyed by their raw symbol, shared between clones of this client.
    pub(crate) instruments_cache: Arc<Mutex<HashMap<Ustr, InstrumentAny>>>,
    /// Set by `add_instrument` / `add_instruments`. This is a plain `bool`, so
    /// each clone carries its own copy of the flag.
    cache_initialized: bool,
}
897
898impl Default for OKXHttpClient {
899    fn default() -> Self {
900        Self::new(None, Some(60), None, None, None, false)
901            .expect("Failed to create default OKXHttpClient")
902    }
903}
904
905impl OKXHttpClient {
906    /// Creates a new [`OKXHttpClient`] using the default OKX HTTP URL,
907    /// optionally overridden with a custom base url.
908    ///
909    /// This version of the client has **no credentials**, so it can only
910    /// call publicly accessible endpoints.
911    ///
912    /// # Errors
913    ///
914    /// Returns an error if the retry manager cannot be created.
915    pub fn new(
916        base_url: Option<String>,
917        timeout_secs: Option<u64>,
918        max_retries: Option<u32>,
919        retry_delay_ms: Option<u64>,
920        retry_delay_max_ms: Option<u64>,
921        is_demo: bool,
922    ) -> anyhow::Result<Self> {
923        Ok(Self {
924            inner: Arc::new(OKXHttpInnerClient::new(
925                base_url,
926                timeout_secs,
927                max_retries,
928                retry_delay_ms,
929                retry_delay_max_ms,
930                is_demo,
931            )?),
932            instruments_cache: Arc::new(Mutex::new(HashMap::new())),
933            cache_initialized: false,
934        })
935    }
936
    /// Creates a new authenticated [`OKXHttpClient`] using environment variables and
    /// the default OKX HTTP base url.
    ///
    /// Delegates to [`OKXHttpClient::with_credentials`] with every optional
    /// argument set to `None` and `is_demo` set to `false`; that constructor
    /// resolves the credentials through `get_or_env_var` using the names
    /// `OKX_API_KEY`, `OKX_API_SECRET` and `OKX_API_PASSPHRASE`.
    ///
    /// # Errors
    ///
    /// Returns an error if the operation fails.
    pub fn from_env() -> anyhow::Result<Self> {
        Self::with_credentials(None, None, None, None, None, None, None, None, false)
    }
946
947    /// Creates a new [`OKXHttpClient`] configured with credentials
948    /// for authenticated requests, optionally using a custom base url.
949    ///
950    /// # Errors
951    ///
952    /// Returns an error if the operation fails.
953    #[allow(clippy::too_many_arguments)]
954    pub fn with_credentials(
955        api_key: Option<String>,
956        api_secret: Option<String>,
957        api_passphrase: Option<String>,
958        base_url: Option<String>,
959        timeout_secs: Option<u64>,
960        max_retries: Option<u32>,
961        retry_delay_ms: Option<u64>,
962        retry_delay_max_ms: Option<u64>,
963        is_demo: bool,
964    ) -> anyhow::Result<Self> {
965        let api_key = get_or_env_var(api_key, "OKX_API_KEY")?;
966        let api_secret = get_or_env_var(api_secret, "OKX_API_SECRET")?;
967        let api_passphrase = get_or_env_var(api_passphrase, "OKX_API_PASSPHRASE")?;
968        let base_url = base_url.unwrap_or(OKX_HTTP_URL.to_string());
969
970        Ok(Self {
971            inner: Arc::new(OKXHttpInnerClient::with_credentials(
972                api_key,
973                api_secret,
974                api_passphrase,
975                base_url,
976                timeout_secs,
977                max_retries,
978                retry_delay_ms,
979                retry_delay_max_ms,
980                is_demo,
981            )?),
982            instruments_cache: Arc::new(Mutex::new(HashMap::new())),
983            cache_initialized: false,
984        })
985    }
986
987    /// Retrieves an instrument from the cache.
988    ///
989    /// # Errors
990    ///
991    /// Returns an error if the instrument is not found in the cache.
992    fn get_instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
993        self.instruments_cache
994            .lock()
995            .expect("`instruments_cache` lock poisoned")
996            .get(&symbol)
997            .cloned()
998            .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not in cache"))
999    }
1000
1001    async fn instrument_or_fetch(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
1002        if let Ok(inst) = self.get_instrument_from_cache(symbol) {
1003            return Ok(inst);
1004        }
1005
1006        for group in [
1007            OKXInstrumentType::Spot,
1008            OKXInstrumentType::Margin,
1009            OKXInstrumentType::Futures,
1010            OKXInstrumentType::Swap,
1011            OKXInstrumentType::Option,
1012        ] {
1013            if let Ok(instruments) = self.request_instruments(group, None).await {
1014                let mut guard = self.instruments_cache.lock().expect(MUTEX_POISONED);
1015                for inst in instruments {
1016                    guard.insert(inst.raw_symbol().inner(), inst);
1017                }
1018                drop(guard);
1019
1020                if let Ok(inst) = self.get_instrument_from_cache(symbol) {
1021                    return Ok(inst);
1022                }
1023            }
1024        }
1025
1026        anyhow::bail!("Instrument {symbol} not in cache and fetch failed");
1027    }
1028
    /// Cancel all pending HTTP requests.
    ///
    /// Delegates to `cancel_all_requests` on the shared inner client.
    pub fn cancel_all_requests(&self) {
        self.inner.cancel_all_requests();
    }
1033
1034    /// Get the cancellation token for this client.
1035    pub fn cancellation_token(&self) -> &CancellationToken {
1036        self.inner.cancellation_token()
1037    }
1038
1039    /// Returns the base url being used by the client.
1040    pub fn base_url(&self) -> &str {
1041        self.inner.base_url.as_str()
1042    }
1043
1044    /// Returns the public API key being used by the client.
1045    pub fn api_key(&self) -> Option<&str> {
1046        self.inner.credential.as_ref().map(|c| c.api_key.as_str())
1047    }
1048
    /// Returns whether the client is configured for demo trading.
    ///
    /// Reads the `is_demo` flag stored on the shared inner client.
    #[must_use]
    pub fn is_demo(&self) -> bool {
        self.inner.is_demo
    }
1054
    /// Requests the current server time from OKX.
    ///
    /// Returns the OKX system time as a Unix timestamp in milliseconds.
    /// This is a thin wrapper that forwards to the inner client's method of the
    /// same name.
    ///
    /// # Errors
    ///
    /// Returns an error if the HTTP request fails or if the response cannot be parsed.
    pub async fn http_get_server_time(&self) -> Result<u64, OKXHttpError> {
        self.inner.http_get_server_time().await
    }
1065
    /// Checks if the client is initialized.
    ///
    /// The client is considered initialized once instruments have been added
    /// through `add_instrument` or `add_instruments` on this client value.
    /// Instruments cached by on-demand fetches do not set the flag, and because
    /// the flag is a plain field it is copied (not shared) when the client is cloned.
    #[must_use]
    pub const fn is_initialized(&self) -> bool {
        self.cache_initialized
    }
1073
    /// Generates a timestamp for initialization.
    ///
    /// Reads the current time from `get_atomic_clock_realtime()` as [`UnixNanos`];
    /// the request methods pass the result as the `ts_init` argument of the parsers.
    fn generate_ts_init(&self) -> UnixNanos {
        get_atomic_clock_realtime().get_time_ns()
    }
1078
1079    /// Returns the cached instrument symbols.
1080    #[must_use]
1081    /// Returns a snapshot of all instrument symbols currently held in the
1082    /// internal cache.
1083    ///
1084    /// # Panics
1085    ///
1086    /// Panics if the internal mutex guarding the instrument cache is poisoned
1087    /// (which would indicate a previous panic while the lock was held).
1088    pub fn get_cached_symbols(&self) -> Vec<String> {
1089        self.instruments_cache
1090            .lock()
1091            .unwrap()
1092            .keys()
1093            .map(std::string::ToString::to_string)
1094            .collect()
1095    }
1096
1097    /// Adds the `instruments` to the clients instrument cache.
1098    ///
1099    /// Any existing instruments will be replaced.
1100    /// Inserts multiple instruments into the local cache.
1101    ///
1102    /// # Panics
1103    ///
1104    /// Panics if the instruments cache mutex is poisoned.
1105    pub fn add_instruments(&mut self, instruments: Vec<InstrumentAny>) {
1106        for inst in instruments {
1107            self.instruments_cache
1108                .lock()
1109                .unwrap()
1110                .insert(inst.raw_symbol().inner(), inst);
1111        }
1112        self.cache_initialized = true;
1113    }
1114
1115    /// Adds the `instrument` to the clients instrument cache.
1116    ///
1117    /// Any existing instrument will be replaced.
1118    /// Inserts a single instrument into the local cache.
1119    ///
1120    /// # Panics
1121    ///
1122    /// Panics if the instruments cache mutex is poisoned.
1123    pub fn add_instrument(&mut self, instrument: InstrumentAny) {
1124        self.instruments_cache
1125            .lock()
1126            .unwrap()
1127            .insert(instrument.raw_symbol().inner(), instrument);
1128        self.cache_initialized = true;
1129    }
1130
1131    /// Requests the account state for the `account_id` from OKX.
1132    ///
1133    /// # Errors
1134    ///
1135    /// Returns an error if the HTTP request fails or no account state is returned.
1136    pub async fn request_account_state(
1137        &self,
1138        account_id: AccountId,
1139    ) -> anyhow::Result<AccountState> {
1140        let resp = self
1141            .inner
1142            .http_get_balance()
1143            .await
1144            .map_err(|e| anyhow::anyhow!(e))?;
1145
1146        let ts_init = self.generate_ts_init();
1147        let raw = resp
1148            .first()
1149            .ok_or_else(|| anyhow::anyhow!("No account state returned from OKX"))?;
1150        let account_state = parse_account_state(raw, account_id, ts_init)?;
1151
1152        Ok(account_state)
1153    }
1154
1155    /// Sets the position mode for the account.
1156    ///
1157    /// Defaults to NetMode if no position mode is provided.
1158    ///
1159    /// # Errors
1160    ///
1161    /// Returns an error if the HTTP request fails or the position mode cannot be set.
1162    ///
1163    /// # Note
1164    ///
1165    /// This endpoint only works for accounts with derivatives trading enabled.
1166    /// If the account only has spot trading, this will return an error.
1167    pub async fn set_position_mode(&self, position_mode: OKXPositionMode) -> anyhow::Result<()> {
1168        let mut params = SetPositionModeParamsBuilder::default();
1169        params.pos_mode(position_mode);
1170        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1171
1172        match self.inner.http_set_position_mode(params).await {
1173            Ok(_) => Ok(()),
1174            Err(e) => {
1175                if let OKXHttpError::OkxError {
1176                    error_code,
1177                    message,
1178                } = &e
1179                    && error_code == "50115"
1180                {
1181                    tracing::warn!(
1182                        "Account does not support position mode setting (derivatives trading not enabled): {message}"
1183                    );
1184                    return Ok(()); // Gracefully handle this case
1185                }
1186                anyhow::bail!(e)
1187            }
1188        }
1189    }
1190
1191    /// Requests all instruments for the `instrument_type` from OKX.
1192    ///
1193    /// # Errors
1194    ///
1195    /// Returns an error if the HTTP request fails or instrument parsing fails.
1196    pub async fn request_instruments(
1197        &self,
1198        instrument_type: OKXInstrumentType,
1199        instrument_family: Option<String>,
1200    ) -> anyhow::Result<Vec<InstrumentAny>> {
1201        let mut params = GetInstrumentsParamsBuilder::default();
1202        params.inst_type(instrument_type);
1203
1204        if let Some(family) = instrument_family.clone() {
1205            params.inst_family(family);
1206        }
1207
1208        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1209
1210        let resp = self
1211            .inner
1212            .http_get_instruments(params)
1213            .await
1214            .map_err(|e| anyhow::anyhow!(e))?;
1215
1216        let fee_rate_opt = {
1217            let fee_params = GetTradeFeeParams {
1218                inst_type: instrument_type,
1219                uly: None,
1220                inst_family: instrument_family,
1221            };
1222
1223            match self.inner.http_get_trade_fee(fee_params).await {
1224                Ok(rates) => rates.into_iter().next(),
1225                Err(OKXHttpError::MissingCredentials) => {
1226                    log::debug!("Missing credentials for fee rates, using None");
1227                    None
1228                }
1229                Err(e) => {
1230                    log::warn!("Failed to fetch fee rates for {instrument_type}: {e}");
1231                    None
1232                }
1233            }
1234        };
1235
1236        let ts_init = self.generate_ts_init();
1237
1238        let mut instruments: Vec<InstrumentAny> = Vec::new();
1239        for inst in &resp {
1240            // Determine which fee fields to use based on contract type
1241            let (maker_fee, taker_fee) = if let Some(ref fee_rate) = fee_rate_opt {
1242                let is_usdt_margined =
1243                    inst.ct_type == crate::common::enums::OKXContractType::Linear;
1244                let (maker_str, taker_str) = if is_usdt_margined {
1245                    (&fee_rate.maker_u, &fee_rate.taker_u)
1246                } else {
1247                    (&fee_rate.maker, &fee_rate.taker)
1248                };
1249
1250                let maker = if !maker_str.is_empty() {
1251                    Decimal::from_str(maker_str).ok()
1252                } else {
1253                    None
1254                };
1255                let taker = if !taker_str.is_empty() {
1256                    Decimal::from_str(taker_str).ok()
1257                } else {
1258                    None
1259                };
1260
1261                (maker, taker)
1262            } else {
1263                (None, None)
1264            };
1265
1266            match parse_instrument_any(inst, None, None, maker_fee, taker_fee, ts_init) {
1267                Ok(Some(instrument_any)) => {
1268                    instruments.push(instrument_any);
1269                }
1270                Ok(None) => {
1271                    // Unsupported instrument type, skip silently
1272                }
1273                Err(e) => {
1274                    log::warn!("Failed to parse instrument {}: {e}", inst.inst_id);
1275                }
1276            }
1277        }
1278
1279        Ok(instruments)
1280    }
1281
1282    /// Requests the latest mark price for the `instrument_type` from OKX.
1283    ///
1284    /// # Errors
1285    ///
1286    /// Returns an error if the HTTP request fails or no mark price is returned.
1287    pub async fn request_mark_price(
1288        &self,
1289        instrument_id: InstrumentId,
1290    ) -> anyhow::Result<MarkPriceUpdate> {
1291        let mut params = GetMarkPriceParamsBuilder::default();
1292        params.inst_id(instrument_id.symbol.inner());
1293        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1294
1295        let resp = self
1296            .inner
1297            .http_get_mark_price(params)
1298            .await
1299            .map_err(|e| anyhow::anyhow!(e))?;
1300
1301        let raw = resp
1302            .first()
1303            .ok_or_else(|| anyhow::anyhow!("No mark price returned from OKX"))?;
1304        let inst = self
1305            .instrument_or_fetch(instrument_id.symbol.inner())
1306            .await?;
1307        let ts_init = self.generate_ts_init();
1308
1309        let mark_price =
1310            parse_mark_price_update(raw, instrument_id, inst.price_precision(), ts_init)
1311                .map_err(|e| anyhow::anyhow!(e))?;
1312        Ok(mark_price)
1313    }
1314
1315    /// Requests the latest index price for the `instrument_id` from OKX.
1316    ///
1317    /// # Errors
1318    ///
1319    /// Returns an error if the HTTP request fails or no index price is returned.
1320    pub async fn request_index_price(
1321        &self,
1322        instrument_id: InstrumentId,
1323    ) -> anyhow::Result<IndexPriceUpdate> {
1324        let mut params = GetIndexTickerParamsBuilder::default();
1325        params.inst_id(instrument_id.symbol.inner());
1326        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1327
1328        let resp = self
1329            .inner
1330            .http_get_index_ticker(params)
1331            .await
1332            .map_err(|e| anyhow::anyhow!(e))?;
1333
1334        let raw = resp
1335            .first()
1336            .ok_or_else(|| anyhow::anyhow!("No index price returned from OKX"))?;
1337        let inst = self
1338            .instrument_or_fetch(instrument_id.symbol.inner())
1339            .await?;
1340        let ts_init = self.generate_ts_init();
1341
1342        let index_price =
1343            parse_index_price_update(raw, instrument_id, inst.price_precision(), ts_init)
1344                .map_err(|e| anyhow::anyhow!(e))?;
1345        Ok(index_price)
1346    }
1347
1348    /// Requests trades for the `instrument_id` and `start` -> `end` time range.
1349    ///
1350    /// # Errors
1351    ///
1352    /// Returns an error if the HTTP request fails or trade parsing fails.
1353    pub async fn request_trades(
1354        &self,
1355        instrument_id: InstrumentId,
1356        start: Option<DateTime<Utc>>,
1357        end: Option<DateTime<Utc>>,
1358        limit: Option<u32>,
1359    ) -> anyhow::Result<Vec<TradeTick>> {
1360        let mut params = GetTradesParamsBuilder::default();
1361
1362        params.inst_id(instrument_id.symbol.inner());
1363        if let Some(s) = start {
1364            params.after(s.timestamp_millis().to_string());
1365        }
1366        if let Some(e) = end {
1367            params.before(e.timestamp_millis().to_string());
1368        }
1369        // OKX expects the optional `limit` parameter to be between 1 and 100 (default 100).
1370        // The request layer uses 0 to express "no explicit limit", so we omit the field in that case
1371        // and clamp any larger value to the documented maximum to avoid 51000 parameter errors.
1372        const OKX_TRADES_MAX_LIMIT: u32 = 100;
1373        if let Some(l) = limit
1374            && l > 0
1375        {
1376            params.limit(l.min(OKX_TRADES_MAX_LIMIT));
1377        }
1378
1379        let params = params.build().map_err(anyhow::Error::new)?;
1380
1381        // Fetch raw trades
1382        let raw_trades = self
1383            .inner
1384            .http_get_trades(params)
1385            .await
1386            .map_err(anyhow::Error::new)?;
1387
1388        let ts_init = self.generate_ts_init();
1389        let inst = self
1390            .instrument_or_fetch(instrument_id.symbol.inner())
1391            .await?;
1392
1393        let mut trades = Vec::with_capacity(raw_trades.len());
1394        for raw in raw_trades {
1395            match parse_trade_tick(
1396                &raw,
1397                instrument_id,
1398                inst.price_precision(),
1399                inst.size_precision(),
1400                ts_init,
1401            ) {
1402                Ok(trade) => trades.push(trade),
1403                Err(e) => tracing::error!("{e}"),
1404            }
1405        }
1406
1407        Ok(trades)
1408    }
1409
1410    /// Requests historical bars for the given bar type and time range.
1411    ///
1412    /// The aggregation source must be `EXTERNAL`. Time range validation ensures start < end.
1413    /// Returns bars sorted oldest to newest.
1414    ///
1415    /// # Errors
1416    ///
1417    /// Returns an error if the request fails.
1418    ///
1419    /// # Endpoint Selection
1420    ///
1421    /// The OKX API has different endpoints with different limits:
1422    /// - Regular endpoint (`/api/v5/market/candles`): ≤ 300 rows/call, ≤ 40 req/2s
1423    ///   - Used when: start is None OR age ≤ 100 days
1424    /// - History endpoint (`/api/v5/market/history-candles`): ≤ 100 rows/call, ≤ 20 req/2s
1425    ///   - Used when: start is Some AND age > 100 days
1426    ///
1427    /// Age is calculated as `Utc::now() - start` at the time of the first request.
1428    ///
1429    /// # Supported Aggregations
1430    ///
1431    /// Maps to OKX bar query parameter:
1432    /// - `Second` → `{n}s`
1433    /// - `Minute` → `{n}m`
1434    /// - `Hour` → `{n}H`
1435    /// - `Day` → `{n}D`
1436    /// - `Week` → `{n}W`
1437    /// - `Month` → `{n}M`
1438    ///
1439    /// # Pagination
1440    ///
1441    /// - Uses `before` parameter for backwards pagination
1442    /// - Pages backwards from end time (or now) to start time
1443    /// - Stops when: limit reached, time window covered, or API returns empty
1444    /// - Rate limit safety: ≥ 50ms between requests
1445    ///
1446    /// # Panics
1447    ///
1448    /// May panic if internal data structures are in an unexpected state.
1449    ///
1450    /// # References
1451    ///
1452    /// - <https://tr.okx.com/docs-v5/en/#order-book-trading-market-data-get-candlesticks>
1453    /// - <https://tr.okx.com/docs-v5/en/#order-book-trading-market-data-get-candlesticks-history>
1454    pub async fn request_bars(
1455        &self,
1456        bar_type: BarType,
1457        start: Option<DateTime<Utc>>,
1458        mut end: Option<DateTime<Utc>>,
1459        limit: Option<u32>,
1460    ) -> anyhow::Result<Vec<Bar>> {
1461        const HISTORY_SPLIT_DAYS: i64 = 100;
1462        const MAX_PAGES_SOFT: usize = 500;
1463
1464        let limit = if limit == Some(0) { None } else { limit };
1465
1466        anyhow::ensure!(
1467            bar_type.aggregation_source() == AggregationSource::External,
1468            "Only EXTERNAL aggregation is supported"
1469        );
1470        if let (Some(s), Some(e)) = (start, end) {
1471            anyhow::ensure!(s < e, "Invalid time range: start={s:?} end={e:?}");
1472        }
1473
1474        let now = Utc::now();
1475        if let Some(s) = start
1476            && s > now
1477        {
1478            return Ok(Vec::new());
1479        }
1480        if let Some(e) = end
1481            && e > now
1482        {
1483            end = Some(now);
1484        }
1485
1486        let spec = bar_type.spec();
1487        let step = spec.step.get();
1488        let bar_param = match spec.aggregation {
1489            BarAggregation::Second => format!("{step}s"),
1490            BarAggregation::Minute => format!("{step}m"),
1491            BarAggregation::Hour => format!("{step}H"),
1492            BarAggregation::Day => format!("{step}D"),
1493            BarAggregation::Week => format!("{step}W"),
1494            BarAggregation::Month => format!("{step}M"),
1495            a => anyhow::bail!("OKX does not support {a:?} aggregation"),
1496        };
1497
1498        let slot_ms: i64 = match spec.aggregation {
1499            BarAggregation::Second => (step as i64) * 1_000,
1500            BarAggregation::Minute => (step as i64) * 60_000,
1501            BarAggregation::Hour => (step as i64) * 3_600_000,
1502            BarAggregation::Day => (step as i64) * 86_400_000,
1503            BarAggregation::Week => (step as i64) * 7 * 86_400_000,
1504            BarAggregation::Month => (step as i64) * 30 * 86_400_000,
1505            _ => unreachable!("Unsupported aggregation should have been caught above"),
1506        };
1507        let slot_ns: i64 = slot_ms * 1_000_000;
1508
1509        #[derive(Clone, Copy, Debug, PartialEq, Eq)]
1510        enum Mode {
1511            Latest,
1512            Backward,
1513            Range,
1514        }
1515
1516        let mode = match (start, end) {
1517            (None, None) => Mode::Latest,
1518            (Some(_), None) => Mode::Backward, // Changed: when only start is provided, work backward from now
1519            (None, Some(_)) => Mode::Backward,
1520            (Some(_), Some(_)) => Mode::Range,
1521        };
1522
1523        let start_ns = start.and_then(|s| s.timestamp_nanos_opt());
1524        let end_ns = end.and_then(|e| e.timestamp_nanos_opt());
1525
1526        // Floor start and ceiling end to bar boundaries for cleaner API requests
1527        let start_ms = start.map(|s| {
1528            let ms = s.timestamp_millis();
1529            if slot_ms > 0 {
1530                (ms / slot_ms) * slot_ms // Floor to nearest bar boundary
1531            } else {
1532                ms
1533            }
1534        });
1535        let end_ms = end.map(|e| {
1536            let ms = e.timestamp_millis();
1537            if slot_ms > 0 {
1538                ((ms + slot_ms - 1) / slot_ms) * slot_ms // Ceiling to nearest bar boundary
1539            } else {
1540                ms
1541            }
1542        });
1543        let now_ms = now.timestamp_millis();
1544
1545        let symbol = bar_type.instrument_id().symbol;
1546        let inst = self.instrument_or_fetch(symbol.inner()).await?;
1547
1548        let mut out: Vec<Bar> = Vec::new();
1549        let mut pages = 0usize;
1550
1551        // IMPORTANT: OKX API behavior:
1552        // - With 'after' parameter: returns bars with timestamp > after (going forward)
1553        // - With 'before' parameter: returns bars with timestamp < before (going backward)
1554        // For Range mode, we use 'before' starting from the end time to get bars before it
1555        let mut after_ms: Option<i64> = None;
1556        let mut before_ms: Option<i64> = match mode {
1557            Mode::Backward => end_ms.map(|v| v.saturating_sub(1)),
1558            Mode::Range => {
1559                // For Range, start from the end time (or current time if no end specified)
1560                // The API will return bars with timestamp < before_ms
1561                Some(end_ms.unwrap_or(now_ms))
1562            }
1563            Mode::Latest => None,
1564        };
1565
1566        // For Range mode, we'll paginate backwards like Backward mode
1567        let mut forward_prepend_mode = matches!(mode, Mode::Range);
1568
1569        // Adjust before_ms to ensure we get data from the API
1570        // OKX API might not have bars for the very recent past
1571        // This handles both explicit end=now and the actor layer setting end=now when it's None
1572        if matches!(mode, Mode::Backward | Mode::Range)
1573            && let Some(b) = before_ms
1574        {
1575            // OKX endpoints have different data availability windows:
1576            // - Regular endpoint: has most recent data but limited depth
1577            // - History endpoint: has deep history but lags behind current time
1578            // Use a small buffer to avoid the "dead zone"
1579            let buffer_ms = slot_ms.max(60_000); // At least 1 minute or 1 bar
1580            if b >= now_ms.saturating_sub(buffer_ms) {
1581                before_ms = Some(now_ms.saturating_sub(buffer_ms));
1582            }
1583        }
1584
1585        let mut have_latest_first_page = false;
1586        let mut progressless_loops = 0u8;
1587
1588        loop {
1589            if let Some(lim) = limit
1590                && lim > 0
1591                && out.len() >= lim as usize
1592            {
1593                break;
1594            }
1595            if pages >= MAX_PAGES_SOFT {
1596                break;
1597            }
1598
1599            let pivot_ms = if let Some(a) = after_ms {
1600                a
1601            } else if let Some(b) = before_ms {
1602                b
1603            } else {
1604                now_ms
1605            };
1606            // Choose endpoint based on how old the data is:
1607            // - Use regular endpoint for recent data (< 1 hour old)
1608            // - Use history endpoint for older data (> 1 hour old)
1609            // This avoids the "gap" where history endpoint has no recent data
1610            // and regular endpoint has limited depth
1611            let age_ms = now_ms.saturating_sub(pivot_ms);
1612            let age_hours = age_ms / (60 * 60 * 1000);
1613            let using_history = age_hours > 1; // Use history if data is > 1 hour old
1614
1615            let page_ceiling = if using_history { 100 } else { 300 };
1616            let remaining = limit
1617                .filter(|&l| l > 0) // Treat limit=0 as no limit
1618                .map_or(page_ceiling, |l| (l as usize).saturating_sub(out.len()));
1619            let page_cap = remaining.min(page_ceiling);
1620
1621            let mut p = GetCandlesticksParamsBuilder::default();
1622            p.inst_id(symbol.as_str())
1623                .bar(&bar_param)
1624                .limit(page_cap as u32);
1625
1626            // Track whether this planned request uses BEFORE or AFTER.
1627            let mut req_used_before = false;
1628
1629            match mode {
1630                Mode::Latest => {
1631                    if have_latest_first_page && let Some(b) = before_ms {
1632                        p.before_ms(b);
1633                        req_used_before = true;
1634                    }
1635                }
1636                Mode::Backward => {
1637                    if let Some(b) = before_ms {
1638                        p.before_ms(b);
1639                        req_used_before = true;
1640                    }
1641                }
1642                Mode::Range => {
1643                    // For first request with regular endpoint, try without parameters
1644                    // to get the most recent bars, then filter
1645                    if pages == 0 && !using_history {
1646                        // Don't set any time parameters on first request
1647                        // This gets the most recent bars available
1648                    } else if forward_prepend_mode {
1649                        if let Some(b) = before_ms {
1650                            p.before_ms(b);
1651                            req_used_before = true;
1652                        }
1653                    } else if let Some(a) = after_ms {
1654                        p.after_ms(a);
1655                    }
1656                }
1657            }
1658
1659            let params = p.build().map_err(anyhow::Error::new)?;
1660
1661            let mut raw = if using_history {
1662                self.inner
1663                    .http_get_candlesticks_history(params.clone())
1664                    .await
1665                    .map_err(anyhow::Error::new)?
1666            } else {
1667                self.inner
1668                    .http_get_candlesticks(params.clone())
1669                    .await
1670                    .map_err(anyhow::Error::new)?
1671            };
1672
1673            // --- Fallbacks on empty page ---
1674            if raw.is_empty() {
1675                // LATEST: retry same cursor via history, then step back a page-interval before giving up
1676                if matches!(mode, Mode::Latest)
1677                    && have_latest_first_page
1678                    && !using_history
1679                    && let Some(b) = before_ms
1680                {
1681                    let mut p2 = GetCandlesticksParamsBuilder::default();
1682                    p2.inst_id(symbol.as_str())
1683                        .bar(&bar_param)
1684                        .limit(page_cap as u32);
1685                    p2.before_ms(b);
1686                    let params2 = p2.build().map_err(anyhow::Error::new)?;
1687                    let raw2 = self
1688                        .inner
1689                        .http_get_candlesticks_history(params2)
1690                        .await
1691                        .map_err(anyhow::Error::new)?;
1692                    if !raw2.is_empty() {
1693                        raw = raw2;
1694                    } else {
1695                        // Step back one page interval and retry loop
1696                        let jump = (page_cap as i64).saturating_mul(slot_ms.max(1));
1697                        before_ms = Some(b.saturating_sub(jump));
1698                        progressless_loops = progressless_loops.saturating_add(1);
1699                        if progressless_loops >= 3 {
1700                            break;
1701                        }
1702                        continue;
1703                    }
1704                }
1705
1706                // Range mode doesn't need special bootstrap - it uses the normal flow with before_ms set
1707
1708                // If still empty: for Range after first page, try a single backstep window using BEFORE
1709                if raw.is_empty() && matches!(mode, Mode::Range) && pages > 0 {
1710                    let backstep_ms = (page_cap as i64).saturating_mul(slot_ms.max(1));
1711                    let pivot_back = after_ms.unwrap_or(now_ms).saturating_sub(backstep_ms);
1712
1713                    let mut p2 = GetCandlesticksParamsBuilder::default();
1714                    p2.inst_id(symbol.as_str())
1715                        .bar(&bar_param)
1716                        .limit(page_cap as u32)
1717                        .before_ms(pivot_back);
1718                    let params2 = p2.build().map_err(anyhow::Error::new)?;
1719                    let raw2 = if (now_ms.saturating_sub(pivot_back)) / (24 * 60 * 60 * 1000)
1720                        > HISTORY_SPLIT_DAYS
1721                    {
1722                        self.inner.http_get_candlesticks_history(params2).await
1723                    } else {
1724                        self.inner.http_get_candlesticks(params2).await
1725                    }
1726                    .map_err(anyhow::Error::new)?;
1727                    if raw2.is_empty() {
1728                        break;
1729                    } else {
1730                        raw = raw2;
1731                        forward_prepend_mode = true;
1732                        req_used_before = true;
1733                    }
1734                }
1735
1736                // First LATEST page empty: jump back >100d to force history, then continue loop
1737                if raw.is_empty()
1738                    && matches!(mode, Mode::Latest)
1739                    && !have_latest_first_page
1740                    && !using_history
1741                {
1742                    let jump_days_ms = (HISTORY_SPLIT_DAYS + 1) * 86_400_000;
1743                    before_ms = Some(now_ms.saturating_sub(jump_days_ms));
1744                    have_latest_first_page = true;
1745                    continue;
1746                }
1747
1748                // Still empty for any other case? Just break.
1749                if raw.is_empty() {
1750                    break;
1751                }
1752            }
1753            // --- end fallbacks ---
1754
1755            pages += 1;
1756
1757            // Parse, oldest → newest
1758            let ts_init = self.generate_ts_init();
1759            let mut page: Vec<Bar> = Vec::with_capacity(raw.len());
1760            for r in &raw {
1761                page.push(parse_candlestick(
1762                    r,
1763                    bar_type,
1764                    inst.price_precision(),
1765                    inst.size_precision(),
1766                    ts_init,
1767                )?);
1768            }
1769            page.reverse();
1770
1771            let page_oldest_ms = page.first().map(|b| b.ts_event.as_i64() / 1_000_000);
1772            let page_newest_ms = page.last().map(|b| b.ts_event.as_i64() / 1_000_000);
1773
1774            // Range filter (inclusive)
1775            // For Range mode, if we have no bars yet and this is an early page,
1776            // be more tolerant with the start boundary to handle gaps in data
1777            let mut filtered: Vec<Bar> = if matches!(mode, Mode::Range)
1778                && out.is_empty()
1779                && pages < 2
1780            {
1781                // On first pages of Range mode with no data yet, include the most recent bar
1782                // even if it's slightly before our start time (within 2 bar periods)
1783                // BUT we want ALL bars in the page that are within our range
1784                let tolerance_ns = slot_ns * 2; // Allow up to 2 bar periods before start
1785
1786                // Debug: log the page range
1787                if !page.is_empty() {
1788                    tracing::debug!(
1789                        "Range mode bootstrap page: {} bars from {} to {}, filtering with start={:?} end={:?}",
1790                        page.len(),
1791                        page.first().unwrap().ts_event.as_i64() / 1_000_000,
1792                        page.last().unwrap().ts_event.as_i64() / 1_000_000,
1793                        start_ms,
1794                        end_ms
1795                    );
1796                }
1797
1798                let result: Vec<Bar> = page
1799                    .clone()
1800                    .into_iter()
1801                    .filter(|b| {
1802                        let ts = b.ts_event.as_i64();
1803                        // Accept bars from (start - tolerance) to end
1804                        let ok_after =
1805                            start_ns.is_none_or(|sns| ts >= sns.saturating_sub(tolerance_ns));
1806                        let ok_before = end_ns.is_none_or(|ens| ts <= ens);
1807                        ok_after && ok_before
1808                    })
1809                    .collect();
1810
1811                result
1812            } else {
1813                // Normal filtering
1814                page.clone()
1815                    .into_iter()
1816                    .filter(|b| {
1817                        let ts = b.ts_event.as_i64();
1818                        let ok_after = start_ns.is_none_or(|sns| ts >= sns);
1819                        let ok_before = end_ns.is_none_or(|ens| ts <= ens);
1820                        ok_after && ok_before
1821                    })
1822                    .collect()
1823            };
1824
1825            if !page.is_empty() && filtered.is_empty() {
1826                // For Range mode, if all bars are before our start time, there's no point continuing
1827                if matches!(mode, Mode::Range)
1828                    && !forward_prepend_mode
1829                    && let (Some(newest_ms), Some(start_ms)) = (page_newest_ms, start_ms)
1830                    && newest_ms < start_ms.saturating_sub(slot_ms * 2)
1831                {
1832                    // Bars are too old (more than 2 bar periods before start), stop
1833                    break;
1834                }
1835            }
1836
1837            // Track contribution for progress guard
1838            let contribution;
1839
1840            if out.is_empty() {
1841                contribution = filtered.len();
1842                out = filtered;
1843            } else {
1844                match mode {
1845                    Mode::Backward | Mode::Latest => {
1846                        if let Some(first) = out.first() {
1847                            filtered.retain(|b| b.ts_event < first.ts_event);
1848                        }
1849                        contribution = filtered.len();
1850                        if contribution != 0 {
1851                            let mut new_out = Vec::with_capacity(out.len() + filtered.len());
1852                            new_out.extend_from_slice(&filtered);
1853                            new_out.extend_from_slice(&out);
1854                            out = new_out;
1855                        }
1856                    }
1857                    Mode::Range => {
1858                        if forward_prepend_mode || req_used_before {
1859                            // We are backfilling older pages: prepend them.
1860                            if let Some(first) = out.first() {
1861                                filtered.retain(|b| b.ts_event < first.ts_event);
1862                            }
1863                            contribution = filtered.len();
1864                            if contribution != 0 {
1865                                let mut new_out = Vec::with_capacity(out.len() + filtered.len());
1866                                new_out.extend_from_slice(&filtered);
1867                                new_out.extend_from_slice(&out);
1868                                out = new_out;
1869                            }
1870                        } else {
1871                            // Normal forward: append newer pages.
1872                            if let Some(last) = out.last() {
1873                                filtered.retain(|b| b.ts_event > last.ts_event);
1874                            }
1875                            contribution = filtered.len();
1876                            out.extend(filtered);
1877                        }
1878                    }
1879                }
1880            }
1881
1882            // Duplicate-window mitigation for Latest/Backward
1883            if contribution == 0
1884                && matches!(mode, Mode::Latest | Mode::Backward)
1885                && let Some(b) = before_ms
1886            {
1887                let jump = (page_cap as i64).saturating_mul(slot_ms.max(1));
1888                let new_b = b.saturating_sub(jump);
1889                if new_b != b {
1890                    before_ms = Some(new_b);
1891                }
1892            }
1893
1894            if contribution == 0 {
1895                progressless_loops = progressless_loops.saturating_add(1);
1896                if progressless_loops >= 3 {
1897                    break;
1898                }
1899            } else {
1900                progressless_loops = 0;
1901
1902                // Advance cursors only when we made progress
1903                match mode {
1904                    Mode::Latest | Mode::Backward => {
1905                        if let Some(oldest) = page_oldest_ms {
1906                            before_ms = Some(oldest.saturating_sub(1));
1907                            have_latest_first_page = true;
1908                        } else {
1909                            break;
1910                        }
1911                    }
1912                    Mode::Range => {
1913                        if forward_prepend_mode || req_used_before {
1914                            if let Some(oldest) = page_oldest_ms {
1915                                // Move back by at least one bar period to avoid getting the same data
1916                                let jump_back = slot_ms.max(60_000); // At least 1 minute
1917                                before_ms = Some(oldest.saturating_sub(jump_back));
1918                                after_ms = None;
1919                            } else {
1920                                break;
1921                            }
1922                        } else if let Some(newest) = page_newest_ms {
1923                            after_ms = Some(newest.saturating_add(1));
1924                            before_ms = None;
1925                        } else {
1926                            break;
1927                        }
1928                    }
1929                }
1930            }
1931
1932            // Stop conditions
1933            if let Some(lim) = limit
1934                && lim > 0
1935                && out.len() >= lim as usize
1936            {
1937                break;
1938            }
1939            if let Some(ens) = end_ns
1940                && let Some(last) = out.last()
1941                && last.ts_event.as_i64() >= ens
1942            {
1943                break;
1944            }
1945            if let Some(sns) = start_ns
1946                && let Some(first) = out.first()
1947                && (matches!(mode, Mode::Backward) || forward_prepend_mode)
1948                && first.ts_event.as_i64() <= sns
1949            {
1950                // For Range mode, check if we have all bars up to the end time
1951                if matches!(mode, Mode::Range) {
1952                    // Don't stop if we haven't reached the end time yet
1953                    if let Some(ens) = end_ns
1954                        && let Some(last) = out.last()
1955                    {
1956                        let last_ts = last.ts_event.as_i64();
1957                        if last_ts < ens {
1958                            // We have bars before start but haven't reached end, need to continue forward
1959                            // Switch from backward to forward pagination
1960                            forward_prepend_mode = false;
1961                            after_ms = Some((last_ts / 1_000_000).saturating_add(1));
1962                            before_ms = None;
1963                            continue;
1964                        }
1965                    }
1966                }
1967                break;
1968            }
1969
1970            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1971        }
1972
1973        // Final rescue for FORWARD/RANGE when nothing gathered
1974        if out.is_empty() && matches!(mode, Mode::Range) {
1975            let pivot = end_ms.unwrap_or(now_ms.saturating_sub(1));
1976            let hist = (now_ms.saturating_sub(pivot)) / (24 * 60 * 60 * 1000) > HISTORY_SPLIT_DAYS;
1977            let mut p = GetCandlesticksParamsBuilder::default();
1978            p.inst_id(symbol.as_str())
1979                .bar(&bar_param)
1980                .limit(300)
1981                .before_ms(pivot);
1982            let params = p.build().map_err(anyhow::Error::new)?;
1983            let raw = if hist {
1984                self.inner.http_get_candlesticks_history(params).await
1985            } else {
1986                self.inner.http_get_candlesticks(params).await
1987            }
1988            .map_err(anyhow::Error::new)?;
1989            if !raw.is_empty() {
1990                let ts_init = self.generate_ts_init();
1991                let mut page: Vec<Bar> = Vec::with_capacity(raw.len());
1992                for r in &raw {
1993                    page.push(parse_candlestick(
1994                        r,
1995                        bar_type,
1996                        inst.price_precision(),
1997                        inst.size_precision(),
1998                        ts_init,
1999                    )?);
2000                }
2001                page.reverse();
2002                out = page
2003                    .into_iter()
2004                    .filter(|b| {
2005                        let ts = b.ts_event.as_i64();
2006                        let ok_after = start_ns.is_none_or(|sns| ts >= sns);
2007                        let ok_before = end_ns.is_none_or(|ens| ts <= ens);
2008                        ok_after && ok_before
2009                    })
2010                    .collect();
2011            }
2012        }
2013
2014        // Trim against end bound if needed (keep ≤ end)
2015        if let Some(ens) = end_ns {
2016            while out.last().is_some_and(|b| b.ts_event.as_i64() > ens) {
2017                out.pop();
2018            }
2019        }
2020
2021        // Clamp first bar for Range when using forward pagination
2022        if matches!(mode, Mode::Range)
2023            && !forward_prepend_mode
2024            && let Some(sns) = start_ns
2025        {
2026            let lower = sns.saturating_sub(slot_ns);
2027            while out.first().is_some_and(|b| b.ts_event.as_i64() < lower) {
2028                out.remove(0);
2029            }
2030        }
2031
2032        if let Some(lim) = limit
2033            && lim > 0
2034            && out.len() > lim as usize
2035        {
2036            out.truncate(lim as usize);
2037        }
2038
2039        Ok(out)
2040    }
2041
2042    /// Requests historical order status reports for the given parameters.
2043    ///
2044    /// # Errors
2045    ///
2046    /// Returns an error if the request fails.
2047    ///
2048    /// # References
2049    ///
2050    /// - <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-order-history-last-7-days>.
2051    /// - <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-order-history-last-3-months>.
2052    #[allow(clippy::too_many_arguments)]
2053    pub async fn request_order_status_reports(
2054        &self,
2055        account_id: AccountId,
2056        instrument_type: Option<OKXInstrumentType>,
2057        instrument_id: Option<InstrumentId>,
2058        start: Option<DateTime<Utc>>,
2059        end: Option<DateTime<Utc>>,
2060        open_only: bool,
2061        limit: Option<u32>,
2062    ) -> anyhow::Result<Vec<OrderStatusReport>> {
2063        let mut history_params = GetOrderHistoryParamsBuilder::default();
2064
2065        let instrument_type = if let Some(instrument_type) = instrument_type {
2066            instrument_type
2067        } else {
2068            let instrument_id = instrument_id.ok_or_else(|| {
2069                anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
2070            })?;
2071            let instrument = self
2072                .instrument_or_fetch(instrument_id.symbol.inner())
2073                .await?;
2074            okx_instrument_type(&instrument)?
2075        };
2076
2077        history_params.inst_type(instrument_type);
2078
2079        if let Some(instrument_id) = instrument_id.as_ref() {
2080            history_params.inst_id(instrument_id.symbol.inner().to_string());
2081        }
2082
2083        if let Some(limit) = limit {
2084            history_params.limit(limit);
2085        }
2086
2087        let history_params = history_params.build().map_err(|e| anyhow::anyhow!(e))?;
2088
2089        let mut pending_params = GetOrderListParamsBuilder::default();
2090        pending_params.inst_type(instrument_type);
2091
2092        if let Some(instrument_id) = instrument_id.as_ref() {
2093            pending_params.inst_id(instrument_id.symbol.inner().to_string());
2094        }
2095
2096        if let Some(limit) = limit {
2097            pending_params.limit(limit);
2098        }
2099
2100        let pending_params = pending_params.build().map_err(|e| anyhow::anyhow!(e))?;
2101
2102        let combined_resp = if open_only {
2103            // Only request pending/open orders
2104            self.inner
2105                .http_get_order_list(pending_params)
2106                .await
2107                .map_err(|e| anyhow::anyhow!(e))?
2108        } else {
2109            // Make both requests concurrently
2110            let (history_resp, pending_resp) = tokio::try_join!(
2111                self.inner.http_get_order_history(history_params),
2112                self.inner.http_get_order_list(pending_params)
2113            )
2114            .map_err(|e| anyhow::anyhow!(e))?;
2115
2116            // Combine both responses
2117            let mut combined_resp = history_resp;
2118            combined_resp.extend(pending_resp);
2119            combined_resp
2120        };
2121
2122        // Prepare time range filter
2123        let start_ns = start.map(UnixNanos::from);
2124        let end_ns = end.map(UnixNanos::from);
2125
2126        let ts_init = self.generate_ts_init();
2127        let mut reports = Vec::with_capacity(combined_resp.len());
2128
2129        // Use a seen filter in case pending orders are within the histories "2hr reserve window"
2130        let mut seen: AHashSet<String> = AHashSet::new();
2131
2132        for order in combined_resp {
2133            let seen_key = if !order.cl_ord_id.is_empty() {
2134                order.cl_ord_id.as_str().to_string()
2135            } else if let Some(algo_cl_ord_id) = order
2136                .algo_cl_ord_id
2137                .as_ref()
2138                .filter(|value| !value.as_str().is_empty())
2139            {
2140                algo_cl_ord_id.as_str().to_string()
2141            } else if let Some(algo_id) = order
2142                .algo_id
2143                .as_ref()
2144                .filter(|value| !value.as_str().is_empty())
2145            {
2146                algo_id.as_str().to_string()
2147            } else {
2148                order.ord_id.as_str().to_string()
2149            };
2150
2151            if !seen.insert(seen_key) {
2152                continue; // Reserved pending already reported
2153            }
2154
2155            let inst = self.instrument_or_fetch(order.inst_id).await?;
2156
2157            let report = parse_order_status_report(
2158                &order,
2159                account_id,
2160                inst.id(),
2161                inst.price_precision(),
2162                inst.size_precision(),
2163                ts_init,
2164            );
2165
2166            if let Some(start_ns) = start_ns
2167                && report.ts_last < start_ns
2168            {
2169                continue;
2170            }
2171            if let Some(end_ns) = end_ns
2172                && report.ts_last > end_ns
2173            {
2174                continue;
2175            }
2176
2177            reports.push(report);
2178        }
2179
2180        Ok(reports)
2181    }
2182
2183    /// Requests fill reports (transaction details) for the given parameters.
2184    ///
2185    /// # Errors
2186    ///
2187    /// Returns an error if the request fails.
2188    ///
2189    /// # References
2190    ///
2191    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-transaction-details-last-3-days>.
2192    pub async fn request_fill_reports(
2193        &self,
2194        account_id: AccountId,
2195        instrument_type: Option<OKXInstrumentType>,
2196        instrument_id: Option<InstrumentId>,
2197        start: Option<DateTime<Utc>>,
2198        end: Option<DateTime<Utc>>,
2199        limit: Option<u32>,
2200    ) -> anyhow::Result<Vec<FillReport>> {
2201        let mut params = GetTransactionDetailsParamsBuilder::default();
2202
2203        let instrument_type = if let Some(instrument_type) = instrument_type {
2204            instrument_type
2205        } else {
2206            let instrument_id = instrument_id.ok_or_else(|| {
2207                anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
2208            })?;
2209            let instrument = self
2210                .instrument_or_fetch(instrument_id.symbol.inner())
2211                .await?;
2212            okx_instrument_type(&instrument)?
2213        };
2214
2215        params.inst_type(instrument_type);
2216
2217        if let Some(instrument_id) = instrument_id {
2218            let instrument = self
2219                .instrument_or_fetch(instrument_id.symbol.inner())
2220                .await?;
2221            let instrument_type = okx_instrument_type(&instrument)?;
2222            params.inst_type(instrument_type);
2223            params.inst_id(instrument_id.symbol.inner().to_string());
2224        }
2225
2226        if let Some(limit) = limit {
2227            params.limit(limit);
2228        }
2229
2230        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2231
2232        let resp = self
2233            .inner
2234            .http_get_transaction_details(params)
2235            .await
2236            .map_err(|e| anyhow::anyhow!(e))?;
2237
2238        // Prepare time range filter
2239        let start_ns = start.map(UnixNanos::from);
2240        let end_ns = end.map(UnixNanos::from);
2241
2242        let ts_init = self.generate_ts_init();
2243        let mut reports = Vec::with_capacity(resp.len());
2244
2245        for detail in resp {
2246            let inst = self.instrument_or_fetch(detail.inst_id).await?;
2247
2248            let report = parse_fill_report(
2249                detail,
2250                account_id,
2251                inst.id(),
2252                inst.price_precision(),
2253                inst.size_precision(),
2254                ts_init,
2255            )?;
2256
2257            if let Some(start_ns) = start_ns
2258                && report.ts_event < start_ns
2259            {
2260                continue;
2261            }
2262
2263            if let Some(end_ns) = end_ns
2264                && report.ts_event > end_ns
2265            {
2266                continue;
2267            }
2268
2269            reports.push(report);
2270        }
2271
2272        Ok(reports)
2273    }
2274
2275    /// Requests current position status reports for the given parameters.
2276    ///
2277    /// # Position Modes
2278    ///
2279    /// OKX supports two position modes, which affects how position data is returned:
2280    ///
2281    /// ## Net Mode (One-way)
2282    /// - `posSide` field will be `"net"`
2283    /// - `pos` field uses **signed quantities**:
2284    ///   - Positive value = Long position
2285    ///   - Negative value = Short position
2286    ///   - Zero = Flat/no position
2287    ///
2288    /// ## Long/Short Mode (Hedge/Dual-side)
2289    /// - `posSide` field will be `"long"` or `"short"`
2290    /// - `pos` field is **always positive** (use `posSide` to determine actual side)
2291    /// - Allows holding simultaneous long and short positions on the same instrument
2292    /// - Position IDs are suffixed with `-LONG` or `-SHORT` for uniqueness
2293    ///
2294    /// # Errors
2295    ///
2296    /// Returns an error if the request fails.
2297    ///
2298    /// # References
2299    ///
2300    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-positions>
2301    pub async fn request_position_status_reports(
2302        &self,
2303        account_id: AccountId,
2304        instrument_type: Option<OKXInstrumentType>,
2305        instrument_id: Option<InstrumentId>,
2306    ) -> anyhow::Result<Vec<PositionStatusReport>> {
2307        let mut params = GetPositionsParamsBuilder::default();
2308
2309        let instrument_type = if let Some(instrument_type) = instrument_type {
2310            instrument_type
2311        } else {
2312            let instrument_id = instrument_id.ok_or_else(|| {
2313                anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
2314            })?;
2315            let instrument = self
2316                .instrument_or_fetch(instrument_id.symbol.inner())
2317                .await?;
2318            okx_instrument_type(&instrument)?
2319        };
2320
2321        params.inst_type(instrument_type);
2322
2323        instrument_id
2324            .as_ref()
2325            .map(|i| params.inst_id(i.symbol.inner()));
2326
2327        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2328
2329        let resp = self
2330            .inner
2331            .http_get_positions(params)
2332            .await
2333            .map_err(|e| anyhow::anyhow!(e))?;
2334
2335        let ts_init = self.generate_ts_init();
2336        let mut reports = Vec::with_capacity(resp.len());
2337
2338        for position in resp {
2339            let inst = self.instrument_or_fetch(position.inst_id).await?;
2340
2341            let report = parse_position_status_report(
2342                position,
2343                account_id,
2344                inst.id(),
2345                inst.size_precision(),
2346                ts_init,
2347            )?;
2348            reports.push(report);
2349        }
2350
2351        Ok(reports)
2352    }
2353
2354    /// Places an algo order via HTTP.
2355    ///
2356    /// # Errors
2357    ///
2358    /// Returns an error if the request fails.
2359    ///
2360    /// # References
2361    ///
2362    /// <https://www.okx.com/docs-v5/en/#order-book-trading-algo-trading-post-place-algo-order>
2363    pub async fn place_algo_order(
2364        &self,
2365        request: OKXPlaceAlgoOrderRequest,
2366    ) -> Result<OKXPlaceAlgoOrderResponse, OKXHttpError> {
2367        let body =
2368            serde_json::to_vec(&request).map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
2369
2370        let resp: Vec<OKXPlaceAlgoOrderResponse> = self
2371            .inner
2372            .send_request(Method::POST, "/api/v5/trade/order-algo", Some(body), true)
2373            .await?;
2374
2375        resp.into_iter()
2376            .next()
2377            .ok_or_else(|| OKXHttpError::ValidationError("Empty response".to_string()))
2378    }
2379
2380    /// Cancels an algo order via HTTP.
2381    ///
2382    /// # Errors
2383    ///
2384    /// Returns an error if the request fails.
2385    ///
2386    /// # References
2387    ///
2388    /// <https://www.okx.com/docs-v5/en/#order-book-trading-algo-trading-post-cancel-algo-order>
2389    pub async fn cancel_algo_order(
2390        &self,
2391        request: OKXCancelAlgoOrderRequest,
2392    ) -> Result<OKXCancelAlgoOrderResponse, OKXHttpError> {
2393        // OKX expects an array for cancel-algos endpoint
2394        // Serialize once to bytes to keep signing and sending identical
2395        let body =
2396            serde_json::to_vec(&[request]).map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
2397
2398        let resp: Vec<OKXCancelAlgoOrderResponse> = self
2399            .inner
2400            .send_request(Method::POST, "/api/v5/trade/cancel-algos", Some(body), true)
2401            .await?;
2402
2403        resp.into_iter()
2404            .next()
2405            .ok_or_else(|| OKXHttpError::ValidationError("Empty response".to_string()))
2406    }
2407
2408    /// Places an algo order using domain types.
2409    ///
2410    /// This is a convenience method that accepts Nautilus domain types
2411    /// and builds the appropriate OKX request structure internally.
2412    ///
2413    /// # Errors
2414    ///
2415    /// Returns an error if the request fails.
2416    #[allow(clippy::too_many_arguments)]
2417    pub async fn place_algo_order_with_domain_types(
2418        &self,
2419        instrument_id: InstrumentId,
2420        td_mode: OKXTradeMode,
2421        client_order_id: ClientOrderId,
2422        order_side: OrderSide,
2423        order_type: OrderType,
2424        quantity: Quantity,
2425        trigger_price: Price,
2426        trigger_type: Option<TriggerType>,
2427        limit_price: Option<Price>,
2428        reduce_only: Option<bool>,
2429    ) -> Result<OKXPlaceAlgoOrderResponse, OKXHttpError> {
2430        if !matches!(order_side, OrderSide::Buy | OrderSide::Sell) {
2431            return Err(OKXHttpError::ValidationError(
2432                "Invalid order side".to_string(),
2433            ));
2434        }
2435        let okx_side: OKXSide = order_side.into();
2436
2437        // Map trigger type to OKX format
2438        let trigger_px_type_enum = trigger_type.map_or(OKXTriggerType::Last, Into::into);
2439
2440        // Determine order price based on order type
2441        let order_px = if matches!(order_type, OrderType::StopLimit | OrderType::LimitIfTouched) {
2442            limit_price.map(|p| p.to_string())
2443        } else {
2444            // Market orders use -1 to indicate market execution
2445            Some("-1".to_string())
2446        };
2447
2448        let request = OKXPlaceAlgoOrderRequest {
2449            inst_id: instrument_id.symbol.as_str().to_string(),
2450            td_mode,
2451            side: okx_side,
2452            ord_type: OKXAlgoOrderType::Trigger, // All conditional orders use 'trigger' type
2453            sz: quantity.to_string(),
2454            algo_cl_ord_id: Some(client_order_id.as_str().to_string()),
2455            trigger_px: Some(trigger_price.to_string()),
2456            order_px,
2457            trigger_px_type: Some(trigger_px_type_enum),
2458            tgt_ccy: None,  // Let OKX determine based on instrument
2459            pos_side: None, // Use default position side
2460            close_position: None,
2461            tag: Some(OKX_NAUTILUS_BROKER_ID.to_string()),
2462            reduce_only,
2463        };
2464
2465        self.place_algo_order(request).await
2466    }
2467
2468    /// Cancels an algo order using domain types.
2469    ///
2470    /// This is a convenience method that accepts Nautilus domain types
2471    /// and builds the appropriate OKX request structure internally.
2472    ///
2473    /// # Errors
2474    ///
2475    /// Returns an error if the request fails.
2476    pub async fn cancel_algo_order_with_domain_types(
2477        &self,
2478        instrument_id: InstrumentId,
2479        algo_id: String,
2480    ) -> Result<OKXCancelAlgoOrderResponse, OKXHttpError> {
2481        let request = OKXCancelAlgoOrderRequest {
2482            inst_id: instrument_id.symbol.to_string(),
2483            algo_id: Some(algo_id),
2484            algo_cl_ord_id: None,
2485        };
2486
2487        self.cancel_algo_order(request).await
2488    }
2489
2490    /// Requests algo order status reports.
2491    ///
2492    /// # Errors
2493    ///
2494    /// Returns an error if the request fails.
2495    #[allow(clippy::too_many_arguments)]
2496    pub async fn request_algo_order_status_reports(
2497        &self,
2498        account_id: AccountId,
2499        instrument_type: Option<OKXInstrumentType>,
2500        instrument_id: Option<InstrumentId>,
2501        algo_id: Option<String>,
2502        algo_client_order_id: Option<ClientOrderId>,
2503        state: Option<OKXOrderStatus>,
2504        limit: Option<u32>,
2505    ) -> anyhow::Result<Vec<OrderStatusReport>> {
2506        let mut instruments_cache: AHashMap<Ustr, InstrumentAny> = AHashMap::new();
2507
2508        let inst_type = if let Some(inst_type) = instrument_type {
2509            inst_type
2510        } else if let Some(inst_id) = instrument_id {
2511            let instrument = self.instrument_or_fetch(inst_id.symbol.inner()).await?;
2512            let inst_type = okx_instrument_type(&instrument)?;
2513            instruments_cache.insert(inst_id.symbol.inner(), instrument);
2514            inst_type
2515        } else {
2516            anyhow::bail!("instrument_type or instrument_id required for algo order query")
2517        };
2518
2519        let mut params_builder = GetAlgoOrdersParamsBuilder::default();
2520        params_builder.inst_type(inst_type);
2521        if let Some(inst_id) = instrument_id {
2522            params_builder.inst_id(inst_id.symbol.inner().to_string());
2523        }
2524        if let Some(algo_id) = algo_id.as_ref() {
2525            params_builder.algo_id(algo_id.clone());
2526        }
2527        if let Some(client_order_id) = algo_client_order_id.as_ref() {
2528            params_builder.algo_cl_ord_id(client_order_id.as_str().to_string());
2529        }
2530        if let Some(state) = state {
2531            params_builder.state(state);
2532        }
2533        if let Some(limit) = limit {
2534            params_builder.limit(limit);
2535        }
2536
2537        let params = params_builder
2538            .build()
2539            .map_err(|e| anyhow::anyhow!(format!("Failed to build algo order params: {e}")))?;
2540
2541        let ts_init = self.generate_ts_init();
2542        let mut reports = Vec::new();
2543        let mut seen: AHashSet<(String, String)> = AHashSet::new();
2544
2545        let pending = match self.inner.http_get_order_algo_pending(params.clone()).await {
2546            Ok(result) => result,
2547            Err(OKXHttpError::UnexpectedStatus { status, .. })
2548                if status == StatusCode::NOT_FOUND =>
2549            {
2550                Vec::new()
2551            }
2552            Err(e) => return Err(e.into()),
2553        };
2554        self.collect_algo_reports(
2555            account_id,
2556            &pending,
2557            &mut instruments_cache,
2558            ts_init,
2559            &mut seen,
2560            &mut reports,
2561        )
2562        .await?;
2563
2564        let history = match self.inner.http_get_order_algo_history(params).await {
2565            Ok(result) => result,
2566            Err(OKXHttpError::UnexpectedStatus { status, .. })
2567                if status == StatusCode::NOT_FOUND =>
2568            {
2569                Vec::new()
2570            }
2571            Err(e) => return Err(e.into()),
2572        };
2573        self.collect_algo_reports(
2574            account_id,
2575            &history,
2576            &mut instruments_cache,
2577            ts_init,
2578            &mut seen,
2579            &mut reports,
2580        )
2581        .await?;
2582
2583        Ok(reports)
2584    }
2585
2586    /// Requests an algo order status report by client order identifier.
2587    ///
2588    /// # Errors
2589    ///
2590    /// Returns an error if the request fails.
2591    pub async fn request_algo_order_status_report(
2592        &self,
2593        account_id: AccountId,
2594        instrument_id: InstrumentId,
2595        algo_client_order_id: ClientOrderId,
2596    ) -> anyhow::Result<Option<OrderStatusReport>> {
2597        let reports = self
2598            .request_algo_order_status_reports(
2599                account_id,
2600                None,
2601                Some(instrument_id),
2602                None,
2603                Some(algo_client_order_id),
2604                None,
2605                Some(50_u32),
2606            )
2607            .await?;
2608
2609        Ok(reports.into_iter().next())
2610    }
2611    async fn collect_algo_reports(
2612        &self,
2613        account_id: AccountId,
2614        orders: &[OKXOrderAlgo],
2615        instruments_cache: &mut AHashMap<Ustr, InstrumentAny>,
2616        ts_init: UnixNanos,
2617        seen: &mut AHashSet<(String, String)>,
2618        reports: &mut Vec<OrderStatusReport>,
2619    ) -> anyhow::Result<()> {
2620        for order in orders {
2621            let key = (order.algo_id.clone(), order.algo_cl_ord_id.clone());
2622            if !seen.insert(key) {
2623                continue;
2624            }
2625
2626            let instrument = if let Some(instrument) = instruments_cache.get(&order.inst_id) {
2627                instrument.clone()
2628            } else {
2629                let instrument = self.instrument_or_fetch(order.inst_id).await?;
2630                instruments_cache.insert(order.inst_id, instrument.clone());
2631                instrument
2632            };
2633
2634            let report = parse_http_algo_order(order, account_id, &instrument, ts_init)?;
2635            reports.push(report);
2636        }
2637
2638        Ok(())
2639    }
2640}
2641
2642fn parse_http_algo_order(
2643    order: &OKXOrderAlgo,
2644    account_id: AccountId,
2645    instrument: &InstrumentAny,
2646    ts_init: UnixNanos,
2647) -> anyhow::Result<OrderStatusReport> {
2648    let ord_px = if order.ord_px.is_empty() {
2649        "-1".to_string()
2650    } else {
2651        order.ord_px.clone()
2652    };
2653
2654    let reduce_only = if order.reduce_only.is_empty() {
2655        "false".to_string()
2656    } else {
2657        order.reduce_only.clone()
2658    };
2659
2660    let msg = OKXAlgoOrderMsg {
2661        algo_id: order.algo_id.clone(),
2662        algo_cl_ord_id: order.algo_cl_ord_id.clone(),
2663        cl_ord_id: order.cl_ord_id.clone(),
2664        ord_id: order.ord_id.clone(),
2665        inst_id: order.inst_id,
2666        inst_type: order.inst_type,
2667        ord_type: order.ord_type,
2668        state: order.state,
2669        side: order.side,
2670        pos_side: order.pos_side,
2671        sz: order.sz.clone(),
2672        trigger_px: order.trigger_px.clone(),
2673        trigger_px_type: order.trigger_px_type.unwrap_or(OKXTriggerType::None),
2674        ord_px,
2675        td_mode: order.td_mode,
2676        lever: order.lever.clone(),
2677        reduce_only,
2678        actual_px: order.actual_px.clone(),
2679        actual_sz: order.actual_sz.clone(),
2680        notional_usd: order.notional_usd.clone(),
2681        c_time: order.c_time,
2682        u_time: order.u_time,
2683        trigger_time: order.trigger_time.clone(),
2684        tag: order.tag.clone(),
2685    };
2686
2687    parse_algo_order_status_report(&msg, instrument, account_id, ts_init)
2688}