nautilus_okx/http/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides an ergonomic wrapper around the **OKX v5 REST API** –
17//! <https://www.okx.com/docs-v5/en/>.
18//!
19//! The core type exported by this module is [`OKXHttpClient`].  It offers an
20//! interface to all exchange endpoints currently required by NautilusTrader.
21//!
22//! Key responsibilities handled internally:
23//! • Request signing and header composition for private routes (HMAC-SHA256).
24//! • Rate-limiting based on the public OKX specification.
25//! • Deserialization of JSON payloads into domain models.
26//! • Conversion of raw exchange errors into the rich [`OKXHttpError`] enum.
27//!
28//! # Official Documentation
29//!
30//! | Endpoint                             | Reference                                              |
31//! |--------------------------------------|--------------------------------------------------------|
32//! | Market data                          | <https://www.okx.com/docs-v5/en/#rest-api-market-data> |
33//! | Account & positions                  | <https://www.okx.com/docs-v5/en/#rest-api-account>     |
34//! | Funding & asset balances             | <https://www.okx.com/docs-v5/en/#rest-api-funding>     |
35
36use std::{
37    collections::HashMap,
38    fmt::Debug,
39    num::NonZeroU32,
40    str::FromStr,
41    sync::{
42        Arc, LazyLock,
43        atomic::{AtomicBool, Ordering},
44    },
45};
46
47use ahash::{AHashMap, AHashSet};
48use chrono::{DateTime, Utc};
49use dashmap::DashMap;
50use nautilus_core::{
51    UnixNanos, consts::NAUTILUS_USER_AGENT, env::get_or_env_var, time::get_atomic_clock_realtime,
52};
53use nautilus_model::{
54    data::{Bar, BarType, IndexPriceUpdate, MarkPriceUpdate, TradeTick},
55    enums::{AggregationSource, BarAggregation, OrderSide, OrderType, TriggerType},
56    events::AccountState,
57    identifiers::{AccountId, ClientOrderId, InstrumentId},
58    instruments::{Instrument, InstrumentAny},
59    reports::{FillReport, OrderStatusReport, PositionStatusReport},
60    types::{Price, Quantity},
61};
62use nautilus_network::{
63    http::{HttpClient, Method, StatusCode, USER_AGENT},
64    ratelimiter::quota::Quota,
65    retry::{RetryConfig, RetryManager},
66};
67use rust_decimal::Decimal;
68use serde::{Deserialize, Serialize, de::DeserializeOwned};
69use tokio_util::sync::CancellationToken;
70use ustr::Ustr;
71
72use super::{
73    error::OKXHttpError,
74    models::{
75        OKXAccount, OKXCancelAlgoOrderRequest, OKXCancelAlgoOrderResponse, OKXFeeRate,
76        OKXIndexTicker, OKXMarkPrice, OKXOrderAlgo, OKXOrderHistory, OKXPlaceAlgoOrderRequest,
77        OKXPlaceAlgoOrderResponse, OKXPosition, OKXPositionHistory, OKXPositionTier, OKXServerTime,
78        OKXTransactionDetail,
79    },
80    query::{
81        GetAlgoOrdersParams, GetAlgoOrdersParamsBuilder, GetCandlesticksParams,
82        GetCandlesticksParamsBuilder, GetIndexTickerParams, GetIndexTickerParamsBuilder,
83        GetInstrumentsParams, GetInstrumentsParamsBuilder, GetMarkPriceParams,
84        GetMarkPriceParamsBuilder, GetOrderHistoryParams, GetOrderHistoryParamsBuilder,
85        GetOrderListParams, GetOrderListParamsBuilder, GetPositionTiersParams,
86        GetPositionsHistoryParams, GetPositionsParams, GetPositionsParamsBuilder,
87        GetTradeFeeParams, GetTradesParams, GetTradesParamsBuilder, GetTransactionDetailsParams,
88        GetTransactionDetailsParamsBuilder, SetPositionModeParams, SetPositionModeParamsBuilder,
89    },
90};
91use crate::{
92    common::{
93        consts::{OKX_HTTP_URL, OKX_NAUTILUS_BROKER_ID, should_retry_error_code},
94        credential::Credential,
95        enums::{
96            OKXAlgoOrderType, OKXContractType, OKXInstrumentStatus, OKXInstrumentType,
97            OKXOrderStatus, OKXPositionMode, OKXSide, OKXTradeMode, OKXTriggerType,
98        },
99        models::OKXInstrument,
100        parse::{
101            okx_instrument_type, okx_instrument_type_from_symbol, parse_account_state,
102            parse_candlestick, parse_fill_report, parse_index_price_update, parse_instrument_any,
103            parse_mark_price_update, parse_order_status_report, parse_position_status_report,
104            parse_spot_margin_position_from_balance, parse_trade_tick,
105        },
106    },
107    http::{
108        models::{OKXCandlestick, OKXTrade},
109        query::GetOrderParams,
110    },
111    websocket::{messages::OKXAlgoOrderMsg, parse::parse_algo_order_status_report},
112};
113
114const OKX_SUCCESS_CODE: &str = "0";
115
116/// Default OKX REST API rate limit: 500 requests per 2 seconds.
117///
118/// - Sub-account order limit: 1000 requests per 2 seconds.
119/// - Account balance: 10 requests per 2 seconds.
120/// - Account instruments: 20 requests per 2 seconds.
121///
122/// We use a conservative 250 requests per second (500 per 2 seconds) as a general limit
123/// that should accommodate most use cases while respecting OKX's documented limits.
124pub static OKX_REST_QUOTA: LazyLock<Quota> =
125    LazyLock::new(|| Quota::per_second(NonZeroU32::new(250).unwrap()));
126
127const OKX_GLOBAL_RATE_KEY: &str = "okx:global";
128
129/// Represents an OKX HTTP response.
130#[derive(Debug, Serialize, Deserialize)]
131pub struct OKXResponse<T> {
132    /// The OKX response code, which is `"0"` for success.
133    pub code: String,
134    /// A message string which can be informational or describe an error cause.
135    pub msg: String,
136    /// The typed data returned by the OKX endpoint.
137    pub data: Vec<T>,
138}
139
140/// Provides a raw HTTP client for interacting with the [OKX](https://okx.com) REST API.
141///
142/// This client wraps the underlying [`HttpClient`] to handle functionality
143/// specific to OKX, such as request signing (for authenticated endpoints),
144/// forming request URLs, and deserializing responses into OKX specific data models.
145pub struct OKXRawHttpClient {
146    base_url: String,
147    client: HttpClient,
148    credential: Option<Credential>,
149    retry_manager: RetryManager<OKXHttpError>,
150    cancellation_token: CancellationToken,
151    is_demo: bool,
152}
153
154impl Default for OKXRawHttpClient {
155    fn default() -> Self {
156        Self::new(None, Some(60), None, None, None, false, None)
157            .expect("Failed to create default OKXRawHttpClient")
158    }
159}
160
161impl Debug for OKXRawHttpClient {
162    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163        let credential = self.credential.as_ref().map(|_| "<redacted>");
164        f.debug_struct(stringify!(OKXRawHttpClient))
165            .field("base_url", &self.base_url)
166            .field("credential", &credential)
167            .finish_non_exhaustive()
168    }
169}
170
171impl OKXRawHttpClient {
172    fn rate_limiter_quotas() -> Vec<(String, Quota)> {
173        vec![
174            (OKX_GLOBAL_RATE_KEY.to_string(), *OKX_REST_QUOTA),
175            (
176                "okx:/api/v5/account/balance".to_string(),
177                Quota::per_second(NonZeroU32::new(5).unwrap()),
178            ),
179            (
180                "okx:/api/v5/public/instruments".to_string(),
181                Quota::per_second(NonZeroU32::new(10).unwrap()),
182            ),
183            (
184                "okx:/api/v5/market/candles".to_string(),
185                Quota::per_second(NonZeroU32::new(50).unwrap()),
186            ),
187            (
188                "okx:/api/v5/market/history-candles".to_string(),
189                Quota::per_second(NonZeroU32::new(20).unwrap()),
190            ),
191            (
192                "okx:/api/v5/market/history-trades".to_string(),
193                Quota::per_second(NonZeroU32::new(30).unwrap()),
194            ),
195            (
196                "okx:/api/v5/trade/order".to_string(),
197                Quota::per_second(NonZeroU32::new(30).unwrap()), // 60 requests / 2 seconds (per instrument)
198            ),
199            (
200                "okx:/api/v5/trade/orders-pending".to_string(),
201                Quota::per_second(NonZeroU32::new(20).unwrap()),
202            ),
203            (
204                "okx:/api/v5/trade/orders-history".to_string(),
205                Quota::per_second(NonZeroU32::new(20).unwrap()),
206            ),
207            (
208                "okx:/api/v5/trade/fills".to_string(),
209                Quota::per_second(NonZeroU32::new(30).unwrap()),
210            ),
211            (
212                "okx:/api/v5/trade/order-algo".to_string(),
213                Quota::per_second(NonZeroU32::new(10).unwrap()),
214            ),
215            (
216                "okx:/api/v5/trade/cancel-algos".to_string(),
217                Quota::per_second(NonZeroU32::new(10).unwrap()),
218            ),
219        ]
220    }
221
222    fn rate_limit_keys(endpoint: &str) -> Vec<Ustr> {
223        let normalized = endpoint.split('?').next().unwrap_or(endpoint);
224        let route = format!("okx:{normalized}");
225
226        vec![Ustr::from(OKX_GLOBAL_RATE_KEY), Ustr::from(route.as_str())]
227    }
228
229    /// Cancel all pending HTTP requests.
230    pub fn cancel_all_requests(&self) {
231        self.cancellation_token.cancel();
232    }
233
234    /// Get the cancellation token for this client.
235    pub fn cancellation_token(&self) -> &CancellationToken {
236        &self.cancellation_token
237    }
238
239    /// Creates a new [`OKXHttpClient`] using the default OKX HTTP URL,
240    /// optionally overridden with a custom base URL.
241    ///
242    /// This version of the client has **no credentials**, so it can only
243    /// call publicly accessible endpoints.
244    ///
245    /// # Errors
246    ///
247    /// Returns an error if the retry manager cannot be created.
248    pub fn new(
249        base_url: Option<String>,
250        timeout_secs: Option<u64>,
251        max_retries: Option<u32>,
252        retry_delay_ms: Option<u64>,
253        retry_delay_max_ms: Option<u64>,
254        is_demo: bool,
255        proxy_url: Option<String>,
256    ) -> Result<Self, OKXHttpError> {
257        let retry_config = RetryConfig {
258            max_retries: max_retries.unwrap_or(3),
259            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
260            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
261            backoff_factor: 2.0,
262            jitter_ms: 1000,
263            operation_timeout_ms: Some(60_000),
264            immediate_first: false,
265            max_elapsed_ms: Some(180_000),
266        };
267
268        let retry_manager = RetryManager::new(retry_config);
269
270        Ok(Self {
271            base_url: base_url.unwrap_or(OKX_HTTP_URL.to_string()),
272            client: HttpClient::new(
273                Self::default_headers(is_demo),
274                vec![],
275                Self::rate_limiter_quotas(),
276                Some(*OKX_REST_QUOTA),
277                timeout_secs,
278                proxy_url,
279            )
280            .map_err(|e| {
281                OKXHttpError::ValidationError(format!("Failed to create HTTP client: {e}"))
282            })?,
283            credential: None,
284            retry_manager,
285            cancellation_token: CancellationToken::new(),
286            is_demo,
287        })
288    }
289
290    /// Creates a new [`OKXHttpClient`] configured with credentials
291    /// for authenticated requests, optionally using a custom base URL.
292    ///
293    /// # Errors
294    ///
295    /// Returns an error if the retry manager cannot be created.
296    #[allow(clippy::too_many_arguments)]
297    pub fn with_credentials(
298        api_key: String,
299        api_secret: String,
300        api_passphrase: String,
301        base_url: String,
302        timeout_secs: Option<u64>,
303        max_retries: Option<u32>,
304        retry_delay_ms: Option<u64>,
305        retry_delay_max_ms: Option<u64>,
306        is_demo: bool,
307        proxy_url: Option<String>,
308    ) -> Result<Self, OKXHttpError> {
309        let retry_config = RetryConfig {
310            max_retries: max_retries.unwrap_or(3),
311            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
312            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
313            backoff_factor: 2.0,
314            jitter_ms: 1000,
315            operation_timeout_ms: Some(60_000),
316            immediate_first: false,
317            max_elapsed_ms: Some(180_000),
318        };
319
320        let retry_manager = RetryManager::new(retry_config);
321
322        Ok(Self {
323            base_url,
324            client: HttpClient::new(
325                Self::default_headers(is_demo),
326                vec![],
327                Self::rate_limiter_quotas(),
328                Some(*OKX_REST_QUOTA),
329                timeout_secs,
330                proxy_url,
331            )
332            .map_err(|e| {
333                OKXHttpError::ValidationError(format!("Failed to create HTTP client: {e}"))
334            })?,
335            credential: Some(Credential::new(api_key, api_secret, api_passphrase)),
336            retry_manager,
337            cancellation_token: CancellationToken::new(),
338            is_demo,
339        })
340    }
341
342    /// Builds the default headers to include with each request (e.g., `User-Agent`).
343    fn default_headers(is_demo: bool) -> HashMap<String, String> {
344        let mut headers =
345            HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())]);
346
347        if is_demo {
348            headers.insert("x-simulated-trading".to_string(), "1".to_string());
349        }
350
351        headers
352    }
353
354    /// Signs an OKX request with timestamp, API key, passphrase, and signature.
355    ///
356    /// # Errors
357    ///
358    /// Returns [`OKXHttpError::MissingCredentials`] if no credentials are set
359    /// but the request requires authentication.
360    fn sign_request(
361        &self,
362        method: &Method,
363        path: &str,
364        body: Option<&[u8]>,
365    ) -> Result<HashMap<String, String>, OKXHttpError> {
366        let credential = match self.credential.as_ref() {
367            Some(c) => c,
368            None => return Err(OKXHttpError::MissingCredentials),
369        };
370
371        let api_key = credential.api_key.to_string();
372        let api_passphrase = credential.api_passphrase.clone();
373
374        // OKX requires milliseconds in the timestamp (ISO 8601 with milliseconds)
375        let now = Utc::now();
376        let millis = now.timestamp_subsec_millis();
377        let timestamp = now.format("%Y-%m-%dT%H:%M:%S").to_string() + &format!(".{millis:03}Z");
378        let signature = credential.sign_bytes(&timestamp, method.as_str(), path, body);
379
380        let mut headers = HashMap::new();
381        headers.insert("OK-ACCESS-KEY".to_string(), api_key);
382        headers.insert("OK-ACCESS-PASSPHRASE".to_string(), api_passphrase);
383        headers.insert("OK-ACCESS-TIMESTAMP".to_string(), timestamp);
384        headers.insert("OK-ACCESS-SIGN".to_string(), signature);
385
386        Ok(headers)
387    }
388
389    /// Sends an HTTP request to OKX and parses the response into `Vec<T>`.
390    ///
391    /// Internally, this method handles:
392    /// - Building the URL from `base_url` + `path`.
393    /// - Optionally signing the request.
394    /// - Deserializing JSON responses into typed models, or returning a [`OKXHttpError`].
395    /// - Retrying with exponential backoff on transient errors.
396    ///
397    /// # Errors
398    ///
399    /// Returns an error if:
400    /// - The HTTP request fails.
401    /// - Authentication is required but credentials are missing.
402    /// - The response cannot be deserialized into the expected type.
403    /// - The OKX API returns an error response.
404    async fn send_request<T: DeserializeOwned, P: Serialize>(
405        &self,
406        method: Method,
407        path: &str,
408        params: Option<&P>,
409        body: Option<Vec<u8>>,
410        authenticate: bool,
411    ) -> Result<Vec<T>, OKXHttpError> {
412        let url = format!("{}{path}", self.base_url);
413
414        // Pre-compute rate limit keys once outside the retry closure
415        let rate_keys: Vec<String> = Self::rate_limit_keys(path)
416            .into_iter()
417            .map(|k| k.to_string())
418            .collect();
419
420        let operation = || {
421            let url = url.clone();
422            let method = method.clone();
423            let body = body.clone();
424            let rate_keys = rate_keys.clone();
425
426            async move {
427                // Serialize params to query string for signing (if needed)
428                let query_string = if let Some(p) = params {
429                    serde_urlencoded::to_string(p).map_err(|e| {
430                        OKXHttpError::JsonError(format!("Failed to serialize params: {e}"))
431                    })?
432                } else {
433                    String::new()
434                };
435
436                // Build full path with query string for signing
437                let full_path = if query_string.is_empty() {
438                    path.to_string()
439                } else {
440                    format!("{path}?{query_string}")
441                };
442
443                let mut headers = if authenticate {
444                    self.sign_request(&method, &full_path, body.as_deref())?
445                } else {
446                    HashMap::new()
447                };
448
449                // Always set Content-Type header when body is present
450                if body.is_some() {
451                    headers.insert("Content-Type".to_string(), "application/json".to_string());
452                }
453
454                let resp = self
455                    .client
456                    .request_with_params(
457                        method.clone(),
458                        url,
459                        params,
460                        Some(headers),
461                        body,
462                        None,
463                        Some(rate_keys),
464                    )
465                    .await?;
466
467                log::trace!("Response: {resp:?}");
468
469                if resp.status.is_success() {
470                    let okx_response: OKXResponse<T> =
471                        serde_json::from_slice(&resp.body).map_err(|e| {
472                            log::error!("Failed to deserialize OKXResponse: {e}");
473                            OKXHttpError::JsonError(e.to_string())
474                        })?;
475
476                    if okx_response.code != OKX_SUCCESS_CODE {
477                        return Err(OKXHttpError::OkxError {
478                            error_code: okx_response.code,
479                            message: okx_response.msg,
480                        });
481                    }
482
483                    Ok(okx_response.data)
484                } else {
485                    let error_body = String::from_utf8_lossy(&resp.body);
486                    if resp.status.as_u16() == StatusCode::NOT_FOUND.as_u16() {
487                        log::debug!("HTTP 404 with body: {error_body}");
488                    } else {
489                        log::error!(
490                            "HTTP error {} with body: {error_body}",
491                            resp.status.as_str()
492                        );
493                    }
494
495                    if let Ok(parsed_error) = serde_json::from_slice::<OKXResponse<T>>(&resp.body) {
496                        return Err(OKXHttpError::OkxError {
497                            error_code: parsed_error.code,
498                            message: parsed_error.msg,
499                        });
500                    }
501
502                    Err(OKXHttpError::UnexpectedStatus {
503                        status: StatusCode::from_u16(resp.status.as_u16()).unwrap(),
504                        body: error_body.to_string(),
505                    })
506                }
507            }
508        };
509
510        // Retry strategy based on OKX error responses and HTTP status codes:
511        //
512        // 1. Network errors: always retry (transient connection issues)
513        // 2. HTTP 5xx/429: server errors and rate limiting should be retried
514        // 3. OKX specific retryable error codes (defined in common::consts)
515        //
516        // Note: OKX returns many permanent errors which should NOT be retried
517        // (e.g., "Invalid instrument", "Insufficient balance", "Invalid API Key")
518        let should_retry = |error: &OKXHttpError| -> bool {
519            match error {
520                OKXHttpError::HttpClientError(_) => true,
521                OKXHttpError::UnexpectedStatus { status, .. } => {
522                    status.as_u16() >= 500 || status.as_u16() == 429
523                }
524                OKXHttpError::OkxError { error_code, .. } => should_retry_error_code(error_code),
525                _ => false,
526            }
527        };
528
529        let create_error = |msg: String| -> OKXHttpError {
530            if msg == "canceled" {
531                OKXHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
532            } else {
533                OKXHttpError::ValidationError(msg)
534            }
535        };
536
537        self.retry_manager
538            .execute_with_retry_with_cancel(
539                path,
540                operation,
541                should_retry,
542                create_error,
543                &self.cancellation_token,
544            )
545            .await
546    }
547
548    /// Sets the position mode for an account.
549    ///
550    /// # Errors
551    ///
552    /// Returns an error if JSON serialization of `params` fails, if the HTTP
553    /// request fails, or if the response body cannot be deserialized.
554    ///
555    /// # References
556    ///
557    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-set-position-mode>
558    pub async fn set_position_mode(
559        &self,
560        params: SetPositionModeParams,
561    ) -> Result<Vec<serde_json::Value>, OKXHttpError> {
562        let path = "/api/v5/account/set-position-mode";
563        let body = serde_json::to_vec(&params)?;
564        self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
565            .await
566    }
567
568    /// Requests position tiers information, maximum leverage depends on your borrowings and margin ratio.
569    ///
570    /// # Errors
571    ///
572    /// Returns an error if the HTTP request fails, authentication is rejected
573    /// or the response cannot be deserialized.
574    ///
575    /// # References
576    ///
577    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-position-tiers>
578    pub async fn get_position_tiers(
579        &self,
580        params: GetPositionTiersParams,
581    ) -> Result<Vec<OKXPositionTier>, OKXHttpError> {
582        self.send_request(
583            Method::GET,
584            "/api/v5/public/position-tiers",
585            Some(&params),
586            None,
587            false,
588        )
589        .await
590    }
591
592    /// Requests a list of instruments with open contracts.
593    ///
594    /// # Errors
595    ///
596    /// Returns an error if JSON serialization of `params` fails, if the HTTP
597    /// request fails, or if the response body cannot be deserialized.
598    ///
599    /// # References
600    ///
601    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-instruments>
602    pub async fn get_instruments(
603        &self,
604        params: GetInstrumentsParams,
605    ) -> Result<Vec<OKXInstrument>, OKXHttpError> {
606        self.send_request(
607            Method::GET,
608            "/api/v5/public/instruments",
609            Some(&params),
610            None,
611            false,
612        )
613        .await
614    }
615
616    /// Requests the current server time from OKX.
617    ///
618    /// Retrieves the OKX system time in Unix timestamp (milliseconds). This is useful for
619    /// synchronizing local clocks with the exchange server and logging time drift.
620    ///
621    /// # Errors
622    ///
623    /// Returns an error if the HTTP request fails or if the response body
624    /// cannot be parsed into [`OKXServerTime`].
625    ///
626    /// # References
627    ///
628    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-system-time>
629    pub async fn get_server_time(&self) -> Result<u64, OKXHttpError> {
630        let response: Vec<OKXServerTime> = self
631            .send_request::<_, ()>(Method::GET, "/api/v5/public/time", None, None, false)
632            .await?;
633        response
634            .first()
635            .map(|t| t.ts)
636            .ok_or_else(|| OKXHttpError::JsonError("Empty server time response".to_string()))
637    }
638
639    /// Requests a mark price.
640    ///
641    /// We set the mark price based on the SPOT index and at a reasonable basis to prevent individual
642    /// users from manipulating the market and causing the contract price to fluctuate.
643    ///
644    /// # Errors
645    ///
646    /// Returns an error if the HTTP request fails or if the response body
647    /// cannot be parsed into [`OKXMarkPrice`].
648    ///
649    /// # References
650    ///
651    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-mark-price>
652    pub async fn get_mark_price(
653        &self,
654        params: GetMarkPriceParams,
655    ) -> Result<Vec<OKXMarkPrice>, OKXHttpError> {
656        self.send_request(
657            Method::GET,
658            "/api/v5/public/mark-price",
659            Some(&params),
660            None,
661            false,
662        )
663        .await
664    }
665
666    /// Requests the latest index price.
667    ///
668    /// # Errors
669    ///
670    /// Returns an error if the operation fails.
671    ///
672    /// # References
673    ///
674    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-index-tickers>
675    pub async fn get_index_tickers(
676        &self,
677        params: GetIndexTickerParams,
678    ) -> Result<Vec<OKXIndexTicker>, OKXHttpError> {
679        self.send_request(
680            Method::GET,
681            "/api/v5/market/index-tickers",
682            Some(&params),
683            None,
684            false,
685        )
686        .await
687    }
688
689    /// Requests trades history.
690    ///
691    /// # Errors
692    ///
693    /// Returns an error if the operation fails.
694    ///
695    /// # References
696    ///
697    /// <https://www.okx.com/docs-v5/en/#order-book-trading-market-data-get-trades-history>
698    pub async fn get_history_trades(
699        &self,
700        params: GetTradesParams,
701    ) -> Result<Vec<OKXTrade>, OKXHttpError> {
702        self.send_request(
703            Method::GET,
704            "/api/v5/market/history-trades",
705            Some(&params),
706            None,
707            false,
708        )
709        .await
710    }
711
712    /// Requests recent candlestick data.
713    ///
714    /// # Errors
715    ///
716    /// Returns an error if the operation fails.
717    ///
718    /// # References
719    ///
720    /// <https://www.okx.com/docs-v5/en/#order-book-trading-market-data-get-candlesticks>
721    pub async fn get_candles(
722        &self,
723        params: GetCandlesticksParams,
724    ) -> Result<Vec<OKXCandlestick>, OKXHttpError> {
725        self.send_request(
726            Method::GET,
727            "/api/v5/market/candles",
728            Some(&params),
729            None,
730            false,
731        )
732        .await
733    }
734
735    /// Requests historical candlestick data.
736    ///
737    /// # Errors
738    ///
739    /// Returns an error if the operation fails.
740    ///
741    /// # References
742    ///
743    /// <https://www.okx.com/docs-v5/en/#order-book-trading-market-data-get-candlesticks-history>
744    pub async fn get_history_candles(
745        &self,
746        params: GetCandlesticksParams,
747    ) -> Result<Vec<OKXCandlestick>, OKXHttpError> {
748        self.send_request(
749            Method::GET,
750            "/api/v5/market/history-candles",
751            Some(&params),
752            None,
753            false,
754        )
755        .await
756    }
757
758    /// Requests a list of assets (with non-zero balance), remaining balance, and available amount
759    /// in the trading account.
760    ///
761    /// # Errors
762    ///
763    /// Returns an error if the operation fails.
764    ///
765    /// # References
766    ///
767    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-balance>
768    pub async fn get_balance(&self) -> Result<Vec<OKXAccount>, OKXHttpError> {
769        let path = "/api/v5/account/balance";
770        self.send_request::<_, ()>(Method::GET, path, None, None, true)
771            .await
772    }
773
774    /// Requests fee rates for the account.
775    ///
776    /// Returns fee rates for the specified instrument type and the user's VIP level.
777    ///
778    /// # Errors
779    ///
780    /// Returns an error if the operation fails.
781    ///
782    /// # References
783    ///
784    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-fee-rates>
785    pub async fn get_trade_fee(
786        &self,
787        params: GetTradeFeeParams,
788    ) -> Result<Vec<OKXFeeRate>, OKXHttpError> {
789        self.send_request(
790            Method::GET,
791            "/api/v5/account/trade-fee",
792            Some(&params),
793            None,
794            true,
795        )
796        .await
797    }
798
799    /// Retrieves a single order’s details.
800    ///
801    /// # Errors
802    ///
803    /// Returns an error if the operation fails.
804    ///
805    /// # References
806    ///
807    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-order>
808    pub async fn get_order(
809        &self,
810        params: GetOrderParams,
811    ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
812        self.send_request(
813            Method::GET,
814            "/api/v5/trade/order",
815            Some(&params),
816            None,
817            true,
818        )
819        .await
820    }
821
822    /// Requests order list (pending orders).
823    ///
824    /// # Errors
825    ///
826    /// Returns an error if the operation fails.
827    ///
828    /// # References
829    ///
830    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-order-list>
831    pub async fn get_orders_pending(
832        &self,
833        params: GetOrderListParams,
834    ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
835        self.send_request(
836            Method::GET,
837            "/api/v5/trade/orders-pending",
838            Some(&params),
839            None,
840            true,
841        )
842        .await
843    }
844
845    /// Requests historical order records.
846    ///
847    /// # Errors
848    ///
849    /// Returns an error if the operation fails.
850    ///
851    /// # References
852    ///
853    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-orders-history>
854    pub async fn get_orders_history(
855        &self,
856        params: GetOrderHistoryParams,
857    ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
858        self.send_request(
859            Method::GET,
860            "/api/v5/trade/orders-history",
861            Some(&params),
862            None,
863            true,
864        )
865        .await
866    }
867
868    /// Requests pending algo orders.
869    ///
870    /// # Errors
871    ///
872    /// Returns an error if the operation fails.
873    pub async fn get_order_algo_pending(
874        &self,
875        params: GetAlgoOrdersParams,
876    ) -> Result<Vec<OKXOrderAlgo>, OKXHttpError> {
877        self.send_request(
878            Method::GET,
879            "/api/v5/trade/order-algo-pending",
880            Some(&params),
881            None,
882            true,
883        )
884        .await
885    }
886
887    /// Requests historical algo orders.
888    ///
889    /// # Errors
890    ///
891    /// Returns an error if the operation fails.
892    pub async fn get_order_algo_history(
893        &self,
894        params: GetAlgoOrdersParams,
895    ) -> Result<Vec<OKXOrderAlgo>, OKXHttpError> {
896        self.send_request(
897            Method::GET,
898            "/api/v5/trade/order-algo-history",
899            Some(&params),
900            None,
901            true,
902        )
903        .await
904    }
905
906    /// Requests transaction details (fills) for the given parameters.
907    ///
908    /// # Errors
909    ///
910    /// Returns an error if the operation fails.
911    ///
912    /// # References
913    ///
914    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-transaction-details-last-3-days>
915    pub async fn get_fills(
916        &self,
917        params: GetTransactionDetailsParams,
918    ) -> Result<Vec<OKXTransactionDetail>, OKXHttpError> {
919        self.send_request(
920            Method::GET,
921            "/api/v5/trade/fills",
922            Some(&params),
923            None,
924            true,
925        )
926        .await
927    }
928
929    /// Requests information on your positions. When the account is in net mode, net positions will
930    /// be displayed, and when the account is in long/short mode, long or short positions will be
931    /// displayed. Returns in reverse chronological order using ctime.
932    ///
933    /// # Errors
934    ///
935    /// Returns an error if the operation fails.
936    ///
937    /// # References
938    ///
939    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-positions>
940    pub async fn get_positions(
941        &self,
942        params: GetPositionsParams,
943    ) -> Result<Vec<OKXPosition>, OKXHttpError> {
944        self.send_request(
945            Method::GET,
946            "/api/v5/account/positions",
947            Some(&params),
948            None,
949            true,
950        )
951        .await
952    }
953
954    /// Requests closed or historical position data.
955    ///
956    /// # Errors
957    ///
958    /// Returns an error if the operation fails.
959    ///
960    /// # References
961    ///
962    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-positions-history>
963    pub async fn get_positions_history(
964        &self,
965        params: GetPositionsHistoryParams,
966    ) -> Result<Vec<OKXPositionHistory>, OKXHttpError> {
967        self.send_request(
968            Method::GET,
969            "/api/v5/account/positions-history",
970            Some(&params),
971            None,
972            true,
973        )
974        .await
975    }
976}
977
/// Provides a higher-level HTTP client for the [OKX](https://okx.com) REST API.
///
/// This client wraps the underlying [`OKXRawHttpClient`] to handle conversions
/// into the Nautilus domain model.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.okx")
)]
pub struct OKXHttpClient {
    /// Shared handle to the raw client that performs the actual HTTP requests.
    pub(crate) inner: Arc<OKXRawHttpClient>,
    /// Shared instrument cache, keyed by each instrument's raw symbol.
    pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Set to `true` once instruments have been cached through this client.
    cache_initialized: AtomicBool,
}
992
993impl Clone for OKXHttpClient {
994    fn clone(&self) -> Self {
995        let cache_initialized = AtomicBool::new(false);
996
997        let is_initialized = self.cache_initialized.load(Ordering::Acquire);
998        if is_initialized {
999            cache_initialized.store(true, Ordering::Release);
1000        }
1001
1002        Self {
1003            inner: self.inner.clone(),
1004            instruments_cache: self.instruments_cache.clone(),
1005            cache_initialized,
1006        }
1007    }
1008}
1009
1010impl Default for OKXHttpClient {
1011    fn default() -> Self {
1012        Self::new(None, Some(60), None, None, None, false, None)
1013            .expect("Failed to create default OKXHttpClient")
1014    }
1015}
1016
1017impl OKXHttpClient {
1018    /// Creates a new [`OKXHttpClient`] using the default OKX HTTP URL,
1019    /// optionally overridden with a custom base url.
1020    ///
1021    /// This version of the client has **no credentials**, so it can only
1022    /// call publicly accessible endpoints.
1023    ///
1024    /// # Errors
1025    ///
1026    /// Returns an error if the retry manager cannot be created.
1027    pub fn new(
1028        base_url: Option<String>,
1029        timeout_secs: Option<u64>,
1030        max_retries: Option<u32>,
1031        retry_delay_ms: Option<u64>,
1032        retry_delay_max_ms: Option<u64>,
1033        is_demo: bool,
1034        proxy_url: Option<String>,
1035    ) -> anyhow::Result<Self> {
1036        Ok(Self {
1037            inner: Arc::new(OKXRawHttpClient::new(
1038                base_url,
1039                timeout_secs,
1040                max_retries,
1041                retry_delay_ms,
1042                retry_delay_max_ms,
1043                is_demo,
1044                proxy_url,
1045            )?),
1046            instruments_cache: Arc::new(DashMap::new()),
1047            cache_initialized: AtomicBool::new(false),
1048        })
1049    }
1050
1051    /// Generates a timestamp for initialization.
1052    fn generate_ts_init(&self) -> UnixNanos {
1053        get_atomic_clock_realtime().get_time_ns()
1054    }
1055
1056    /// Creates a new authenticated [`OKXHttpClient`] using environment variables and
1057    /// the default OKX HTTP base url.
1058    ///
1059    /// # Errors
1060    ///
1061    /// Returns an error if the operation fails.
1062    pub fn from_env() -> anyhow::Result<Self> {
1063        Self::with_credentials(None, None, None, None, None, None, None, None, false, None)
1064    }
1065
1066    /// Creates a new [`OKXHttpClient`] configured with credentials
1067    /// for authenticated requests, optionally using a custom base url.
1068    ///
1069    /// # Errors
1070    ///
1071    /// Returns an error if the operation fails.
1072    #[allow(clippy::too_many_arguments)]
1073    pub fn with_credentials(
1074        api_key: Option<String>,
1075        api_secret: Option<String>,
1076        api_passphrase: Option<String>,
1077        base_url: Option<String>,
1078        timeout_secs: Option<u64>,
1079        max_retries: Option<u32>,
1080        retry_delay_ms: Option<u64>,
1081        retry_delay_max_ms: Option<u64>,
1082        is_demo: bool,
1083        proxy_url: Option<String>,
1084    ) -> anyhow::Result<Self> {
1085        let api_key = get_or_env_var(api_key, "OKX_API_KEY")?;
1086        let api_secret = get_or_env_var(api_secret, "OKX_API_SECRET")?;
1087        let api_passphrase = get_or_env_var(api_passphrase, "OKX_API_PASSPHRASE")?;
1088        let base_url = base_url.unwrap_or(OKX_HTTP_URL.to_string());
1089
1090        Ok(Self {
1091            inner: Arc::new(OKXRawHttpClient::with_credentials(
1092                api_key,
1093                api_secret,
1094                api_passphrase,
1095                base_url,
1096                timeout_secs,
1097                max_retries,
1098                retry_delay_ms,
1099                retry_delay_max_ms,
1100                is_demo,
1101                proxy_url,
1102            )?),
1103            instruments_cache: Arc::new(DashMap::new()),
1104            cache_initialized: AtomicBool::new(false),
1105        })
1106    }
1107
1108    /// Retrieves an instrument from the cache.
1109    ///
1110    /// # Errors
1111    ///
1112    /// Returns an error if the instrument is not found in the cache.
1113    fn instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
1114        self.instruments_cache
1115            .get(&symbol)
1116            .map(|entry| entry.value().clone())
1117            .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not in cache"))
1118    }
1119
1120    /// Cancel all pending HTTP requests.
1121    pub fn cancel_all_requests(&self) {
1122        self.inner.cancel_all_requests();
1123    }
1124
1125    /// Get the cancellation token for this client.
1126    pub fn cancellation_token(&self) -> &CancellationToken {
1127        self.inner.cancellation_token()
1128    }
1129
1130    /// Returns the base url being used by the client.
1131    pub fn base_url(&self) -> &str {
1132        self.inner.base_url.as_str()
1133    }
1134
1135    /// Returns the public API key being used by the client.
1136    pub fn api_key(&self) -> Option<&str> {
1137        self.inner.credential.as_ref().map(|c| c.api_key.as_str())
1138    }
1139
1140    /// Returns a masked version of the API key for logging purposes.
1141    #[must_use]
1142    pub fn api_key_masked(&self) -> Option<String> {
1143        self.inner.credential.as_ref().map(|c| c.api_key_masked())
1144    }
1145
1146    /// Returns whether the client is configured for demo trading.
1147    #[must_use]
1148    pub fn is_demo(&self) -> bool {
1149        self.inner.is_demo
1150    }
1151
1152    /// Requests the current server time from OKX.
1153    ///
1154    /// Returns the OKX system time as a Unix timestamp in milliseconds.
1155    ///
1156    /// # Errors
1157    ///
1158    /// Returns an error if the HTTP request fails or if the response cannot be parsed.
1159    pub async fn get_server_time(&self) -> Result<u64, OKXHttpError> {
1160        self.inner.get_server_time().await
1161    }
1162
1163    /// Checks if the client is initialized.
1164    ///
1165    /// The client is considered initialized if any instruments have been cached from the venue.
1166    #[must_use]
1167    pub fn is_initialized(&self) -> bool {
1168        self.cache_initialized.load(Ordering::Acquire)
1169    }
1170
1171    /// Returns a snapshot of all instrument symbols currently held in the
1172    /// internal cache.
1173    #[must_use]
1174    pub fn get_cached_symbols(&self) -> Vec<String> {
1175        self.instruments_cache
1176            .iter()
1177            .map(|entry| entry.key().to_string())
1178            .collect()
1179    }
1180
1181    /// Caches multiple instruments.
1182    ///
1183    /// Any existing instruments with the same symbols will be replaced.
1184    pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
1185        for inst in instruments {
1186            self.instruments_cache
1187                .insert(inst.raw_symbol().inner(), inst);
1188        }
1189        self.cache_initialized.store(true, Ordering::Release);
1190    }
1191
1192    /// Caches a single instrument.
1193    ///
1194    /// Any existing instrument with the same symbol will be replaced.
1195    pub fn cache_instrument(&self, instrument: InstrumentAny) {
1196        self.instruments_cache
1197            .insert(instrument.raw_symbol().inner(), instrument);
1198        self.cache_initialized.store(true, Ordering::Release);
1199    }
1200
1201    /// Gets an instrument from the cache by symbol.
1202    pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1203        self.instruments_cache
1204            .get(symbol)
1205            .map(|entry| entry.value().clone())
1206    }
1207
1208    /// Requests the account state for the `account_id` from OKX.
1209    ///
1210    /// # Errors
1211    ///
1212    /// Returns an error if the HTTP request fails or no account state is returned.
1213    pub async fn request_account_state(
1214        &self,
1215        account_id: AccountId,
1216    ) -> anyhow::Result<AccountState> {
1217        let resp = self
1218            .inner
1219            .get_balance()
1220            .await
1221            .map_err(|e| anyhow::anyhow!(e))?;
1222
1223        let ts_init = self.generate_ts_init();
1224        let raw = resp
1225            .first()
1226            .ok_or_else(|| anyhow::anyhow!("No account state returned from OKX"))?;
1227        let account_state = parse_account_state(raw, account_id, ts_init)?;
1228
1229        Ok(account_state)
1230    }
1231
    /// Sets the position mode for the account to `position_mode`.
    ///
    /// OKX error code `50115` is treated as non-fatal: a warning is logged and `Ok(())` is
    /// returned. Per the logged message, this code indicates that the account does not
    /// support position mode setting because derivatives trading is not enabled.
    ///
    /// # Errors
    ///
    /// Returns an error if the request parameters cannot be built, or if the request fails
    /// with anything other than OKX error code `50115`.
    pub async fn set_position_mode(&self, position_mode: OKXPositionMode) -> anyhow::Result<()> {
        let mut params = SetPositionModeParamsBuilder::default();
        params.pos_mode(position_mode);
        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;

        match self.inner.set_position_mode(params).await {
            Ok(_) => Ok(()),
            Err(e) => {
                // Only the specific venue error code below is tolerated; every other
                // failure is propagated to the caller.
                if let OKXHttpError::OkxError {
                    error_code,
                    message,
                } = &e
                    && error_code == "50115"
                {
                    log::warn!(
                        "Account does not support position mode setting (derivatives trading not enabled): {message}"
                    );
                    return Ok(()); // Gracefully handle this case
                }
                anyhow::bail!(e)
            }
        }
    }
1267
1268    /// Requests all instruments for the `instrument_type` from OKX.
1269    ///
1270    /// # Errors
1271    ///
1272    /// Returns an error if the HTTP request fails or instrument parsing fails.
1273    pub async fn request_instruments(
1274        &self,
1275        instrument_type: OKXInstrumentType,
1276        instrument_family: Option<String>,
1277    ) -> anyhow::Result<Vec<InstrumentAny>> {
1278        let mut params = GetInstrumentsParamsBuilder::default();
1279        params.inst_type(instrument_type);
1280
1281        if let Some(family) = instrument_family.clone() {
1282            params.inst_family(family);
1283        }
1284
1285        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1286
1287        let resp = self
1288            .inner
1289            .get_instruments(params)
1290            .await
1291            .map_err(|e| anyhow::anyhow!(e))?;
1292
1293        let fee_rate_opt = {
1294            let fee_params = GetTradeFeeParams {
1295                inst_type: instrument_type,
1296                uly: None,
1297                inst_family: instrument_family,
1298            };
1299
1300            match self.inner.get_trade_fee(fee_params).await {
1301                Ok(rates) => rates.into_iter().next(),
1302                Err(OKXHttpError::MissingCredentials) => {
1303                    log::debug!("Missing credentials for fee rates, using None");
1304                    None
1305                }
1306                Err(e) => {
1307                    log::warn!("Failed to fetch fee rates for {instrument_type}: {e}");
1308                    None
1309                }
1310            }
1311        };
1312
1313        let ts_init = self.generate_ts_init();
1314
1315        let mut instruments: Vec<InstrumentAny> = Vec::new();
1316        for inst in &resp {
1317            // Skip pre-open instruments which have incomplete/empty field values
1318            // Keep suspended instruments as they have valid metadata and may return to live
1319            if inst.state == OKXInstrumentStatus::Preopen {
1320                continue;
1321            }
1322
1323            // Determine which fee fields to use based on contract type
1324            // OKX fee rate convention: positive = rebate, negative = commission
1325            // Nautilus convention: negative = rebate, positive = commission
1326            // Negate to convert between conventions
1327            let (maker_fee, taker_fee) = if let Some(ref fee_rate) = fee_rate_opt {
1328                let is_usdt_margined = inst.ct_type == OKXContractType::Linear;
1329                let (maker_str, taker_str) = if is_usdt_margined {
1330                    (&fee_rate.maker_u, &fee_rate.taker_u)
1331                } else {
1332                    (&fee_rate.maker, &fee_rate.taker)
1333                };
1334
1335                let maker = if maker_str.is_empty() {
1336                    None
1337                } else {
1338                    Decimal::from_str(maker_str).ok().map(|v| -v)
1339                };
1340                let taker = if taker_str.is_empty() {
1341                    None
1342                } else {
1343                    Decimal::from_str(taker_str).ok().map(|v| -v)
1344                };
1345
1346                (maker, taker)
1347            } else {
1348                (None, None)
1349            };
1350
1351            match parse_instrument_any(inst, None, None, maker_fee, taker_fee, ts_init) {
1352                Ok(Some(instrument_any)) => {
1353                    instruments.push(instrument_any);
1354                }
1355                Ok(None) => {
1356                    // Unsupported instrument type, skip silently
1357                }
1358                Err(e) => {
1359                    log::warn!("Failed to parse instrument {}: {e}", inst.inst_id);
1360                }
1361            }
1362        }
1363
1364        Ok(instruments)
1365    }
1366
1367    /// Requests a single instrument by `instrument_id` from OKX.
1368    ///
1369    /// Fetches the instrument from the API, caches it, and returns it.
1370    ///
1371    /// # Errors
1372    ///
1373    /// This function will return an error if:
1374    /// - The API request fails.
1375    /// - The instrument is not found.
1376    /// - Failed to parse instrument data.
1377    pub async fn request_instrument(
1378        &self,
1379        instrument_id: InstrumentId,
1380    ) -> anyhow::Result<InstrumentAny> {
1381        let symbol = instrument_id.symbol.as_str();
1382        let instrument_type = okx_instrument_type_from_symbol(symbol);
1383
1384        let mut params = GetInstrumentsParamsBuilder::default();
1385        params.inst_type(instrument_type);
1386        params.inst_id(symbol);
1387
1388        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1389
1390        let resp = self
1391            .inner
1392            .get_instruments(params)
1393            .await
1394            .map_err(|e| anyhow::anyhow!(e))?;
1395
1396        let raw_inst = resp
1397            .first()
1398            .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not found"))?;
1399
1400        // Skip pre-open instruments which have incomplete/empty field values
1401        if raw_inst.state == OKXInstrumentStatus::Preopen {
1402            anyhow::bail!("Instrument {symbol} is in pre-open state");
1403        }
1404
1405        let fee_rate_opt = {
1406            let fee_params = GetTradeFeeParams {
1407                inst_type: instrument_type,
1408                uly: None,
1409                inst_family: None,
1410            };
1411
1412            match self.inner.get_trade_fee(fee_params).await {
1413                Ok(rates) => rates.into_iter().next(),
1414                Err(OKXHttpError::MissingCredentials) => {
1415                    log::debug!("Missing credentials for fee rates, using None");
1416                    None
1417                }
1418                Err(e) => {
1419                    log::warn!("Failed to fetch fee rates for {symbol}: {e}");
1420                    None
1421                }
1422            }
1423        };
1424
1425        // OKX fee rate convention: positive = rebate, negative = commission
1426        // Nautilus convention: negative = rebate, positive = commission
1427        // Negate to convert between conventions
1428        let (maker_fee, taker_fee) = if let Some(ref fee_rate) = fee_rate_opt {
1429            let is_usdt_margined = raw_inst.ct_type == OKXContractType::Linear;
1430            let (maker_str, taker_str) = if is_usdt_margined {
1431                (&fee_rate.maker_u, &fee_rate.taker_u)
1432            } else {
1433                (&fee_rate.maker, &fee_rate.taker)
1434            };
1435
1436            let maker = if maker_str.is_empty() {
1437                None
1438            } else {
1439                Decimal::from_str(maker_str).ok().map(|v| -v)
1440            };
1441            let taker = if taker_str.is_empty() {
1442                None
1443            } else {
1444                Decimal::from_str(taker_str).ok().map(|v| -v)
1445            };
1446
1447            (maker, taker)
1448        } else {
1449            (None, None)
1450        };
1451
1452        let ts_init = self.generate_ts_init();
1453        let instrument = parse_instrument_any(raw_inst, None, None, maker_fee, taker_fee, ts_init)?
1454            .ok_or_else(|| anyhow::anyhow!("Unsupported instrument type for {symbol}"))?;
1455
1456        self.cache_instrument(instrument.clone());
1457
1458        Ok(instrument)
1459    }
1460
1461    /// Requests the latest mark price for the `instrument_type` from OKX.
1462    ///
1463    /// # Errors
1464    ///
1465    /// Returns an error if the HTTP request fails or no mark price is returned.
1466    pub async fn request_mark_price(
1467        &self,
1468        instrument_id: InstrumentId,
1469    ) -> anyhow::Result<MarkPriceUpdate> {
1470        let mut params = GetMarkPriceParamsBuilder::default();
1471        params.inst_id(instrument_id.symbol.inner());
1472        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1473
1474        let resp = self
1475            .inner
1476            .get_mark_price(params)
1477            .await
1478            .map_err(|e| anyhow::anyhow!(e))?;
1479
1480        let raw = resp
1481            .first()
1482            .ok_or_else(|| anyhow::anyhow!("No mark price returned from OKX"))?;
1483        let inst = self.instrument_from_cache(instrument_id.symbol.inner())?;
1484        let ts_init = self.generate_ts_init();
1485
1486        let mark_price =
1487            parse_mark_price_update(raw, instrument_id, inst.price_precision(), ts_init)
1488                .map_err(|e| anyhow::anyhow!(e))?;
1489        Ok(mark_price)
1490    }
1491
1492    /// Requests the latest index price for the `instrument_id` from OKX.
1493    ///
1494    /// # Errors
1495    ///
1496    /// Returns an error if the HTTP request fails or no index price is returned.
1497    pub async fn request_index_price(
1498        &self,
1499        instrument_id: InstrumentId,
1500    ) -> anyhow::Result<IndexPriceUpdate> {
1501        let mut params = GetIndexTickerParamsBuilder::default();
1502        params.inst_id(instrument_id.symbol.inner());
1503        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1504
1505        let resp = self
1506            .inner
1507            .get_index_tickers(params)
1508            .await
1509            .map_err(|e| anyhow::anyhow!(e))?;
1510
1511        let raw = resp
1512            .first()
1513            .ok_or_else(|| anyhow::anyhow!("No index price returned from OKX"))?;
1514        let inst = self.instrument_from_cache(instrument_id.symbol.inner())?;
1515        let ts_init = self.generate_ts_init();
1516
1517        let index_price =
1518            parse_index_price_update(raw, instrument_id, inst.price_precision(), ts_init)
1519                .map_err(|e| anyhow::anyhow!(e))?;
1520        Ok(index_price)
1521    }
1522
1523    /// Requests trades for the `instrument_id` and `start` -> `end` time range.
1524    ///
1525    /// # Errors
1526    ///
1527    /// Returns an error if the HTTP request fails or trade parsing fails.
1528    ///
1529    /// # Panics
1530    ///
1531    /// Panics if the API returns an empty response when debug logging is enabled.
1532    pub async fn request_trades(
1533        &self,
1534        instrument_id: InstrumentId,
1535        start: Option<DateTime<Utc>>,
1536        end: Option<DateTime<Utc>>,
1537        limit: Option<u32>,
1538    ) -> anyhow::Result<Vec<TradeTick>> {
1539        const OKX_TRADES_MAX_LIMIT: u32 = 100;
1540        const MAX_PAGES: usize = 500;
1541        const MAX_CONSECUTIVE_EMPTY: usize = 3;
1542
1543        #[derive(Clone, Copy, Debug, PartialEq, Eq)]
1544        enum Mode {
1545            Latest,
1546            Backward,
1547            Range,
1548        }
1549
1550        let limit = if limit == Some(0) { None } else { limit };
1551
1552        if let (Some(s), Some(e)) = (start, end) {
1553            anyhow::ensure!(s < e, "Invalid time range: start={s:?} end={e:?}");
1554        }
1555
1556        let now = Utc::now();
1557
1558        if let Some(s) = start
1559            && s > now
1560        {
1561            return Ok(Vec::new());
1562        }
1563
1564        let end = if let Some(e) = end
1565            && e > now
1566        {
1567            Some(now)
1568        } else {
1569            end
1570        };
1571
1572        let mode = match (start, end) {
1573            (None, None) => Mode::Latest,
1574            (Some(_), None) => Mode::Backward,
1575            (None, Some(_)) => Mode::Backward,
1576            (Some(_), Some(_)) => Mode::Range,
1577        };
1578
1579        let start_ms = start.map(|s| s.timestamp_millis());
1580        let end_ms = end.map(|e| e.timestamp_millis());
1581
1582        let ts_init = self.generate_ts_init();
1583        let inst = self.instrument_from_cache(instrument_id.symbol.inner())?;
1584
1585        // Historical pagination walks backwards using trade IDs, OKX does not honour timestamps for
1586        // standalone `before` requests (type=2)
1587        if matches!(mode, Mode::Backward | Mode::Range) {
1588            let mut before_trade_id: Option<String> = None;
1589            let mut pages = 0usize;
1590            let mut page_results: Vec<Vec<TradeTick>> = Vec::new();
1591            let mut seen_trades: std::collections::HashSet<(String, i64)> =
1592                std::collections::HashSet::new();
1593            let mut unique_count = 0usize;
1594            let mut consecutive_empty_pages = 0usize;
1595
1596            // Only apply default limit when there's no start boundary
1597            // (start provides a natural stopping point, end alone allows infinite backward pagination)
1598            let effective_limit = if start.is_some() {
1599                limit.unwrap_or(u32::MAX)
1600            } else {
1601                limit.unwrap_or(OKX_TRADES_MAX_LIMIT)
1602            };
1603
1604            log::debug!(
1605                "Starting trades pagination: mode={mode:?}, start={start:?}, end={end:?}, limit={limit:?}, effective_limit={effective_limit}"
1606            );
1607
1608            loop {
1609                if pages >= MAX_PAGES {
1610                    log::warn!("Hit MAX_PAGES limit of {MAX_PAGES}");
1611                    break;
1612                }
1613
1614                if effective_limit < u32::MAX && unique_count >= effective_limit as usize {
1615                    log::debug!("Reached effective limit: unique_count={unique_count}");
1616                    break;
1617                }
1618
1619                let remaining = (effective_limit as usize).saturating_sub(unique_count);
1620                let page_cap = remaining.min(OKX_TRADES_MAX_LIMIT as usize) as u32;
1621
1622                log::debug!(
1623                    "Requesting page {}: before_id={:?}, page_cap={}, unique_count={}",
1624                    pages + 1,
1625                    before_trade_id,
1626                    page_cap,
1627                    unique_count
1628                );
1629
1630                let mut params_builder = GetTradesParamsBuilder::default();
1631                params_builder
1632                    .inst_id(instrument_id.symbol.inner())
1633                    .limit(page_cap)
1634                    .pagination_type(1);
1635
1636                // Use 'after' to get older trades (OKX API: after=cursor means < cursor)
1637                if let Some(ref before_id) = before_trade_id {
1638                    params_builder.after(before_id.clone());
1639                }
1640
1641                let params = params_builder.build().map_err(anyhow::Error::new)?;
1642                let raw = self
1643                    .inner
1644                    .get_history_trades(params)
1645                    .await
1646                    .map_err(anyhow::Error::new)?;
1647
1648                log::debug!("Received {} raw trades from API", raw.len());
1649
1650                if !raw.is_empty() {
1651                    let first_id = &raw.first().unwrap().trade_id;
1652                    let last_id = &raw.last().unwrap().trade_id;
1653                    log::debug!(
1654                        "Raw response trade ID range: first={first_id} (newest), last={last_id} (oldest)"
1655                    );
1656                }
1657
1658                if raw.is_empty() {
1659                    log::debug!("API returned empty page, stopping pagination");
1660                    break;
1661                }
1662
1663                pages += 1;
1664
1665                let mut page_trades: Vec<TradeTick> = Vec::with_capacity(raw.len());
1666                let mut hit_start_boundary = false;
1667                let mut filtered_out = 0usize;
1668                let mut duplicates = 0usize;
1669
1670                for r in &raw {
1671                    match parse_trade_tick(
1672                        r,
1673                        instrument_id,
1674                        inst.price_precision(),
1675                        inst.size_precision(),
1676                        ts_init,
1677                    ) {
1678                        Ok(trade) => {
1679                            let ts_ms = trade.ts_event.as_i64() / 1_000_000;
1680
1681                            if let Some(e_ms) = end_ms
1682                                && ts_ms > e_ms
1683                            {
1684                                filtered_out += 1;
1685                                continue;
1686                            }
1687
1688                            if let Some(s_ms) = start_ms
1689                                && ts_ms < s_ms
1690                            {
1691                                hit_start_boundary = true;
1692                                filtered_out += 1;
1693                                break;
1694                            }
1695
1696                            let trade_key = (trade.trade_id.to_string(), trade.ts_event.as_i64());
1697                            if seen_trades.insert(trade_key) {
1698                                unique_count += 1;
1699                                page_trades.push(trade);
1700                            } else {
1701                                duplicates += 1;
1702                            }
1703                        }
1704                        Err(e) => log::error!("{e}"),
1705                    }
1706                }
1707
1708                log::debug!(
1709                    "Page {} processed: {} trades kept, {} filtered out, {} duplicates, hit_start_boundary={}",
1710                    pages,
1711                    page_trades.len(),
1712                    filtered_out,
1713                    duplicates,
1714                    hit_start_boundary
1715                );
1716
1717                // Extract oldest unique trade ID for next page cursor
1718                let oldest_trade_id = if page_trades.is_empty() {
1719                    // Only apply consecutive empty guard if we've already collected some trades
1720                    // This allows historical backfills to paginate through empty prelude
1721                    if unique_count > 0 {
1722                        consecutive_empty_pages += 1;
1723                        if consecutive_empty_pages >= MAX_CONSECUTIVE_EMPTY {
1724                            log::debug!(
1725                                "Stopping: {consecutive_empty_pages} consecutive pages with no trades in range after collecting {unique_count} trades"
1726                            );
1727                            break;
1728                        }
1729                    }
1730                    // No unique trades on page, use raw response for cursor
1731                    raw.last().map(|t| {
1732                        let id = t.trade_id.to_string();
1733                        log::debug!(
1734                            "Setting cursor from raw response (no unique trades): oldest_id={id}"
1735                        );
1736                        id
1737                    })
1738                } else {
1739                    // Use oldest deduplicated trade ID before reversing
1740                    let oldest_id = page_trades.last().map(|t| {
1741                        let id = t.trade_id.to_string();
1742                        log::debug!(
1743                            "Setting cursor from deduplicated trades: oldest_id={}, ts_event={}",
1744                            id,
1745                            t.ts_event.as_i64()
1746                        );
1747                        id
1748                    });
1749                    page_trades.reverse();
1750                    page_results.push(page_trades);
1751                    consecutive_empty_pages = 0;
1752                    oldest_id
1753                };
1754
1755                if let Some(ref old_id) = before_trade_id
1756                    && oldest_trade_id.as_ref() == Some(old_id)
1757                {
1758                    break;
1759                }
1760
1761                if oldest_trade_id.is_none() {
1762                    break;
1763                }
1764
1765                before_trade_id = oldest_trade_id;
1766
1767                if hit_start_boundary {
1768                    break;
1769                }
1770
1771                tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1772            }
1773
1774            log::debug!(
1775                "Pagination complete: {pages} pages, {unique_count} unique trades collected"
1776            );
1777
1778            let mut out: Vec<TradeTick> = Vec::new();
1779            for page in page_results.into_iter().rev() {
1780                out.extend(page);
1781            }
1782
1783            // Deduplicate by (trade_id, ts_event) composite key
1784            let mut dedup_keys = std::collections::HashSet::new();
1785            let pre_dedup_len = out.len();
1786            out.retain(|trade| {
1787                dedup_keys.insert((trade.trade_id.to_string(), trade.ts_event.as_i64()))
1788            });
1789
1790            if out.len() < pre_dedup_len {
1791                log::debug!(
1792                    "Removed {} duplicate trades during final dedup",
1793                    pre_dedup_len - out.len()
1794                );
1795            }
1796
1797            if let Some(lim) = limit
1798                && lim > 0
1799                && out.len() > lim as usize
1800            {
1801                let excess = out.len() - lim as usize;
1802                log::debug!("Trimming {excess} oldest trades to respect limit={lim}");
1803                out.drain(0..excess);
1804            }
1805
1806            log::debug!("Returning {} trades", out.len());
1807            return Ok(out);
1808        }
1809
1810        let req_limit = limit
1811            .unwrap_or(OKX_TRADES_MAX_LIMIT)
1812            .min(OKX_TRADES_MAX_LIMIT);
1813        let params = GetTradesParamsBuilder::default()
1814            .inst_id(instrument_id.symbol.inner())
1815            .limit(req_limit)
1816            .build()
1817            .map_err(anyhow::Error::new)?;
1818
1819        let raw = self
1820            .inner
1821            .get_history_trades(params)
1822            .await
1823            .map_err(anyhow::Error::new)?;
1824
1825        let mut trades: Vec<TradeTick> = Vec::with_capacity(raw.len());
1826        for r in &raw {
1827            match parse_trade_tick(
1828                r,
1829                instrument_id,
1830                inst.price_precision(),
1831                inst.size_precision(),
1832                ts_init,
1833            ) {
1834                Ok(trade) => trades.push(trade),
1835                Err(e) => log::error!("{e}"),
1836            }
1837        }
1838
1839        // OKX returns newest-first, reverse to oldest-first
1840        trades.reverse();
1841
1842        if let Some(lim) = limit
1843            && lim > 0
1844            && trades.len() > lim as usize
1845        {
1846            trades.drain(0..trades.len() - lim as usize);
1847        }
1848
1849        Ok(trades)
1850    }
1851
1852    /// Requests historical bars for the given bar type and time range.
1853    ///
1854    /// The aggregation source must be `EXTERNAL`. Time range validation ensures start < end.
1855    /// Returns bars sorted oldest to newest.
1856    ///
1857    /// # Errors
1858    ///
1859    /// Returns an error if the request fails.
1860    ///
1861    /// # Endpoint Selection
1862    ///
1863    /// The OKX API has different endpoints with different limits:
1864    /// - Regular endpoint (`/api/v5/market/candles`): ≤ 300 rows/call, ≤ 40 req/2s
1865    ///   - Used when: the page cursor (`after`, else `before`, else now) is under 2 hours old
1866    /// - History endpoint (`/api/v5/market/history-candles`): ≤ 100 rows/call, ≤ 20 req/2s
1867    ///   - Used when: the page cursor is at least 2 hours old
1868    ///
1869    /// Age is `now - cursor`, re-evaluated per page; Range-mode fallback requests split at 100 days instead.
1870    ///
1871    /// # Supported Aggregations
1872    ///
1873    /// Maps to OKX bar query parameter:
1874    /// - `Second` → `{n}s`
1875    /// - `Minute` → `{n}m`
1876    /// - `Hour` → `{n}H`
1877    /// - `Day` → `{n}D`
1878    /// - `Week` → `{n}W`
1879    /// - `Month` → `{n}M`
1880    ///
1881    /// # Pagination
1882    ///
1883    /// - Uses OKX `after`/`before` cursors per mode (`after=X` returns bars older than `X`)
1884    /// - Pages backwards from end time (or now) to start time
1885    /// - Stops when: limit reached, time window covered, or API returns empty
1886    /// - Rate limit safety: ≥ 50ms between requests
1887    ///
1888    /// # Panics
1889    ///
1890    /// May panic if internal data structures are in an unexpected state.
1891    ///
1892    /// # References
1893    ///
1894    /// - <https://tr.okx.com/docs-v5/en/#order-book-trading-market-data-get-candlesticks>
1895    /// - <https://tr.okx.com/docs-v5/en/#order-book-trading-market-data-get-candlesticks-history>
1896    pub async fn request_bars(
1897        &self,
1898        bar_type: BarType,
1899        start: Option<DateTime<Utc>>,
1900        mut end: Option<DateTime<Utc>>,
1901        limit: Option<u32>,
1902    ) -> anyhow::Result<Vec<Bar>> {
1903        const HISTORY_SPLIT_DAYS: i64 = 100;
1904        const MAX_PAGES_SOFT: usize = 500;
1905
1906        #[derive(Clone, Copy, Debug, PartialEq, Eq)]
1907        enum Mode {
1908            Latest,
1909            Backward,
1910            Range,
1911        }
1912
1913        let limit = if limit == Some(0) { None } else { limit };
1914
1915        anyhow::ensure!(
1916            bar_type.aggregation_source() == AggregationSource::External,
1917            "Only EXTERNAL aggregation is supported"
1918        );
1919
1920        if let (Some(s), Some(e)) = (start, end) {
1921            anyhow::ensure!(s < e, "Invalid time range: start={s:?} end={e:?}");
1922        }
1923
1924        let now = Utc::now();
1925
1926        if let Some(s) = start
1927            && s > now
1928        {
1929            return Ok(Vec::new());
1930        }
1931        if let Some(e) = end
1932            && e > now
1933        {
1934            end = Some(now);
1935        }
1936
1937        let spec = bar_type.spec();
1938        let step = spec.step.get();
1939        let bar_param = match spec.aggregation {
1940            BarAggregation::Second => format!("{step}s"),
1941            BarAggregation::Minute => format!("{step}m"),
1942            BarAggregation::Hour => format!("{step}H"),
1943            BarAggregation::Day => format!("{step}D"),
1944            BarAggregation::Week => format!("{step}W"),
1945            BarAggregation::Month => format!("{step}M"),
1946            a => anyhow::bail!("OKX does not support {a:?} aggregation"),
1947        };
1948
1949        let slot_ms: i64 = match spec.aggregation {
1950            BarAggregation::Second => (step as i64) * 1_000,
1951            BarAggregation::Minute => (step as i64) * 60_000,
1952            BarAggregation::Hour => (step as i64) * 3_600_000,
1953            BarAggregation::Day => (step as i64) * 86_400_000,
1954            BarAggregation::Week => (step as i64) * 7 * 86_400_000,
1955            BarAggregation::Month => (step as i64) * 30 * 86_400_000,
1956            _ => unreachable!("Unsupported aggregation should have been caught above"),
1957        };
1958        let slot_ns: i64 = slot_ms * 1_000_000;
1959
1960        let mode = match (start, end) {
1961            (None, None) => Mode::Latest,
1962            (Some(_), None) => Mode::Backward, // Changed: when only start is provided, work backward from now
1963            (None, Some(_)) => Mode::Backward,
1964            (Some(_), Some(_)) => Mode::Range,
1965        };
1966
1967        let start_ns = start.and_then(|s| s.timestamp_nanos_opt());
1968        let end_ns = end.and_then(|e| e.timestamp_nanos_opt());
1969
1970        // Floor start and ceiling end to bar boundaries for cleaner API requests
1971        let start_ms = start.map(|s| {
1972            let ms = s.timestamp_millis();
1973            if slot_ms > 0 {
1974                (ms / slot_ms) * slot_ms // Floor to nearest bar boundary
1975            } else {
1976                ms
1977            }
1978        });
1979        let end_ms = end.map(|e| {
1980            let ms = e.timestamp_millis();
1981            if slot_ms > 0 {
1982                ((ms + slot_ms - 1) / slot_ms) * slot_ms // Ceiling to nearest bar boundary
1983            } else {
1984                ms
1985            }
1986        });
1987        let now_ms = now.timestamp_millis();
1988
1989        let symbol = bar_type.instrument_id().symbol;
1990        let inst = self.instrument_from_cache(symbol.inner())?;
1991
1992        let mut out: Vec<Bar> = Vec::new();
1993        let mut pages = 0usize;
1994
1995        // IMPORTANT: OKX API has COUNTER-INTUITIVE semantics (same for bars and trades):
1996        // - after=X returns records with timestamp < X (upper bound, despite the name!)
1997        // - before=X returns records with timestamp > X (lower bound, despite the name!)
1998        // For Range [start, end], use: before=start (lower bound), after=end (upper bound)
1999        let mut after_ms: Option<i64> = match mode {
2000            Mode::Range => end_ms.or(Some(now_ms)), // Upper bound: bars < end
2001            _ => None,
2002        };
2003        let mut before_ms: Option<i64> = match mode {
2004            Mode::Backward => end_ms.map(|v| v.saturating_sub(1)),
2005            Mode::Range => start_ms, // Lower bound: bars > start
2006            Mode::Latest => None,
2007        };
2008
2009        // For Range mode, we'll paginate backwards like Backward mode
2010        let mut forward_prepend_mode = matches!(mode, Mode::Range);
2011
2012        // Adjust before_ms to ensure we get data from the API
2013        // OKX API might not have bars for the very recent past
2014        // This handles both explicit end=now and the actor layer setting end=now when it's None
2015        if matches!(mode, Mode::Backward | Mode::Range)
2016            && let Some(b) = before_ms
2017        {
2018            // OKX endpoints have different data availability windows:
2019            // - Regular endpoint: has most recent data but limited depth
2020            // - History endpoint: has deep history but lags behind current time
2021            // Use a small buffer to avoid the "dead zone"
2022            let buffer_ms = slot_ms.max(60_000); // At least 1 minute or 1 bar
2023            if b >= now_ms.saturating_sub(buffer_ms) {
2024                before_ms = Some(now_ms.saturating_sub(buffer_ms));
2025            }
2026        }
2027
2028        let mut have_latest_first_page = false;
2029        let mut progressless_loops = 0u8;
2030
2031        loop {
2032            if let Some(lim) = limit
2033                && lim > 0
2034                && out.len() >= lim as usize
2035            {
2036                break;
2037            }
2038            if pages >= MAX_PAGES_SOFT {
2039                break;
2040            }
2041
2042            let pivot_ms = if let Some(a) = after_ms {
2043                a
2044            } else if let Some(b) = before_ms {
2045                b
2046            } else {
2047                now_ms
2048            };
2049            // Choose endpoint based on how old the data is:
2050            // - Use regular endpoint for recent data (< 2 hours old; `age_hours` truncates)
2051            // - Use history endpoint for older data (>= 2 hours old, i.e. `age_hours > 1`)
2052            // This avoids the "gap" where history endpoint has no recent data
2053            // and regular endpoint has limited depth
2054            let age_ms = now_ms.saturating_sub(pivot_ms);
2055            let age_hours = age_ms / (60 * 60 * 1000);
2056            let using_history = age_hours > 1; // Use history if data is > 1 hour old
2057
2058            let page_ceiling = if using_history { 100 } else { 300 };
2059            let remaining = limit
2060                .filter(|&l| l > 0) // Treat limit=0 as no limit
2061                .map_or(page_ceiling, |l| (l as usize).saturating_sub(out.len()));
2062            let page_cap = remaining.min(page_ceiling);
2063
2064            let mut p = GetCandlesticksParamsBuilder::default();
2065            p.inst_id(symbol.as_str())
2066                .bar(&bar_param)
2067                .limit(page_cap as u32);
2068
2069            // Track whether this planned request uses BEFORE or AFTER.
2070            let mut req_used_before = false;
2071
2072            match mode {
2073                Mode::Latest => {
2074                    if have_latest_first_page && let Some(b) = before_ms {
2075                        p.before_ms(b);
2076                        req_used_before = true;
2077                    }
2078                }
2079                Mode::Backward => {
2080                    // Use 'after' to get older bars (OKX API: after=cursor means < cursor)
2081                    if let Some(b) = before_ms {
2082                        p.after_ms(b);
2083                    }
2084                }
2085                Mode::Range => {
2086                    // For Range mode, use both after and before to specify the full range
2087                    // This is much more efficient than pagination
2088                    if let Some(a) = after_ms {
2089                        p.after_ms(a);
2090                    }
2091                    if let Some(b) = before_ms {
2092                        p.before_ms(b);
2093                        req_used_before = true;
2094                    }
2095                }
2096            }
2097
2098            let params = p.build().map_err(anyhow::Error::new)?;
2099
2100            let mut raw = if using_history {
2101                self.inner
2102                    .get_history_candles(params.clone())
2103                    .await
2104                    .map_err(anyhow::Error::new)?
2105            } else {
2106                self.inner
2107                    .get_candles(params.clone())
2108                    .await
2109                    .map_err(anyhow::Error::new)?
2110            };
2111
2112            // --- Fallbacks on empty page ---
2113            if raw.is_empty() {
2114                // LATEST: retry same cursor via history, then step back a page-interval before giving up
2115                if matches!(mode, Mode::Latest)
2116                    && have_latest_first_page
2117                    && !using_history
2118                    && let Some(b) = before_ms
2119                {
2120                    let mut p2 = GetCandlesticksParamsBuilder::default();
2121                    p2.inst_id(symbol.as_str())
2122                        .bar(&bar_param)
2123                        .limit(page_cap as u32);
2124                    p2.before_ms(b);
2125                    let params2 = p2.build().map_err(anyhow::Error::new)?;
2126                    let raw2 = self
2127                        .inner
2128                        .get_history_candles(params2)
2129                        .await
2130                        .map_err(anyhow::Error::new)?;
2131                    if raw2.is_empty() {
2132                        // Step back one page interval and retry loop
2133                        let jump = (page_cap as i64).saturating_mul(slot_ms.max(1));
2134                        before_ms = Some(b.saturating_sub(jump));
2135                        progressless_loops = progressless_loops.saturating_add(1);
2136                        if progressless_loops >= 3 {
2137                            break;
2138                        }
2139                        continue;
2140                    } else {
2141                        raw = raw2;
2142                    }
2143                }
2144
2145                // Range mode doesn't need special bootstrap - it uses the normal flow with before_ms set
2146
2147                // If still empty: for Range after first page, try a single backstep window using BEFORE
2148                if raw.is_empty() && matches!(mode, Mode::Range) && pages > 0 {
2149                    let backstep_ms = (page_cap as i64).saturating_mul(slot_ms.max(1));
2150                    let pivot_back = after_ms.unwrap_or(now_ms).saturating_sub(backstep_ms);
2151
2152                    let mut p2 = GetCandlesticksParamsBuilder::default();
2153                    p2.inst_id(symbol.as_str())
2154                        .bar(&bar_param)
2155                        .limit(page_cap as u32)
2156                        .before_ms(pivot_back);
2157                    let params2 = p2.build().map_err(anyhow::Error::new)?;
2158                    let raw2 = if (now_ms.saturating_sub(pivot_back)) / (24 * 60 * 60 * 1000)
2159                        > HISTORY_SPLIT_DAYS
2160                    {
2161                        self.inner.get_history_candles(params2).await
2162                    } else {
2163                        self.inner.get_candles(params2).await
2164                    }
2165                    .map_err(anyhow::Error::new)?;
2166                    if raw2.is_empty() {
2167                        break;
2168                    } else {
2169                        raw = raw2;
2170                        forward_prepend_mode = true;
2171                        req_used_before = true;
2172                    }
2173                }
2174
2175                // First LATEST page empty: jump back >100d to force history, then continue loop
2176                if raw.is_empty()
2177                    && matches!(mode, Mode::Latest)
2178                    && !have_latest_first_page
2179                    && !using_history
2180                {
2181                    let jump_days_ms = (HISTORY_SPLIT_DAYS + 1) * 86_400_000;
2182                    before_ms = Some(now_ms.saturating_sub(jump_days_ms));
2183                    have_latest_first_page = true;
2184                    continue;
2185                }
2186
2187                // Still empty for any other case? Just break.
2188                if raw.is_empty() {
2189                    break;
2190                }
2191            }
2192            // --- end fallbacks ---
2193
2194            pages += 1;
2195
2196            // Parse, oldest → newest
2197            let ts_init = self.generate_ts_init();
2198            let mut page: Vec<Bar> = Vec::with_capacity(raw.len());
2199            for r in &raw {
2200                page.push(parse_candlestick(
2201                    r,
2202                    bar_type,
2203                    inst.price_precision(),
2204                    inst.size_precision(),
2205                    ts_init,
2206                )?);
2207            }
2208            page.reverse();
2209
2210            let page_oldest_ms = page.first().map(|b| b.ts_event.as_i64() / 1_000_000);
2211            let page_newest_ms = page.last().map(|b| b.ts_event.as_i64() / 1_000_000);
2212
2213            // Range filter (inclusive)
2214            // For Range mode, if we have no bars yet and this is an early page,
2215            // be more tolerant with the start boundary to handle gaps in data
2216            let mut filtered: Vec<Bar> = if matches!(mode, Mode::Range)
2217                && out.is_empty()
2218                && pages < 2
2219            {
2220                // On first pages of Range mode with no data yet, include the most recent bar
2221                // even if it's slightly before our start time (within 2 bar periods)
2222                // BUT we want ALL bars in the page that are within our range
2223                let tolerance_ns = slot_ns * 2; // Allow up to 2 bar periods before start
2224
2225                // Debug: log the page range
2226                if !page.is_empty() {
2227                    log::debug!(
2228                        "Range mode bootstrap page: {} bars from {} to {}, filtering with start={:?} end={:?}",
2229                        page.len(),
2230                        page.first().unwrap().ts_event.as_i64() / 1_000_000,
2231                        page.last().unwrap().ts_event.as_i64() / 1_000_000,
2232                        start_ms,
2233                        end_ms
2234                    );
2235                }
2236
2237                let result: Vec<Bar> = page
2238                    .clone()
2239                    .into_iter()
2240                    .filter(|b| {
2241                        let ts = b.ts_event.as_i64();
2242                        // Accept bars from (start - tolerance) to end
2243                        let ok_after =
2244                            start_ns.is_none_or(|sns| ts >= sns.saturating_sub(tolerance_ns));
2245                        let ok_before = end_ns.is_none_or(|ens| ts <= ens);
2246                        ok_after && ok_before
2247                    })
2248                    .collect();
2249
2250                result
2251            } else {
2252                // Normal filtering
2253                page.clone()
2254                    .into_iter()
2255                    .filter(|b| {
2256                        let ts = b.ts_event.as_i64();
2257                        let ok_after = start_ns.is_none_or(|sns| ts >= sns);
2258                        let ok_before = end_ns.is_none_or(|ens| ts <= ens);
2259                        ok_after && ok_before
2260                    })
2261                    .collect()
2262            };
2263
2264            if !page.is_empty() && filtered.is_empty() {
2265                // For Range mode, if all bars are before our start time, there's no point continuing
2266                if matches!(mode, Mode::Range)
2267                    && !forward_prepend_mode
2268                    && let (Some(newest_ms), Some(start_ms)) = (page_newest_ms, start_ms)
2269                    && newest_ms < start_ms.saturating_sub(slot_ms * 2)
2270                {
2271                    // Bars are too old (more than 2 bar periods before start), stop
2272                    break;
2273                }
2274            }
2275
2276            // Track contribution for progress guard
2277            let contribution;
2278
2279            if out.is_empty() {
2280                contribution = filtered.len();
2281                out = filtered;
2282            } else {
2283                match mode {
2284                    Mode::Backward | Mode::Latest => {
2285                        if let Some(first) = out.first() {
2286                            filtered.retain(|b| b.ts_event < first.ts_event);
2287                        }
2288                        contribution = filtered.len();
2289                        if contribution != 0 {
2290                            let mut new_out = Vec::with_capacity(out.len() + filtered.len());
2291                            new_out.extend_from_slice(&filtered);
2292                            new_out.extend_from_slice(&out);
2293                            out = new_out;
2294                        }
2295                    }
2296                    Mode::Range => {
2297                        if forward_prepend_mode || req_used_before {
2298                            // We are backfilling older pages: prepend them.
2299                            if let Some(first) = out.first() {
2300                                filtered.retain(|b| b.ts_event < first.ts_event);
2301                            }
2302                            contribution = filtered.len();
2303                            if contribution != 0 {
2304                                let mut new_out = Vec::with_capacity(out.len() + filtered.len());
2305                                new_out.extend_from_slice(&filtered);
2306                                new_out.extend_from_slice(&out);
2307                                out = new_out;
2308                            }
2309                        } else {
2310                            // Normal forward: append newer pages.
2311                            if let Some(last) = out.last() {
2312                                filtered.retain(|b| b.ts_event > last.ts_event);
2313                            }
2314                            contribution = filtered.len();
2315                            out.extend(filtered);
2316                        }
2317                    }
2318                }
2319            }
2320
2321            // Duplicate-window mitigation for Latest/Backward/Range
2322            if contribution == 0
2323                && matches!(mode, Mode::Latest | Mode::Backward | Mode::Range)
2324                && let Some(b) = before_ms
2325            {
2326                let jump = (page_cap as i64).saturating_mul(slot_ms.max(1));
2327                let new_b = b.saturating_sub(jump);
2328                if new_b != b {
2329                    before_ms = Some(new_b);
2330                }
2331            }
2332
2333            if contribution == 0 {
2334                progressless_loops = progressless_loops.saturating_add(1);
2335                if progressless_loops >= 3 {
2336                    break;
2337                }
2338            } else {
2339                progressless_loops = 0;
2340
2341                // Advance cursors only when we made progress
2342                match mode {
2343                    Mode::Latest | Mode::Backward => {
2344                        if let Some(oldest) = page_oldest_ms {
2345                            before_ms = Some(oldest.saturating_sub(1));
2346                            have_latest_first_page = true;
2347                        } else {
2348                            break;
2349                        }
2350                    }
2351                    Mode::Range => {
2352                        if forward_prepend_mode || req_used_before {
2353                            if let Some(oldest) = page_oldest_ms {
2354                                // Move back by at least one bar period to avoid getting the same data
2355                                let jump_back = slot_ms.max(60_000); // At least 1 minute
2356                                before_ms = Some(oldest.saturating_sub(jump_back));
2357                                after_ms = None;
2358                            } else {
2359                                break;
2360                            }
2361                        } else if let Some(newest) = page_newest_ms {
2362                            after_ms = Some(newest.saturating_add(1));
2363                            before_ms = None;
2364                        } else {
2365                            break;
2366                        }
2367                    }
2368                }
2369            }
2370
2371            // Stop conditions
2372            if let Some(lim) = limit
2373                && lim > 0
2374                && out.len() >= lim as usize
2375            {
2376                break;
2377            }
2378            if let Some(ens) = end_ns
2379                && let Some(last) = out.last()
2380                && last.ts_event.as_i64() >= ens
2381            {
2382                break;
2383            }
2384            if let Some(sns) = start_ns
2385                && let Some(first) = out.first()
2386                && (matches!(mode, Mode::Backward) || forward_prepend_mode)
2387                && first.ts_event.as_i64() <= sns
2388            {
2389                // For Range mode, check if we have all bars up to the end time
2390                if matches!(mode, Mode::Range) {
2391                    // Don't stop if we haven't reached the end time yet
2392                    if let Some(ens) = end_ns
2393                        && let Some(last) = out.last()
2394                    {
2395                        let last_ts = last.ts_event.as_i64();
2396                        if last_ts < ens {
2397                            // We have bars before start but haven't reached end, need to continue forward
2398                            // Switch from backward to forward pagination
2399                            forward_prepend_mode = false;
2400                            after_ms = Some((last_ts / 1_000_000).saturating_add(1));
2401                            before_ms = None;
2402                            continue;
2403                        }
2404                    }
2405                }
2406                break;
2407            }
2408
2409            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
2410        }
2411
2412        // Final rescue for Range mode when nothing gathered
2413        if out.is_empty() && matches!(mode, Mode::Range) {
2414            let pivot = end_ms.unwrap_or(now_ms.saturating_sub(1));
2415            let hist = (now_ms.saturating_sub(pivot)) / (24 * 60 * 60 * 1000) > HISTORY_SPLIT_DAYS;
2416            let mut p = GetCandlesticksParamsBuilder::default();
2417            p.inst_id(symbol.as_str())
2418                .bar(&bar_param)
2419                .limit(300)
2420                .before_ms(pivot);
2421            let params = p.build().map_err(anyhow::Error::new)?;
2422            let raw = if hist {
2423                self.inner.get_history_candles(params).await
2424            } else {
2425                self.inner.get_candles(params).await
2426            }
2427            .map_err(anyhow::Error::new)?;
2428            if !raw.is_empty() {
2429                let ts_init = self.generate_ts_init();
2430                let mut page: Vec<Bar> = Vec::with_capacity(raw.len());
2431                for r in &raw {
2432                    page.push(parse_candlestick(
2433                        r,
2434                        bar_type,
2435                        inst.price_precision(),
2436                        inst.size_precision(),
2437                        ts_init,
2438                    )?);
2439                }
2440                page.reverse();
2441                out = page
2442                    .into_iter()
2443                    .filter(|b| {
2444                        let ts = b.ts_event.as_i64();
2445                        let ok_after = start_ns.is_none_or(|sns| ts >= sns);
2446                        let ok_before = end_ns.is_none_or(|ens| ts <= ens);
2447                        ok_after && ok_before
2448                    })
2449                    .collect();
2450            }
2451        }
2452
2453        // Trim against end bound if needed (keep ≤ end)
2454        if let Some(ens) = end_ns {
2455            while out.last().is_some_and(|b| b.ts_event.as_i64() > ens) {
2456                out.pop();
2457            }
2458        }
2459
2460        // Clamp first bar for Range when using forward pagination
2461        if matches!(mode, Mode::Range)
2462            && !forward_prepend_mode
2463            && let Some(sns) = start_ns
2464        {
2465            let lower = sns.saturating_sub(slot_ns);
2466            while out.first().is_some_and(|b| b.ts_event.as_i64() < lower) {
2467                out.remove(0);
2468            }
2469        }
2470
2471        if let Some(lim) = limit
2472            && lim > 0
2473            && out.len() > lim as usize
2474        {
2475            out.truncate(lim as usize);
2476        }
2477
2478        Ok(out)
2479    }
2480
2481    /// Requests historical order status reports for the given parameters.
2482    ///
2483    /// # Errors
2484    ///
2485    /// Returns an error if the request fails.
2486    ///
2487    /// # References
2488    ///
2489    /// - <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-order-history-last-7-days>.
2490    /// - <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-order-history-last-3-months>.
2491    #[allow(clippy::too_many_arguments)]
2492    pub async fn request_order_status_reports(
2493        &self,
2494        account_id: AccountId,
2495        instrument_type: Option<OKXInstrumentType>,
2496        instrument_id: Option<InstrumentId>,
2497        start: Option<DateTime<Utc>>,
2498        end: Option<DateTime<Utc>>,
2499        open_only: bool,
2500        limit: Option<u32>,
2501    ) -> anyhow::Result<Vec<OrderStatusReport>> {
2502        let mut history_params = GetOrderHistoryParamsBuilder::default();
2503
2504        let instrument_type = if let Some(instrument_type) = instrument_type {
2505            instrument_type
2506        } else {
2507            let instrument_id = instrument_id.ok_or_else(|| {
2508                anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
2509            })?;
2510            let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2511            okx_instrument_type(&instrument)?
2512        };
2513
2514        history_params.inst_type(instrument_type);
2515
2516        if let Some(instrument_id) = instrument_id.as_ref() {
2517            history_params.inst_id(instrument_id.symbol.inner().to_string());
2518        }
2519
2520        if let Some(limit) = limit {
2521            history_params.limit(limit);
2522        }
2523
2524        let history_params = history_params.build().map_err(|e| anyhow::anyhow!(e))?;
2525
2526        let mut pending_params = GetOrderListParamsBuilder::default();
2527        pending_params.inst_type(instrument_type);
2528
2529        if let Some(instrument_id) = instrument_id.as_ref() {
2530            pending_params.inst_id(instrument_id.symbol.inner().to_string());
2531        }
2532
2533        if let Some(limit) = limit {
2534            pending_params.limit(limit);
2535        }
2536
2537        let pending_params = pending_params.build().map_err(|e| anyhow::anyhow!(e))?;
2538
2539        let combined_resp = if open_only {
2540            // Only request pending/open orders
2541            self.inner
2542                .get_orders_pending(pending_params)
2543                .await
2544                .map_err(|e| anyhow::anyhow!(e))?
2545        } else {
2546            // Make both requests concurrently
2547            let (history_resp, pending_resp) = tokio::try_join!(
2548                self.inner.get_orders_history(history_params),
2549                self.inner.get_orders_pending(pending_params)
2550            )
2551            .map_err(|e| anyhow::anyhow!(e))?;
2552
2553            // Combine both responses
2554            let mut combined_resp = history_resp;
2555            combined_resp.extend(pending_resp);
2556            combined_resp
2557        };
2558
2559        // Prepare time range filter
2560        let start_ns = start.map(UnixNanos::from);
2561        let end_ns = end.map(UnixNanos::from);
2562
2563        let ts_init = self.generate_ts_init();
2564        let mut reports = Vec::with_capacity(combined_resp.len());
2565
2566        // Use a seen filter in case pending orders are within the histories "2hr reserve window"
2567        let mut seen: AHashSet<String> = AHashSet::new();
2568
2569        for order in combined_resp {
2570            let seen_key = if !order.cl_ord_id.is_empty() {
2571                order.cl_ord_id.as_str().to_string()
2572            } else if let Some(algo_cl_ord_id) = order
2573                .algo_cl_ord_id
2574                .as_ref()
2575                .filter(|value| !value.as_str().is_empty())
2576            {
2577                algo_cl_ord_id.as_str().to_string()
2578            } else if let Some(algo_id) = order
2579                .algo_id
2580                .as_ref()
2581                .filter(|value| !value.as_str().is_empty())
2582            {
2583                algo_id.as_str().to_string()
2584            } else {
2585                order.ord_id.as_str().to_string()
2586            };
2587
2588            if !seen.insert(seen_key) {
2589                continue; // Reserved pending already reported
2590            }
2591
2592            let Ok(inst) = self.instrument_from_cache(order.inst_id) else {
2593                log::debug!(
2594                    "Skipping order report for instrument not in cache: symbol={}",
2595                    order.inst_id,
2596                );
2597                continue;
2598            };
2599
2600            let report = match parse_order_status_report(
2601                &order,
2602                account_id,
2603                inst.id(),
2604                inst.price_precision(),
2605                inst.size_precision(),
2606                ts_init,
2607            ) {
2608                Ok(report) => report,
2609                Err(e) => {
2610                    log::error!("Failed to parse order status report: {e}");
2611                    continue;
2612                }
2613            };
2614
2615            if let Some(start_ns) = start_ns
2616                && report.ts_last < start_ns
2617            {
2618                continue;
2619            }
2620            if let Some(end_ns) = end_ns
2621                && report.ts_last > end_ns
2622            {
2623                continue;
2624            }
2625
2626            reports.push(report);
2627        }
2628
2629        Ok(reports)
2630    }
2631
2632    /// Requests fill reports (transaction details) for the given parameters.
2633    ///
2634    /// # Errors
2635    ///
2636    /// Returns an error if the request fails.
2637    ///
2638    /// # References
2639    ///
2640    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-transaction-details-last-3-days>.
2641    pub async fn request_fill_reports(
2642        &self,
2643        account_id: AccountId,
2644        instrument_type: Option<OKXInstrumentType>,
2645        instrument_id: Option<InstrumentId>,
2646        start: Option<DateTime<Utc>>,
2647        end: Option<DateTime<Utc>>,
2648        limit: Option<u32>,
2649    ) -> anyhow::Result<Vec<FillReport>> {
2650        let mut params = GetTransactionDetailsParamsBuilder::default();
2651
2652        let instrument_type = if let Some(instrument_type) = instrument_type {
2653            instrument_type
2654        } else {
2655            let instrument_id = instrument_id.ok_or_else(|| {
2656                anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
2657            })?;
2658            let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2659            okx_instrument_type(&instrument)?
2660        };
2661
2662        params.inst_type(instrument_type);
2663
2664        if let Some(instrument_id) = instrument_id {
2665            let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2666            let instrument_type = okx_instrument_type(&instrument)?;
2667            params.inst_type(instrument_type);
2668            params.inst_id(instrument_id.symbol.inner().to_string());
2669        }
2670
2671        if let Some(limit) = limit {
2672            params.limit(limit);
2673        }
2674
2675        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2676
2677        let resp = self
2678            .inner
2679            .get_fills(params)
2680            .await
2681            .map_err(|e| anyhow::anyhow!(e))?;
2682
2683        // Prepare time range filter
2684        let start_ns = start.map(UnixNanos::from);
2685        let end_ns = end.map(UnixNanos::from);
2686
2687        let ts_init = self.generate_ts_init();
2688        let mut reports = Vec::with_capacity(resp.len());
2689
2690        for detail in resp {
2691            // Skip fills with zero or negative quantity (cancelled orders, etc)
2692            if detail.fill_sz.is_empty() {
2693                continue;
2694            }
2695            if let Ok(qty) = detail.fill_sz.parse::<f64>() {
2696                if qty <= 0.0 {
2697                    continue;
2698                }
2699            } else {
2700                // Skip unparsable quantities
2701                continue;
2702            }
2703
2704            let Ok(inst) = self.instrument_from_cache(detail.inst_id) else {
2705                log::debug!(
2706                    "Skipping fill report for instrument not in cache: symbol={}",
2707                    detail.inst_id,
2708                );
2709                continue;
2710            };
2711
2712            let report = match parse_fill_report(
2713                detail,
2714                account_id,
2715                inst.id(),
2716                inst.price_precision(),
2717                inst.size_precision(),
2718                ts_init,
2719            ) {
2720                Ok(report) => report,
2721                Err(e) => {
2722                    log::error!("Failed to parse fill report: {e}");
2723                    continue;
2724                }
2725            };
2726
2727            if let Some(start_ns) = start_ns
2728                && report.ts_event < start_ns
2729            {
2730                continue;
2731            }
2732
2733            if let Some(end_ns) = end_ns
2734                && report.ts_event > end_ns
2735            {
2736                continue;
2737            }
2738
2739            reports.push(report);
2740        }
2741
2742        Ok(reports)
2743    }
2744
2745    /// Requests current position status reports for the given parameters.
2746    ///
2747    /// # Position Modes
2748    ///
2749    /// OKX supports two position modes, which affects how position data is returned:
2750    ///
2751    /// ## Net Mode (One-way)
2752    /// - `posSide` field will be `"net"`
2753    /// - `pos` field uses **signed quantities**:
2754    ///   - Positive value = Long position
2755    ///   - Negative value = Short position
2756    ///   - Zero = Flat/no position
2757    ///
2758    /// ## Long/Short Mode (Hedge/Dual-side)
2759    /// - `posSide` field will be `"long"` or `"short"`
2760    /// - `pos` field is **always positive** (use `posSide` to determine actual side)
2761    /// - Allows holding simultaneous long and short positions on the same instrument
2762    /// - Position IDs are suffixed with `-LONG` or `-SHORT` for uniqueness
2763    ///
2764    /// # Errors
2765    ///
2766    /// Returns an error if the request fails.
2767    ///
2768    /// # References
2769    ///
2770    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-positions>
2771    pub async fn request_position_status_reports(
2772        &self,
2773        account_id: AccountId,
2774        instrument_type: Option<OKXInstrumentType>,
2775        instrument_id: Option<InstrumentId>,
2776    ) -> anyhow::Result<Vec<PositionStatusReport>> {
2777        let mut params = GetPositionsParamsBuilder::default();
2778
2779        let instrument_type = if let Some(instrument_type) = instrument_type {
2780            instrument_type
2781        } else {
2782            let instrument_id = instrument_id.ok_or_else(|| {
2783                anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
2784            })?;
2785            let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2786            okx_instrument_type(&instrument)?
2787        };
2788
2789        params.inst_type(instrument_type);
2790
2791        instrument_id
2792            .as_ref()
2793            .map(|i| params.inst_id(i.symbol.inner()));
2794
2795        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2796
2797        let resp = self
2798            .inner
2799            .get_positions(params)
2800            .await
2801            .map_err(|e| anyhow::anyhow!(e))?;
2802
2803        let ts_init = self.generate_ts_init();
2804        let mut reports = Vec::with_capacity(resp.len());
2805
2806        for position in resp {
2807            let Ok(inst) = self.instrument_from_cache(position.inst_id) else {
2808                log::debug!(
2809                    "Skipping position report for instrument not in cache: symbol={}",
2810                    position.inst_id,
2811                );
2812                continue;
2813            };
2814
2815            match parse_position_status_report(
2816                position,
2817                account_id,
2818                inst.id(),
2819                inst.size_precision(),
2820                ts_init,
2821            ) {
2822                Ok(report) => reports.push(report),
2823                Err(e) => {
2824                    log::error!("Failed to parse position status report: {e}");
2825                    continue;
2826                }
2827            };
2828        }
2829
2830        Ok(reports)
2831    }
2832
2833    /// Requests spot margin position status reports from account balance.
2834    ///
2835    /// Spot margin positions appear in `/api/v5/account/balance` as balance sheet items
2836    /// with non-zero `liab` (liability) or `spotInUseAmt` fields, rather than in the
2837    /// positions endpoint. This method fetches the balance and converts any margin
2838    /// positions into position status reports.
2839    ///
2840    /// # Errors
2841    ///
2842    /// Returns an error if the request fails or parsing fails.
2843    ///
2844    /// # References
2845    ///
2846    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-balance>
2847    pub async fn request_spot_margin_position_reports(
2848        &self,
2849        account_id: AccountId,
2850    ) -> anyhow::Result<Vec<PositionStatusReport>> {
2851        let accounts = self
2852            .inner
2853            .get_balance()
2854            .await
2855            .map_err(|e| anyhow::anyhow!(e))?;
2856
2857        let ts_init = self.generate_ts_init();
2858        let mut reports = Vec::new();
2859
2860        for account in accounts {
2861            for balance in account.details {
2862                let ccy_str = balance.ccy.as_str();
2863
2864                // Try to find instrument by constructing potential spot pair symbols
2865                let potential_symbols = [
2866                    format!("{ccy_str}-USDT"),
2867                    format!("{ccy_str}-USD"),
2868                    format!("{ccy_str}-USDC"),
2869                ];
2870
2871                let instrument_result = potential_symbols.iter().find_map(|symbol| {
2872                    self.instrument_from_cache(Ustr::from(symbol))
2873                        .ok()
2874                        .map(|inst| (inst.id(), inst.size_precision()))
2875                });
2876
2877                let (instrument_id, size_precision) = match instrument_result {
2878                    Some((id, prec)) => (id, prec),
2879                    None => {
2880                        log::debug!(
2881                            "Skipping balance for {ccy_str} - no matching instrument in cache"
2882                        );
2883                        continue;
2884                    }
2885                };
2886
2887                match parse_spot_margin_position_from_balance(
2888                    &balance,
2889                    account_id,
2890                    instrument_id,
2891                    size_precision,
2892                    ts_init,
2893                ) {
2894                    Ok(Some(report)) => reports.push(report),
2895                    Ok(None) => {} // No margin position for this currency
2896                    Err(e) => {
2897                        log::error!(
2898                            "Failed to parse spot margin position from balance for {ccy_str}: {e}"
2899                        );
2900                        continue;
2901                    }
2902                };
2903            }
2904        }
2905
2906        Ok(reports)
2907    }
2908
2909    /// Places an algo order via HTTP.
2910    ///
2911    /// # Errors
2912    ///
2913    /// Returns an error if the request fails.
2914    ///
2915    /// # References
2916    ///
2917    /// <https://www.okx.com/docs-v5/en/#order-book-trading-algo-trading-post-place-algo-order>
2918    pub async fn place_algo_order(
2919        &self,
2920        request: OKXPlaceAlgoOrderRequest,
2921    ) -> Result<OKXPlaceAlgoOrderResponse, OKXHttpError> {
2922        let body =
2923            serde_json::to_vec(&request).map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
2924
2925        let resp: Vec<OKXPlaceAlgoOrderResponse> = self
2926            .inner
2927            .send_request::<_, ()>(
2928                Method::POST,
2929                "/api/v5/trade/order-algo",
2930                None,
2931                Some(body),
2932                true,
2933            )
2934            .await?;
2935
2936        resp.into_iter()
2937            .next()
2938            .ok_or_else(|| OKXHttpError::ValidationError("Empty response".to_string()))
2939    }
2940
2941    /// Cancels an algo order via HTTP.
2942    ///
2943    /// # Errors
2944    ///
2945    /// Returns an error if the request fails.
2946    ///
2947    /// # References
2948    ///
2949    /// <https://www.okx.com/docs-v5/en/#order-book-trading-algo-trading-post-cancel-algo-order>
2950    pub async fn cancel_algo_order(
2951        &self,
2952        request: OKXCancelAlgoOrderRequest,
2953    ) -> Result<OKXCancelAlgoOrderResponse, OKXHttpError> {
2954        // OKX expects an array for cancel-algos endpoint
2955        // Serialize once to bytes to keep signing and sending identical
2956        let body =
2957            serde_json::to_vec(&[request]).map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
2958
2959        let resp: Vec<OKXCancelAlgoOrderResponse> = self
2960            .inner
2961            .send_request::<_, ()>(
2962                Method::POST,
2963                "/api/v5/trade/cancel-algos",
2964                None,
2965                Some(body),
2966                true,
2967            )
2968            .await?;
2969
2970        resp.into_iter()
2971            .next()
2972            .ok_or_else(|| OKXHttpError::ValidationError("Empty response".to_string()))
2973    }
2974
2975    /// Places an algo order using domain types.
2976    ///
2977    /// This is a convenience method that accepts Nautilus domain types
2978    /// and builds the appropriate OKX request structure internally.
2979    ///
2980    /// # Errors
2981    ///
2982    /// Returns an error if the request fails.
2983    #[allow(clippy::too_many_arguments)]
2984    pub async fn place_algo_order_with_domain_types(
2985        &self,
2986        instrument_id: InstrumentId,
2987        td_mode: OKXTradeMode,
2988        client_order_id: ClientOrderId,
2989        order_side: OrderSide,
2990        order_type: OrderType,
2991        quantity: Quantity,
2992        trigger_price: Price,
2993        trigger_type: Option<TriggerType>,
2994        limit_price: Option<Price>,
2995        reduce_only: Option<bool>,
2996    ) -> Result<OKXPlaceAlgoOrderResponse, OKXHttpError> {
2997        if !matches!(order_side, OrderSide::Buy | OrderSide::Sell) {
2998            return Err(OKXHttpError::ValidationError(
2999                "Invalid order side".to_string(),
3000            ));
3001        }
3002        let okx_side: OKXSide = order_side.into();
3003
3004        // Map trigger type to OKX format
3005        let trigger_px_type_enum = trigger_type.map_or(OKXTriggerType::Last, Into::into);
3006
3007        // Determine order price based on order type
3008        let order_px = if matches!(order_type, OrderType::StopLimit | OrderType::LimitIfTouched) {
3009            limit_price.map(|p| p.to_string())
3010        } else {
3011            // Market orders use -1 to indicate market execution
3012            Some("-1".to_string())
3013        };
3014
3015        let request = OKXPlaceAlgoOrderRequest {
3016            inst_id: instrument_id.symbol.as_str().to_string(),
3017            td_mode,
3018            side: okx_side,
3019            ord_type: OKXAlgoOrderType::Trigger, // All conditional orders use 'trigger' type
3020            sz: quantity.to_string(),
3021            algo_cl_ord_id: Some(client_order_id.as_str().to_string()),
3022            trigger_px: Some(trigger_price.to_string()),
3023            order_px,
3024            trigger_px_type: Some(trigger_px_type_enum),
3025            tgt_ccy: None,  // Let OKX determine based on instrument
3026            pos_side: None, // Use default position side
3027            close_position: None,
3028            tag: Some(OKX_NAUTILUS_BROKER_ID.to_string()),
3029            reduce_only,
3030        };
3031
3032        self.place_algo_order(request).await
3033    }
3034
3035    /// Cancels an algo order using domain types.
3036    ///
3037    /// This is a convenience method that accepts Nautilus domain types
3038    /// and builds the appropriate OKX request structure internally.
3039    ///
3040    /// # Errors
3041    ///
3042    /// Returns an error if the request fails.
3043    pub async fn cancel_algo_order_with_domain_types(
3044        &self,
3045        instrument_id: InstrumentId,
3046        algo_id: String,
3047    ) -> Result<OKXCancelAlgoOrderResponse, OKXHttpError> {
3048        let request = OKXCancelAlgoOrderRequest {
3049            inst_id: instrument_id.symbol.to_string(),
3050            algo_id: Some(algo_id),
3051            algo_cl_ord_id: None,
3052        };
3053
3054        self.cancel_algo_order(request).await
3055    }
3056
3057    /// Requests algo order status reports.
3058    ///
3059    /// # Errors
3060    ///
3061    /// Returns an error if the request fails.
3062    #[allow(clippy::too_many_arguments)]
3063    pub async fn request_algo_order_status_reports(
3064        &self,
3065        account_id: AccountId,
3066        instrument_type: Option<OKXInstrumentType>,
3067        instrument_id: Option<InstrumentId>,
3068        algo_id: Option<String>,
3069        algo_client_order_id: Option<ClientOrderId>,
3070        state: Option<OKXOrderStatus>,
3071        limit: Option<u32>,
3072    ) -> anyhow::Result<Vec<OrderStatusReport>> {
3073        let mut instruments_cache: AHashMap<Ustr, InstrumentAny> = AHashMap::new();
3074
3075        let inst_type = if let Some(inst_type) = instrument_type {
3076            inst_type
3077        } else if let Some(inst_id) = instrument_id {
3078            let instrument = self.instrument_from_cache(inst_id.symbol.inner())?;
3079            let inst_type = okx_instrument_type(&instrument)?;
3080            instruments_cache.insert(inst_id.symbol.inner(), instrument);
3081            inst_type
3082        } else {
3083            anyhow::bail!("instrument_type or instrument_id required for algo order query")
3084        };
3085
3086        let mut params_builder = GetAlgoOrdersParamsBuilder::default();
3087        params_builder.inst_type(inst_type);
3088        if let Some(inst_id) = instrument_id {
3089            params_builder.inst_id(inst_id.symbol.inner().to_string());
3090        }
3091        if let Some(algo_id) = algo_id.as_ref() {
3092            params_builder.algo_id(algo_id.clone());
3093        }
3094        if let Some(client_order_id) = algo_client_order_id.as_ref() {
3095            params_builder.algo_cl_ord_id(client_order_id.as_str().to_string());
3096        }
3097        if let Some(state) = state {
3098            params_builder.state(state);
3099        }
3100        if let Some(limit) = limit {
3101            params_builder.limit(limit);
3102        }
3103
3104        let params = params_builder
3105            .build()
3106            .map_err(|e| anyhow::anyhow!(format!("Failed to build algo order params: {e}")))?;
3107
3108        let ts_init = self.generate_ts_init();
3109        let mut reports = Vec::new();
3110        let mut seen: AHashSet<(String, String)> = AHashSet::new();
3111
3112        let pending = match self.inner.get_order_algo_pending(params.clone()).await {
3113            Ok(result) => result,
3114            Err(OKXHttpError::UnexpectedStatus { status, .. })
3115                if status == StatusCode::NOT_FOUND =>
3116            {
3117                Vec::new()
3118            }
3119            Err(e) => return Err(e.into()),
3120        };
3121        self.collect_algo_reports(
3122            account_id,
3123            &pending,
3124            &mut instruments_cache,
3125            ts_init,
3126            &mut seen,
3127            &mut reports,
3128        )
3129        .await?;
3130
3131        let history = match self.inner.get_order_algo_history(params).await {
3132            Ok(result) => result,
3133            Err(OKXHttpError::UnexpectedStatus { status, .. })
3134                if status == StatusCode::NOT_FOUND =>
3135            {
3136                Vec::new()
3137            }
3138            Err(e) => return Err(e.into()),
3139        };
3140        self.collect_algo_reports(
3141            account_id,
3142            &history,
3143            &mut instruments_cache,
3144            ts_init,
3145            &mut seen,
3146            &mut reports,
3147        )
3148        .await?;
3149
3150        Ok(reports)
3151    }
3152
3153    /// Requests an algo order status report by client order identifier.
3154    ///
3155    /// # Errors
3156    ///
3157    /// Returns an error if the request fails.
3158    pub async fn request_algo_order_status_report(
3159        &self,
3160        account_id: AccountId,
3161        instrument_id: InstrumentId,
3162        algo_client_order_id: ClientOrderId,
3163    ) -> anyhow::Result<Option<OrderStatusReport>> {
3164        let reports = self
3165            .request_algo_order_status_reports(
3166                account_id,
3167                None,
3168                Some(instrument_id),
3169                None,
3170                Some(algo_client_order_id),
3171                None,
3172                Some(50_u32),
3173            )
3174            .await?;
3175
3176        Ok(reports.into_iter().next())
3177    }
3178
3179    /// Exposes raw HTTP client for testing purposes
3180    pub fn raw_client(&self) -> &Arc<OKXRawHttpClient> {
3181        &self.inner
3182    }
3183
3184    async fn collect_algo_reports(
3185        &self,
3186        account_id: AccountId,
3187        orders: &[OKXOrderAlgo],
3188        instruments_cache: &mut AHashMap<Ustr, InstrumentAny>,
3189        ts_init: UnixNanos,
3190        seen: &mut AHashSet<(String, String)>,
3191        reports: &mut Vec<OrderStatusReport>,
3192    ) -> anyhow::Result<()> {
3193        for order in orders {
3194            let key = (order.algo_id.clone(), order.algo_cl_ord_id.clone());
3195            if !seen.insert(key) {
3196                continue;
3197            }
3198
3199            let instrument = if let Some(instrument) = instruments_cache.get(&order.inst_id) {
3200                instrument.clone()
3201            } else {
3202                let Ok(instrument) = self.instrument_from_cache(order.inst_id) else {
3203                    log::debug!(
3204                        "Skipping algo order report for instrument not in cache: symbol={}",
3205                        order.inst_id,
3206                    );
3207                    continue;
3208                };
3209                instruments_cache.insert(order.inst_id, instrument.clone());
3210                instrument
3211            };
3212
3213            match parse_http_algo_order(order, account_id, &instrument, ts_init) {
3214                Ok(report) => reports.push(report),
3215                Err(e) => {
3216                    log::error!("Failed to parse algo order report: {e}");
3217                }
3218            }
3219        }
3220
3221        Ok(())
3222    }
3223}
3224
3225fn parse_http_algo_order(
3226    order: &OKXOrderAlgo,
3227    account_id: AccountId,
3228    instrument: &InstrumentAny,
3229    ts_init: UnixNanos,
3230) -> anyhow::Result<OrderStatusReport> {
3231    let ord_px = if order.ord_px.is_empty() {
3232        "-1".to_string()
3233    } else {
3234        order.ord_px.clone()
3235    };
3236
3237    let reduce_only = if order.reduce_only.is_empty() {
3238        "false".to_string()
3239    } else {
3240        order.reduce_only.clone()
3241    };
3242
3243    let msg = OKXAlgoOrderMsg {
3244        algo_id: order.algo_id.clone(),
3245        algo_cl_ord_id: order.algo_cl_ord_id.clone(),
3246        cl_ord_id: order.cl_ord_id.clone(),
3247        ord_id: order.ord_id.clone(),
3248        inst_id: order.inst_id,
3249        inst_type: order.inst_type,
3250        ord_type: order.ord_type,
3251        state: order.state,
3252        side: order.side,
3253        pos_side: order.pos_side,
3254        sz: order.sz.clone(),
3255        trigger_px: order.trigger_px.clone(),
3256        trigger_px_type: order.trigger_px_type.unwrap_or(OKXTriggerType::None),
3257        ord_px,
3258        td_mode: order.td_mode,
3259        lever: order.lever.clone(),
3260        reduce_only,
3261        actual_px: order.actual_px.clone(),
3262        actual_sz: order.actual_sz.clone(),
3263        notional_usd: order.notional_usd.clone(),
3264        c_time: order.c_time,
3265        u_time: order.u_time,
3266        trigger_time: order.trigger_time.clone(),
3267        tag: order.tag.clone(),
3268    };
3269
3270    parse_algo_order_status_report(&msg, instrument, account_id, ts_init)
3271}