nautilus_okx/http/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides an ergonomic wrapper around the **OKX v5 REST API** –
17//! <https://www.okx.com/docs-v5/en/>.
18//!
19//! The core type exported by this module is [`OKXHttpClient`].  It offers an
20//! interface to all exchange endpoints currently required by NautilusTrader.
21//!
22//! Key responsibilities handled internally:
23//! • Request signing and header composition for private routes (HMAC-SHA256).
24//! • Rate-limiting based on the public OKX specification.
25//! • Deserialization of JSON payloads into domain models.
26//! • Conversion of raw exchange errors into the rich [`OKXHttpError`] enum.
27//!
28//! # Official documentation
29//!
30//! | Endpoint                             | Reference                                              |
31//! |--------------------------------------|--------------------------------------------------------|
32//! | Market data                          | <https://www.okx.com/docs-v5/en/#rest-api-market-data> |
33//! | Account & positions                  | <https://www.okx.com/docs-v5/en/#rest-api-account>     |
34//! | Funding & asset balances             | <https://www.okx.com/docs-v5/en/#rest-api-funding>     |
35
36use std::{
37    collections::HashMap,
38    fmt::{Debug, Formatter},
39    num::NonZeroU32,
40    str::FromStr,
41    sync::{
42        Arc, LazyLock,
43        atomic::{AtomicBool, Ordering},
44    },
45};
46
47use ahash::{AHashMap, AHashSet};
48use chrono::{DateTime, Utc};
49use dashmap::DashMap;
50use nautilus_core::{
51    UnixNanos, consts::NAUTILUS_USER_AGENT, env::get_or_env_var, time::get_atomic_clock_realtime,
52};
53use nautilus_model::{
54    data::{Bar, BarType, IndexPriceUpdate, MarkPriceUpdate, TradeTick},
55    enums::{AggregationSource, BarAggregation, OrderSide, OrderType, TriggerType},
56    events::AccountState,
57    identifiers::{AccountId, ClientOrderId, InstrumentId},
58    instruments::{Instrument, InstrumentAny},
59    reports::{FillReport, OrderStatusReport, PositionStatusReport},
60    types::{Price, Quantity},
61};
62use nautilus_network::{
63    http::{HttpClient, Method, StatusCode, USER_AGENT},
64    ratelimiter::quota::Quota,
65    retry::{RetryConfig, RetryManager},
66};
67use rust_decimal::Decimal;
68use serde::{Deserialize, Serialize, de::DeserializeOwned};
69use tokio_util::sync::CancellationToken;
70use ustr::Ustr;
71
72use super::{
73    error::OKXHttpError,
74    models::{
75        OKXAccount, OKXCancelAlgoOrderRequest, OKXCancelAlgoOrderResponse, OKXFeeRate,
76        OKXIndexTicker, OKXMarkPrice, OKXOrderAlgo, OKXOrderHistory, OKXPlaceAlgoOrderRequest,
77        OKXPlaceAlgoOrderResponse, OKXPosition, OKXPositionHistory, OKXPositionTier, OKXServerTime,
78        OKXTransactionDetail,
79    },
80    query::{
81        GetAlgoOrdersParams, GetAlgoOrdersParamsBuilder, GetCandlesticksParams,
82        GetCandlesticksParamsBuilder, GetIndexTickerParams, GetIndexTickerParamsBuilder,
83        GetInstrumentsParams, GetInstrumentsParamsBuilder, GetMarkPriceParams,
84        GetMarkPriceParamsBuilder, GetOrderHistoryParams, GetOrderHistoryParamsBuilder,
85        GetOrderListParams, GetOrderListParamsBuilder, GetPositionTiersParams,
86        GetPositionsHistoryParams, GetPositionsParams, GetPositionsParamsBuilder,
87        GetTradeFeeParams, GetTradesParams, GetTradesParamsBuilder, GetTransactionDetailsParams,
88        GetTransactionDetailsParamsBuilder, SetPositionModeParams, SetPositionModeParamsBuilder,
89    },
90};
91use crate::{
92    common::{
93        consts::{OKX_HTTP_URL, OKX_NAUTILUS_BROKER_ID, should_retry_error_code},
94        credential::Credential,
95        enums::{
96            OKXAlgoOrderType, OKXContractType, OKXInstrumentStatus, OKXInstrumentType,
97            OKXOrderStatus, OKXPositionMode, OKXSide, OKXTradeMode, OKXTriggerType,
98        },
99        models::OKXInstrument,
100        parse::{
101            okx_instrument_type, okx_instrument_type_from_symbol, parse_account_state,
102            parse_candlestick, parse_fill_report, parse_index_price_update, parse_instrument_any,
103            parse_mark_price_update, parse_order_status_report, parse_position_status_report,
104            parse_spot_margin_position_from_balance, parse_trade_tick,
105        },
106    },
107    http::{
108        models::{OKXCandlestick, OKXTrade},
109        query::GetOrderParams,
110    },
111    websocket::{messages::OKXAlgoOrderMsg, parse::parse_algo_order_status_report},
112};
113
114const OKX_SUCCESS_CODE: &str = "0";
115
116/// Default OKX REST API rate limit: 500 requests per 2 seconds.
117///
118/// - Sub-account order limit: 1000 requests per 2 seconds.
119/// - Account balance: 10 requests per 2 seconds.
120/// - Account instruments: 20 requests per 2 seconds.
121///
122/// We use a conservative 250 requests per second (500 per 2 seconds) as a general limit
123/// that should accommodate most use cases while respecting OKX's documented limits.
124pub static OKX_REST_QUOTA: LazyLock<Quota> =
125    LazyLock::new(|| Quota::per_second(NonZeroU32::new(250).unwrap()));
126
127const OKX_GLOBAL_RATE_KEY: &str = "okx:global";
128
129/// Represents an OKX HTTP response.
130#[derive(Debug, Serialize, Deserialize)]
131pub struct OKXResponse<T> {
132    /// The OKX response code, which is `"0"` for success.
133    pub code: String,
134    /// A message string which can be informational or describe an error cause.
135    pub msg: String,
136    /// The typed data returned by the OKX endpoint.
137    pub data: Vec<T>,
138}
139
140/// Provides a raw HTTP client for interacting with the [OKX](https://okx.com) REST API.
141///
142/// This client wraps the underlying [`HttpClient`] to handle functionality
143/// specific to OKX, such as request signing (for authenticated endpoints),
144/// forming request URLs, and deserializing responses into OKX specific data models.
145pub struct OKXRawHttpClient {
146    base_url: String,
147    client: HttpClient,
148    credential: Option<Credential>,
149    retry_manager: RetryManager<OKXHttpError>,
150    cancellation_token: CancellationToken,
151    is_demo: bool,
152}
153
154impl Default for OKXRawHttpClient {
155    fn default() -> Self {
156        Self::new(None, Some(60), None, None, None, false, None)
157            .expect("Failed to create default OKXRawHttpClient")
158    }
159}
160
161impl Debug for OKXRawHttpClient {
162    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
163        let credential = self.credential.as_ref().map(|_| "<redacted>");
164        f.debug_struct(stringify!(OKXRawHttpClient))
165            .field("base_url", &self.base_url)
166            .field("credential", &credential)
167            .finish_non_exhaustive()
168    }
169}
170
171impl OKXRawHttpClient {
172    fn rate_limiter_quotas() -> Vec<(String, Quota)> {
173        vec![
174            (OKX_GLOBAL_RATE_KEY.to_string(), *OKX_REST_QUOTA),
175            (
176                "okx:/api/v5/account/balance".to_string(),
177                Quota::per_second(NonZeroU32::new(5).unwrap()),
178            ),
179            (
180                "okx:/api/v5/public/instruments".to_string(),
181                Quota::per_second(NonZeroU32::new(10).unwrap()),
182            ),
183            (
184                "okx:/api/v5/market/candles".to_string(),
185                Quota::per_second(NonZeroU32::new(50).unwrap()),
186            ),
187            (
188                "okx:/api/v5/market/history-candles".to_string(),
189                Quota::per_second(NonZeroU32::new(20).unwrap()),
190            ),
191            (
192                "okx:/api/v5/market/history-trades".to_string(),
193                Quota::per_second(NonZeroU32::new(30).unwrap()),
194            ),
195            (
196                "okx:/api/v5/trade/order".to_string(),
197                Quota::per_second(NonZeroU32::new(30).unwrap()), // 60 requests / 2 seconds (per instrument)
198            ),
199            (
200                "okx:/api/v5/trade/orders-pending".to_string(),
201                Quota::per_second(NonZeroU32::new(20).unwrap()),
202            ),
203            (
204                "okx:/api/v5/trade/orders-history".to_string(),
205                Quota::per_second(NonZeroU32::new(20).unwrap()),
206            ),
207            (
208                "okx:/api/v5/trade/fills".to_string(),
209                Quota::per_second(NonZeroU32::new(30).unwrap()),
210            ),
211            (
212                "okx:/api/v5/trade/order-algo".to_string(),
213                Quota::per_second(NonZeroU32::new(10).unwrap()),
214            ),
215            (
216                "okx:/api/v5/trade/cancel-algos".to_string(),
217                Quota::per_second(NonZeroU32::new(10).unwrap()),
218            ),
219        ]
220    }
221
222    fn rate_limit_keys(endpoint: &str) -> Vec<Ustr> {
223        let normalized = endpoint.split('?').next().unwrap_or(endpoint);
224        let route = format!("okx:{normalized}");
225
226        vec![Ustr::from(OKX_GLOBAL_RATE_KEY), Ustr::from(route.as_str())]
227    }
228
229    /// Cancel all pending HTTP requests.
230    pub fn cancel_all_requests(&self) {
231        self.cancellation_token.cancel();
232    }
233
234    /// Get the cancellation token for this client.
235    pub fn cancellation_token(&self) -> &CancellationToken {
236        &self.cancellation_token
237    }
238
239    /// Creates a new [`OKXHttpClient`] using the default OKX HTTP URL,
240    /// optionally overridden with a custom base URL.
241    ///
242    /// This version of the client has **no credentials**, so it can only
243    /// call publicly accessible endpoints.
244    ///
245    /// # Errors
246    ///
247    /// Returns an error if the retry manager cannot be created.
248    pub fn new(
249        base_url: Option<String>,
250        timeout_secs: Option<u64>,
251        max_retries: Option<u32>,
252        retry_delay_ms: Option<u64>,
253        retry_delay_max_ms: Option<u64>,
254        is_demo: bool,
255        proxy_url: Option<String>,
256    ) -> Result<Self, OKXHttpError> {
257        let retry_config = RetryConfig {
258            max_retries: max_retries.unwrap_or(3),
259            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
260            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
261            backoff_factor: 2.0,
262            jitter_ms: 1000,
263            operation_timeout_ms: Some(60_000),
264            immediate_first: false,
265            max_elapsed_ms: Some(180_000),
266        };
267
268        let retry_manager = RetryManager::new(retry_config);
269
270        Ok(Self {
271            base_url: base_url.unwrap_or(OKX_HTTP_URL.to_string()),
272            client: HttpClient::new(
273                Self::default_headers(is_demo),
274                vec![],
275                Self::rate_limiter_quotas(),
276                Some(*OKX_REST_QUOTA),
277                timeout_secs,
278                proxy_url,
279            )
280            .map_err(|e| {
281                OKXHttpError::ValidationError(format!("Failed to create HTTP client: {e}"))
282            })?,
283            credential: None,
284            retry_manager,
285            cancellation_token: CancellationToken::new(),
286            is_demo,
287        })
288    }
289
290    /// Creates a new [`OKXHttpClient`] configured with credentials
291    /// for authenticated requests, optionally using a custom base URL.
292    ///
293    /// # Errors
294    ///
295    /// Returns an error if the retry manager cannot be created.
296    #[allow(clippy::too_many_arguments)]
297    pub fn with_credentials(
298        api_key: String,
299        api_secret: String,
300        api_passphrase: String,
301        base_url: String,
302        timeout_secs: Option<u64>,
303        max_retries: Option<u32>,
304        retry_delay_ms: Option<u64>,
305        retry_delay_max_ms: Option<u64>,
306        is_demo: bool,
307        proxy_url: Option<String>,
308    ) -> Result<Self, OKXHttpError> {
309        let retry_config = RetryConfig {
310            max_retries: max_retries.unwrap_or(3),
311            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
312            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
313            backoff_factor: 2.0,
314            jitter_ms: 1000,
315            operation_timeout_ms: Some(60_000),
316            immediate_first: false,
317            max_elapsed_ms: Some(180_000),
318        };
319
320        let retry_manager = RetryManager::new(retry_config);
321
322        Ok(Self {
323            base_url,
324            client: HttpClient::new(
325                Self::default_headers(is_demo),
326                vec![],
327                Self::rate_limiter_quotas(),
328                Some(*OKX_REST_QUOTA),
329                timeout_secs,
330                proxy_url,
331            )
332            .map_err(|e| {
333                OKXHttpError::ValidationError(format!("Failed to create HTTP client: {e}"))
334            })?,
335            credential: Some(Credential::new(api_key, api_secret, api_passphrase)),
336            retry_manager,
337            cancellation_token: CancellationToken::new(),
338            is_demo,
339        })
340    }
341
342    /// Builds the default headers to include with each request (e.g., `User-Agent`).
343    fn default_headers(is_demo: bool) -> HashMap<String, String> {
344        let mut headers =
345            HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())]);
346
347        if is_demo {
348            headers.insert("x-simulated-trading".to_string(), "1".to_string());
349        }
350
351        headers
352    }
353
354    /// Signs an OKX request with timestamp, API key, passphrase, and signature.
355    ///
356    /// # Errors
357    ///
358    /// Returns [`OKXHttpError::MissingCredentials`] if no credentials are set
359    /// but the request requires authentication.
360    fn sign_request(
361        &self,
362        method: &Method,
363        path: &str,
364        body: Option<&[u8]>,
365    ) -> Result<HashMap<String, String>, OKXHttpError> {
366        let credential = match self.credential.as_ref() {
367            Some(c) => c,
368            None => return Err(OKXHttpError::MissingCredentials),
369        };
370
371        let api_key = credential.api_key.to_string();
372        let api_passphrase = credential.api_passphrase.clone();
373
374        // OKX requires milliseconds in the timestamp (ISO 8601 with milliseconds)
375        let now = Utc::now();
376        let millis = now.timestamp_subsec_millis();
377        let timestamp = now.format("%Y-%m-%dT%H:%M:%S").to_string() + &format!(".{millis:03}Z");
378        let signature = credential.sign_bytes(&timestamp, method.as_str(), path, body);
379
380        let mut headers = HashMap::new();
381        headers.insert("OK-ACCESS-KEY".to_string(), api_key);
382        headers.insert("OK-ACCESS-PASSPHRASE".to_string(), api_passphrase);
383        headers.insert("OK-ACCESS-TIMESTAMP".to_string(), timestamp);
384        headers.insert("OK-ACCESS-SIGN".to_string(), signature);
385
386        Ok(headers)
387    }
388
389    /// Sends an HTTP request to OKX and parses the response into `Vec<T>`.
390    ///
391    /// Internally, this method handles:
392    /// - Building the URL from `base_url` + `path`.
393    /// - Optionally signing the request.
394    /// - Deserializing JSON responses into typed models, or returning a [`OKXHttpError`].
395    /// - Retrying with exponential backoff on transient errors.
396    ///
397    /// # Errors
398    ///
399    /// Returns an error if:
400    /// - The HTTP request fails.
401    /// - Authentication is required but credentials are missing.
402    /// - The response cannot be deserialized into the expected type.
403    /// - The OKX API returns an error response.
404    async fn send_request<T: DeserializeOwned, P: Serialize>(
405        &self,
406        method: Method,
407        path: &str,
408        params: Option<&P>,
409        body: Option<Vec<u8>>,
410        authenticate: bool,
411    ) -> Result<Vec<T>, OKXHttpError> {
412        let url = format!("{}{path}", self.base_url);
413
414        // Pre-compute rate limit keys once outside the retry closure
415        let rate_keys: Vec<String> = Self::rate_limit_keys(path)
416            .into_iter()
417            .map(|k| k.to_string())
418            .collect();
419
420        let operation = || {
421            let url = url.clone();
422            let method = method.clone();
423            let body = body.clone();
424            let rate_keys = rate_keys.clone();
425
426            async move {
427                // Serialize params to query string for signing (if needed)
428                let query_string = if let Some(p) = params {
429                    serde_urlencoded::to_string(p).map_err(|e| {
430                        OKXHttpError::JsonError(format!("Failed to serialize params: {e}"))
431                    })?
432                } else {
433                    String::new()
434                };
435
436                // Build full path with query string for signing
437                let full_path = if query_string.is_empty() {
438                    path.to_string()
439                } else {
440                    format!("{path}?{query_string}")
441                };
442
443                let mut headers = if authenticate {
444                    self.sign_request(&method, &full_path, body.as_deref())?
445                } else {
446                    HashMap::new()
447                };
448
449                // Always set Content-Type header when body is present
450                if body.is_some() {
451                    headers.insert("Content-Type".to_string(), "application/json".to_string());
452                }
453
454                let resp = self
455                    .client
456                    .request_with_params(
457                        method.clone(),
458                        url,
459                        params,
460                        Some(headers),
461                        body,
462                        None,
463                        Some(rate_keys),
464                    )
465                    .await?;
466
467                tracing::trace!("Response: {resp:?}");
468
469                if resp.status.is_success() {
470                    let okx_response: OKXResponse<T> =
471                        serde_json::from_slice(&resp.body).map_err(|e| {
472                            tracing::error!("Failed to deserialize OKXResponse: {e}");
473                            OKXHttpError::JsonError(e.to_string())
474                        })?;
475
476                    if okx_response.code != OKX_SUCCESS_CODE {
477                        return Err(OKXHttpError::OkxError {
478                            error_code: okx_response.code,
479                            message: okx_response.msg,
480                        });
481                    }
482
483                    Ok(okx_response.data)
484                } else {
485                    let error_body = String::from_utf8_lossy(&resp.body);
486                    if resp.status.as_u16() == StatusCode::NOT_FOUND.as_u16() {
487                        tracing::debug!("HTTP 404 with body: {error_body}");
488                    } else {
489                        tracing::error!(
490                            "HTTP error {} with body: {error_body}",
491                            resp.status.as_str()
492                        );
493                    }
494
495                    if let Ok(parsed_error) = serde_json::from_slice::<OKXResponse<T>>(&resp.body) {
496                        return Err(OKXHttpError::OkxError {
497                            error_code: parsed_error.code,
498                            message: parsed_error.msg,
499                        });
500                    }
501
502                    Err(OKXHttpError::UnexpectedStatus {
503                        status: StatusCode::from_u16(resp.status.as_u16()).unwrap(),
504                        body: error_body.to_string(),
505                    })
506                }
507            }
508        };
509
510        // Retry strategy based on OKX error responses and HTTP status codes:
511        //
512        // 1. Network errors: always retry (transient connection issues)
513        // 2. HTTP 5xx/429: server errors and rate limiting should be retried
514        // 3. OKX specific retryable error codes (defined in common::consts)
515        //
516        // Note: OKX returns many permanent errors which should NOT be retried
517        // (e.g., "Invalid instrument", "Insufficient balance", "Invalid API Key")
518        let should_retry = |error: &OKXHttpError| -> bool {
519            match error {
520                OKXHttpError::HttpClientError(_) => true,
521                OKXHttpError::UnexpectedStatus { status, .. } => {
522                    status.as_u16() >= 500 || status.as_u16() == 429
523                }
524                OKXHttpError::OkxError { error_code, .. } => should_retry_error_code(error_code),
525                _ => false,
526            }
527        };
528
529        let create_error = |msg: String| -> OKXHttpError {
530            if msg == "canceled" {
531                OKXHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
532            } else {
533                OKXHttpError::ValidationError(msg)
534            }
535        };
536
537        self.retry_manager
538            .execute_with_retry_with_cancel(
539                path,
540                operation,
541                should_retry,
542                create_error,
543                &self.cancellation_token,
544            )
545            .await
546    }
547
548    /// Sets the position mode for an account.
549    ///
550    /// # Errors
551    ///
552    /// Returns an error if JSON serialization of `params` fails, if the HTTP
553    /// request fails, or if the response body cannot be deserialized.
554    ///
555    /// # References
556    ///
557    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-set-position-mode>
558    pub async fn set_position_mode(
559        &self,
560        params: SetPositionModeParams,
561    ) -> Result<Vec<serde_json::Value>, OKXHttpError> {
562        let path = "/api/v5/account/set-position-mode";
563        let body = serde_json::to_vec(&params)?;
564        self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
565            .await
566    }
567
568    /// Requests position tiers information, maximum leverage depends on your borrowings and margin ratio.
569    ///
570    /// # Errors
571    ///
572    /// Returns an error if the HTTP request fails, authentication is rejected
573    /// or the response cannot be deserialized.
574    ///
575    /// # References
576    ///
577    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-position-tiers>
578    pub async fn get_position_tiers(
579        &self,
580        params: GetPositionTiersParams,
581    ) -> Result<Vec<OKXPositionTier>, OKXHttpError> {
582        self.send_request(
583            Method::GET,
584            "/api/v5/public/position-tiers",
585            Some(&params),
586            None,
587            false,
588        )
589        .await
590    }
591
592    /// Requests a list of instruments with open contracts.
593    ///
594    /// # Errors
595    ///
596    /// Returns an error if JSON serialization of `params` fails, if the HTTP
597    /// request fails, or if the response body cannot be deserialized.
598    ///
599    /// # References
600    ///
601    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-instruments>
602    pub async fn get_instruments(
603        &self,
604        params: GetInstrumentsParams,
605    ) -> Result<Vec<OKXInstrument>, OKXHttpError> {
606        self.send_request(
607            Method::GET,
608            "/api/v5/public/instruments",
609            Some(&params),
610            None,
611            false,
612        )
613        .await
614    }
615
616    /// Requests the current server time from OKX.
617    ///
618    /// Retrieves the OKX system time in Unix timestamp (milliseconds). This is useful for
619    /// synchronizing local clocks with the exchange server and logging time drift.
620    ///
621    /// # Errors
622    ///
623    /// Returns an error if the HTTP request fails or if the response body
624    /// cannot be parsed into [`OKXServerTime`].
625    ///
626    /// # References
627    ///
628    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-system-time>
629    pub async fn get_server_time(&self) -> Result<u64, OKXHttpError> {
630        let response: Vec<OKXServerTime> = self
631            .send_request::<_, ()>(Method::GET, "/api/v5/public/time", None, None, false)
632            .await?;
633        response
634            .first()
635            .map(|t| t.ts)
636            .ok_or_else(|| OKXHttpError::JsonError("Empty server time response".to_string()))
637    }
638
639    /// Requests a mark price.
640    ///
641    /// We set the mark price based on the SPOT index and at a reasonable basis to prevent individual
642    /// users from manipulating the market and causing the contract price to fluctuate.
643    ///
644    /// # Errors
645    ///
646    /// Returns an error if the HTTP request fails or if the response body
647    /// cannot be parsed into [`OKXMarkPrice`].
648    ///
649    /// # References
650    ///
651    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-mark-price>
652    pub async fn get_mark_price(
653        &self,
654        params: GetMarkPriceParams,
655    ) -> Result<Vec<OKXMarkPrice>, OKXHttpError> {
656        self.send_request(
657            Method::GET,
658            "/api/v5/public/mark-price",
659            Some(&params),
660            None,
661            false,
662        )
663        .await
664    }
665
666    /// Requests the latest index price.
667    ///
668    /// # Errors
669    ///
670    /// Returns an error if the operation fails.
671    ///
672    /// # References
673    ///
674    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-index-tickers>
675    pub async fn get_index_tickers(
676        &self,
677        params: GetIndexTickerParams,
678    ) -> Result<Vec<OKXIndexTicker>, OKXHttpError> {
679        self.send_request(
680            Method::GET,
681            "/api/v5/market/index-tickers",
682            Some(&params),
683            None,
684            false,
685        )
686        .await
687    }
688
689    /// Requests trades history.
690    ///
691    /// # Errors
692    ///
693    /// Returns an error if the operation fails.
694    ///
695    /// # References
696    ///
697    /// <https://www.okx.com/docs-v5/en/#order-book-trading-market-data-get-trades-history>
698    pub async fn get_history_trades(
699        &self,
700        params: GetTradesParams,
701    ) -> Result<Vec<OKXTrade>, OKXHttpError> {
702        self.send_request(
703            Method::GET,
704            "/api/v5/market/history-trades",
705            Some(&params),
706            None,
707            false,
708        )
709        .await
710    }
711
712    /// Requests recent candlestick data.
713    ///
714    /// # Errors
715    ///
716    /// Returns an error if the operation fails.
717    ///
718    /// # References
719    ///
720    /// <https://www.okx.com/docs-v5/en/#order-book-trading-market-data-get-candlesticks>
721    pub async fn get_candles(
722        &self,
723        params: GetCandlesticksParams,
724    ) -> Result<Vec<OKXCandlestick>, OKXHttpError> {
725        self.send_request(
726            Method::GET,
727            "/api/v5/market/candles",
728            Some(&params),
729            None,
730            false,
731        )
732        .await
733    }
734
735    /// Requests historical candlestick data.
736    ///
737    /// # Errors
738    ///
739    /// Returns an error if the operation fails.
740    ///
741    /// # References
742    ///
743    /// <https://www.okx.com/docs-v5/en/#order-book-trading-market-data-get-candlesticks-history>
744    pub async fn get_history_candles(
745        &self,
746        params: GetCandlesticksParams,
747    ) -> Result<Vec<OKXCandlestick>, OKXHttpError> {
748        self.send_request(
749            Method::GET,
750            "/api/v5/market/history-candles",
751            Some(&params),
752            None,
753            false,
754        )
755        .await
756    }
757
758    /// Requests a list of assets (with non-zero balance), remaining balance, and available amount
759    /// in the trading account.
760    ///
761    /// # Errors
762    ///
763    /// Returns an error if the operation fails.
764    ///
765    /// # References
766    ///
767    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-balance>
768    pub async fn get_balance(&self) -> Result<Vec<OKXAccount>, OKXHttpError> {
769        let path = "/api/v5/account/balance";
770        self.send_request::<_, ()>(Method::GET, path, None, None, true)
771            .await
772    }
773
774    /// Requests fee rates for the account.
775    ///
776    /// Returns fee rates for the specified instrument type and the user's VIP level.
777    ///
778    /// # Errors
779    ///
780    /// Returns an error if the operation fails.
781    ///
782    /// # References
783    ///
784    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-fee-rates>
785    pub async fn get_trade_fee(
786        &self,
787        params: GetTradeFeeParams,
788    ) -> Result<Vec<OKXFeeRate>, OKXHttpError> {
789        self.send_request(
790            Method::GET,
791            "/api/v5/account/trade-fee",
792            Some(&params),
793            None,
794            true,
795        )
796        .await
797    }
798
799    /// Retrieves a single order’s details.
800    ///
801    /// # Errors
802    ///
803    /// Returns an error if the operation fails.
804    ///
805    /// # References
806    ///
807    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-order>
808    pub async fn get_order(
809        &self,
810        params: GetOrderParams,
811    ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
812        self.send_request(
813            Method::GET,
814            "/api/v5/trade/order",
815            Some(&params),
816            None,
817            true,
818        )
819        .await
820    }
821
822    /// Requests order list (pending orders).
823    ///
824    /// # Errors
825    ///
826    /// Returns an error if the operation fails.
827    ///
828    /// # References
829    ///
830    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-order-list>
831    pub async fn get_orders_pending(
832        &self,
833        params: GetOrderListParams,
834    ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
835        self.send_request(
836            Method::GET,
837            "/api/v5/trade/orders-pending",
838            Some(&params),
839            None,
840            true,
841        )
842        .await
843    }
844
845    /// Requests historical order records.
846    ///
847    /// # Errors
848    ///
849    /// Returns an error if the operation fails.
850    ///
851    /// # References
852    ///
853    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-orders-history>
854    pub async fn get_orders_history(
855        &self,
856        params: GetOrderHistoryParams,
857    ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
858        self.send_request(
859            Method::GET,
860            "/api/v5/trade/orders-history",
861            Some(&params),
862            None,
863            true,
864        )
865        .await
866    }
867
868    /// Requests pending algo orders.
869    ///
870    /// # Errors
871    ///
872    /// Returns an error if the operation fails.
873    pub async fn get_order_algo_pending(
874        &self,
875        params: GetAlgoOrdersParams,
876    ) -> Result<Vec<OKXOrderAlgo>, OKXHttpError> {
877        self.send_request(
878            Method::GET,
879            "/api/v5/trade/order-algo-pending",
880            Some(&params),
881            None,
882            true,
883        )
884        .await
885    }
886
887    /// Requests historical algo orders.
888    ///
889    /// # Errors
890    ///
891    /// Returns an error if the operation fails.
892    pub async fn get_order_algo_history(
893        &self,
894        params: GetAlgoOrdersParams,
895    ) -> Result<Vec<OKXOrderAlgo>, OKXHttpError> {
896        self.send_request(
897            Method::GET,
898            "/api/v5/trade/order-algo-history",
899            Some(&params),
900            None,
901            true,
902        )
903        .await
904    }
905
906    /// Requests transaction details (fills) for the given parameters.
907    ///
908    /// # Errors
909    ///
910    /// Returns an error if the operation fails.
911    ///
912    /// # References
913    ///
914    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-transaction-details-last-3-days>
915    pub async fn get_fills(
916        &self,
917        params: GetTransactionDetailsParams,
918    ) -> Result<Vec<OKXTransactionDetail>, OKXHttpError> {
919        self.send_request(
920            Method::GET,
921            "/api/v5/trade/fills",
922            Some(&params),
923            None,
924            true,
925        )
926        .await
927    }
928
929    /// Requests information on your positions. When the account is in net mode, net positions will
930    /// be displayed, and when the account is in long/short mode, long or short positions will be
931    /// displayed. Returns in reverse chronological order using ctime.
932    ///
933    /// # Errors
934    ///
935    /// Returns an error if the operation fails.
936    ///
937    /// # References
938    ///
939    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-positions>
940    pub async fn get_positions(
941        &self,
942        params: GetPositionsParams,
943    ) -> Result<Vec<OKXPosition>, OKXHttpError> {
944        self.send_request(
945            Method::GET,
946            "/api/v5/account/positions",
947            Some(&params),
948            None,
949            true,
950        )
951        .await
952    }
953
954    /// Requests closed or historical position data.
955    ///
956    /// # Errors
957    ///
958    /// Returns an error if the operation fails.
959    ///
960    /// # References
961    ///
962    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-positions-history>
963    pub async fn get_positions_history(
964        &self,
965        params: GetPositionsHistoryParams,
966    ) -> Result<Vec<OKXPositionHistory>, OKXHttpError> {
967        self.send_request(
968            Method::GET,
969            "/api/v5/account/positions-history",
970            Some(&params),
971            None,
972            true,
973        )
974        .await
975    }
976}
977
978/// Provides a higher-level HTTP client for the [OKX](https://okx.com) REST API.
979///
980/// This client wraps the underlying `OKXHttpInnerClient` to handle conversions
981/// into the Nautilus domain model.
982#[derive(Debug)]
983#[cfg_attr(
984    feature = "python",
985    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
986)]
987pub struct OKXHttpClient {
988    pub(crate) inner: Arc<OKXRawHttpClient>,
989    pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
990    cache_initialized: AtomicBool,
991}
992
993impl Clone for OKXHttpClient {
994    fn clone(&self) -> Self {
995        let cache_initialized = AtomicBool::new(false);
996
997        let is_initialized = self.cache_initialized.load(Ordering::Acquire);
998        if is_initialized {
999            cache_initialized.store(true, Ordering::Release);
1000        }
1001
1002        Self {
1003            inner: self.inner.clone(),
1004            instruments_cache: self.instruments_cache.clone(),
1005            cache_initialized,
1006        }
1007    }
1008}
1009
1010impl Default for OKXHttpClient {
1011    fn default() -> Self {
1012        Self::new(None, Some(60), None, None, None, false, None)
1013            .expect("Failed to create default OKXHttpClient")
1014    }
1015}
1016
1017impl OKXHttpClient {
1018    /// Creates a new [`OKXHttpClient`] using the default OKX HTTP URL,
1019    /// optionally overridden with a custom base url.
1020    ///
1021    /// This version of the client has **no credentials**, so it can only
1022    /// call publicly accessible endpoints.
1023    ///
1024    /// # Errors
1025    ///
1026    /// Returns an error if the retry manager cannot be created.
1027    pub fn new(
1028        base_url: Option<String>,
1029        timeout_secs: Option<u64>,
1030        max_retries: Option<u32>,
1031        retry_delay_ms: Option<u64>,
1032        retry_delay_max_ms: Option<u64>,
1033        is_demo: bool,
1034        proxy_url: Option<String>,
1035    ) -> anyhow::Result<Self> {
1036        Ok(Self {
1037            inner: Arc::new(OKXRawHttpClient::new(
1038                base_url,
1039                timeout_secs,
1040                max_retries,
1041                retry_delay_ms,
1042                retry_delay_max_ms,
1043                is_demo,
1044                proxy_url,
1045            )?),
1046            instruments_cache: Arc::new(DashMap::new()),
1047            cache_initialized: AtomicBool::new(false),
1048        })
1049    }
1050
1051    /// Generates a timestamp for initialization.
1052    fn generate_ts_init(&self) -> UnixNanos {
1053        get_atomic_clock_realtime().get_time_ns()
1054    }
1055
1056    /// Creates a new authenticated [`OKXHttpClient`] using environment variables and
1057    /// the default OKX HTTP base url.
1058    ///
1059    /// # Errors
1060    ///
1061    /// Returns an error if the operation fails.
1062    pub fn from_env() -> anyhow::Result<Self> {
1063        Self::with_credentials(None, None, None, None, None, None, None, None, false, None)
1064    }
1065
1066    /// Creates a new [`OKXHttpClient`] configured with credentials
1067    /// for authenticated requests, optionally using a custom base url.
1068    ///
1069    /// # Errors
1070    ///
1071    /// Returns an error if the operation fails.
1072    #[allow(clippy::too_many_arguments)]
1073    pub fn with_credentials(
1074        api_key: Option<String>,
1075        api_secret: Option<String>,
1076        api_passphrase: Option<String>,
1077        base_url: Option<String>,
1078        timeout_secs: Option<u64>,
1079        max_retries: Option<u32>,
1080        retry_delay_ms: Option<u64>,
1081        retry_delay_max_ms: Option<u64>,
1082        is_demo: bool,
1083        proxy_url: Option<String>,
1084    ) -> anyhow::Result<Self> {
1085        let api_key = get_or_env_var(api_key, "OKX_API_KEY")?;
1086        let api_secret = get_or_env_var(api_secret, "OKX_API_SECRET")?;
1087        let api_passphrase = get_or_env_var(api_passphrase, "OKX_API_PASSPHRASE")?;
1088        let base_url = base_url.unwrap_or(OKX_HTTP_URL.to_string());
1089
1090        Ok(Self {
1091            inner: Arc::new(OKXRawHttpClient::with_credentials(
1092                api_key,
1093                api_secret,
1094                api_passphrase,
1095                base_url,
1096                timeout_secs,
1097                max_retries,
1098                retry_delay_ms,
1099                retry_delay_max_ms,
1100                is_demo,
1101                proxy_url,
1102            )?),
1103            instruments_cache: Arc::new(DashMap::new()),
1104            cache_initialized: AtomicBool::new(false),
1105        })
1106    }
1107
1108    /// Retrieves an instrument from the cache.
1109    ///
1110    /// # Errors
1111    ///
1112    /// Returns an error if the instrument is not found in the cache.
1113    fn instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
1114        self.instruments_cache
1115            .get(&symbol)
1116            .map(|entry| entry.value().clone())
1117            .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not in cache"))
1118    }
1119
1120    /// Cancel all pending HTTP requests.
1121    pub fn cancel_all_requests(&self) {
1122        self.inner.cancel_all_requests();
1123    }
1124
1125    /// Get the cancellation token for this client.
1126    pub fn cancellation_token(&self) -> &CancellationToken {
1127        self.inner.cancellation_token()
1128    }
1129
1130    /// Returns the base url being used by the client.
1131    pub fn base_url(&self) -> &str {
1132        self.inner.base_url.as_str()
1133    }
1134
1135    /// Returns the public API key being used by the client.
1136    pub fn api_key(&self) -> Option<&str> {
1137        self.inner.credential.as_ref().map(|c| c.api_key.as_str())
1138    }
1139
1140    /// Returns a masked version of the API key for logging purposes.
1141    #[must_use]
1142    pub fn api_key_masked(&self) -> Option<String> {
1143        self.inner.credential.as_ref().map(|c| c.api_key_masked())
1144    }
1145
1146    /// Returns whether the client is configured for demo trading.
1147    #[must_use]
1148    pub fn is_demo(&self) -> bool {
1149        self.inner.is_demo
1150    }
1151
1152    /// Requests the current server time from OKX.
1153    ///
1154    /// Returns the OKX system time as a Unix timestamp in milliseconds.
1155    ///
1156    /// # Errors
1157    ///
1158    /// Returns an error if the HTTP request fails or if the response cannot be parsed.
1159    pub async fn get_server_time(&self) -> Result<u64, OKXHttpError> {
1160        self.inner.get_server_time().await
1161    }
1162
1163    /// Checks if the client is initialized.
1164    ///
1165    /// The client is considered initialized if any instruments have been cached from the venue.
1166    #[must_use]
1167    pub fn is_initialized(&self) -> bool {
1168        self.cache_initialized.load(Ordering::Acquire)
1169    }
1170
1171    /// Returns a snapshot of all instrument symbols currently held in the
1172    /// internal cache.
1173    #[must_use]
1174    pub fn get_cached_symbols(&self) -> Vec<String> {
1175        self.instruments_cache
1176            .iter()
1177            .map(|entry| entry.key().to_string())
1178            .collect()
1179    }
1180
1181    /// Caches multiple instruments.
1182    ///
1183    /// Any existing instruments with the same symbols will be replaced.
1184    pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
1185        for inst in instruments {
1186            self.instruments_cache
1187                .insert(inst.raw_symbol().inner(), inst);
1188        }
1189        self.cache_initialized.store(true, Ordering::Release);
1190    }
1191
1192    /// Caches a single instrument.
1193    ///
1194    /// Any existing instrument with the same symbol will be replaced.
1195    pub fn cache_instrument(&self, instrument: InstrumentAny) {
1196        self.instruments_cache
1197            .insert(instrument.raw_symbol().inner(), instrument);
1198        self.cache_initialized.store(true, Ordering::Release);
1199    }
1200
1201    /// Gets an instrument from the cache by symbol.
1202    pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1203        self.instruments_cache
1204            .get(symbol)
1205            .map(|entry| entry.value().clone())
1206    }
1207
1208    /// Requests the account state for the `account_id` from OKX.
1209    ///
1210    /// # Errors
1211    ///
1212    /// Returns an error if the HTTP request fails or no account state is returned.
1213    pub async fn request_account_state(
1214        &self,
1215        account_id: AccountId,
1216    ) -> anyhow::Result<AccountState> {
1217        let resp = self
1218            .inner
1219            .get_balance()
1220            .await
1221            .map_err(|e| anyhow::anyhow!(e))?;
1222
1223        let ts_init = self.generate_ts_init();
1224        let raw = resp
1225            .first()
1226            .ok_or_else(|| anyhow::anyhow!("No account state returned from OKX"))?;
1227        let account_state = parse_account_state(raw, account_id, ts_init)?;
1228
1229        Ok(account_state)
1230    }
1231
1232    /// Sets the position mode for the account.
1233    ///
1234    /// Defaults to NetMode if no position mode is provided.
1235    ///
1236    /// # Errors
1237    ///
1238    /// Returns an error if the HTTP request fails or the position mode cannot be set.
1239    ///
1240    /// # Note
1241    ///
1242    /// This endpoint only works for accounts with derivatives trading enabled.
1243    /// If the account only has spot trading, this will return an error.
1244    pub async fn set_position_mode(&self, position_mode: OKXPositionMode) -> anyhow::Result<()> {
1245        let mut params = SetPositionModeParamsBuilder::default();
1246        params.pos_mode(position_mode);
1247        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1248
1249        match self.inner.set_position_mode(params).await {
1250            Ok(_) => Ok(()),
1251            Err(e) => {
1252                if let OKXHttpError::OkxError {
1253                    error_code,
1254                    message,
1255                } = &e
1256                    && error_code == "50115"
1257                {
1258                    tracing::warn!(
1259                        "Account does not support position mode setting (derivatives trading not enabled): {message}"
1260                    );
1261                    return Ok(()); // Gracefully handle this case
1262                }
1263                anyhow::bail!(e)
1264            }
1265        }
1266    }
1267
1268    /// Requests all instruments for the `instrument_type` from OKX.
1269    ///
1270    /// # Errors
1271    ///
1272    /// Returns an error if the HTTP request fails or instrument parsing fails.
1273    pub async fn request_instruments(
1274        &self,
1275        instrument_type: OKXInstrumentType,
1276        instrument_family: Option<String>,
1277    ) -> anyhow::Result<Vec<InstrumentAny>> {
1278        let mut params = GetInstrumentsParamsBuilder::default();
1279        params.inst_type(instrument_type);
1280
1281        if let Some(family) = instrument_family.clone() {
1282            params.inst_family(family);
1283        }
1284
1285        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1286
1287        let resp = self
1288            .inner
1289            .get_instruments(params)
1290            .await
1291            .map_err(|e| anyhow::anyhow!(e))?;
1292
1293        let fee_rate_opt = {
1294            let fee_params = GetTradeFeeParams {
1295                inst_type: instrument_type,
1296                uly: None,
1297                inst_family: instrument_family,
1298            };
1299
1300            match self.inner.get_trade_fee(fee_params).await {
1301                Ok(rates) => rates.into_iter().next(),
1302                Err(OKXHttpError::MissingCredentials) => {
1303                    tracing::debug!("Missing credentials for fee rates, using None");
1304                    None
1305                }
1306                Err(e) => {
1307                    tracing::warn!("Failed to fetch fee rates for {instrument_type}: {e}");
1308                    None
1309                }
1310            }
1311        };
1312
1313        let ts_init = self.generate_ts_init();
1314
1315        let mut instruments: Vec<InstrumentAny> = Vec::new();
1316        for inst in &resp {
1317            // Skip pre-open instruments which have incomplete/empty field values
1318            // Keep suspended instruments as they have valid metadata and may return to live
1319            if inst.state == OKXInstrumentStatus::Preopen {
1320                continue;
1321            }
1322
1323            // Determine which fee fields to use based on contract type
1324            // OKX fee rate convention: positive = rebate, negative = commission
1325            // Nautilus convention: negative = rebate, positive = commission
1326            // Negate to convert between conventions
1327            let (maker_fee, taker_fee) = if let Some(ref fee_rate) = fee_rate_opt {
1328                let is_usdt_margined = inst.ct_type == OKXContractType::Linear;
1329                let (maker_str, taker_str) = if is_usdt_margined {
1330                    (&fee_rate.maker_u, &fee_rate.taker_u)
1331                } else {
1332                    (&fee_rate.maker, &fee_rate.taker)
1333                };
1334
1335                let maker = if !maker_str.is_empty() {
1336                    Decimal::from_str(maker_str).ok().map(|v| -v)
1337                } else {
1338                    None
1339                };
1340                let taker = if !taker_str.is_empty() {
1341                    Decimal::from_str(taker_str).ok().map(|v| -v)
1342                } else {
1343                    None
1344                };
1345
1346                (maker, taker)
1347            } else {
1348                (None, None)
1349            };
1350
1351            match parse_instrument_any(inst, None, None, maker_fee, taker_fee, ts_init) {
1352                Ok(Some(instrument_any)) => {
1353                    instruments.push(instrument_any);
1354                }
1355                Ok(None) => {
1356                    // Unsupported instrument type, skip silently
1357                }
1358                Err(e) => {
1359                    tracing::warn!("Failed to parse instrument {}: {e}", inst.inst_id);
1360                }
1361            }
1362        }
1363
1364        Ok(instruments)
1365    }
1366
1367    /// Requests a single instrument by `instrument_id` from OKX.
1368    ///
1369    /// Fetches the instrument from the API, caches it, and returns it.
1370    ///
1371    /// # Errors
1372    ///
1373    /// This function will return an error if:
1374    /// - The API request fails.
1375    /// - The instrument is not found.
1376    /// - Failed to parse instrument data.
1377    pub async fn request_instrument(
1378        &self,
1379        instrument_id: InstrumentId,
1380    ) -> anyhow::Result<InstrumentAny> {
1381        let symbol = instrument_id.symbol.as_str();
1382        let instrument_type = okx_instrument_type_from_symbol(symbol);
1383
1384        let mut params = GetInstrumentsParamsBuilder::default();
1385        params.inst_type(instrument_type);
1386        params.inst_id(symbol);
1387
1388        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1389
1390        let resp = self
1391            .inner
1392            .get_instruments(params)
1393            .await
1394            .map_err(|e| anyhow::anyhow!(e))?;
1395
1396        let raw_inst = resp
1397            .first()
1398            .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not found"))?;
1399
1400        // Skip pre-open instruments which have incomplete/empty field values
1401        if raw_inst.state == OKXInstrumentStatus::Preopen {
1402            anyhow::bail!("Instrument {symbol} is in pre-open state");
1403        }
1404
1405        let fee_rate_opt = {
1406            let fee_params = GetTradeFeeParams {
1407                inst_type: instrument_type,
1408                uly: None,
1409                inst_family: None,
1410            };
1411
1412            match self.inner.get_trade_fee(fee_params).await {
1413                Ok(rates) => rates.into_iter().next(),
1414                Err(OKXHttpError::MissingCredentials) => {
1415                    tracing::debug!("Missing credentials for fee rates, using None");
1416                    None
1417                }
1418                Err(e) => {
1419                    tracing::warn!("Failed to fetch fee rates for {symbol}: {e}");
1420                    None
1421                }
1422            }
1423        };
1424
1425        // OKX fee rate convention: positive = rebate, negative = commission
1426        // Nautilus convention: negative = rebate, positive = commission
1427        // Negate to convert between conventions
1428        let (maker_fee, taker_fee) = if let Some(ref fee_rate) = fee_rate_opt {
1429            let is_usdt_margined = raw_inst.ct_type == OKXContractType::Linear;
1430            let (maker_str, taker_str) = if is_usdt_margined {
1431                (&fee_rate.maker_u, &fee_rate.taker_u)
1432            } else {
1433                (&fee_rate.maker, &fee_rate.taker)
1434            };
1435
1436            let maker = if !maker_str.is_empty() {
1437                Decimal::from_str(maker_str).ok().map(|v| -v)
1438            } else {
1439                None
1440            };
1441            let taker = if !taker_str.is_empty() {
1442                Decimal::from_str(taker_str).ok().map(|v| -v)
1443            } else {
1444                None
1445            };
1446
1447            (maker, taker)
1448        } else {
1449            (None, None)
1450        };
1451
1452        let ts_init = self.generate_ts_init();
1453        let instrument = parse_instrument_any(raw_inst, None, None, maker_fee, taker_fee, ts_init)?
1454            .ok_or_else(|| anyhow::anyhow!("Unsupported instrument type for {symbol}"))?;
1455
1456        self.cache_instrument(instrument.clone());
1457
1458        Ok(instrument)
1459    }
1460
1461    /// Requests the latest mark price for the `instrument_type` from OKX.
1462    ///
1463    /// # Errors
1464    ///
1465    /// Returns an error if the HTTP request fails or no mark price is returned.
1466    pub async fn request_mark_price(
1467        &self,
1468        instrument_id: InstrumentId,
1469    ) -> anyhow::Result<MarkPriceUpdate> {
1470        let mut params = GetMarkPriceParamsBuilder::default();
1471        params.inst_id(instrument_id.symbol.inner());
1472        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1473
1474        let resp = self
1475            .inner
1476            .get_mark_price(params)
1477            .await
1478            .map_err(|e| anyhow::anyhow!(e))?;
1479
1480        let raw = resp
1481            .first()
1482            .ok_or_else(|| anyhow::anyhow!("No mark price returned from OKX"))?;
1483        let inst = self.instrument_from_cache(instrument_id.symbol.inner())?;
1484        let ts_init = self.generate_ts_init();
1485
1486        let mark_price =
1487            parse_mark_price_update(raw, instrument_id, inst.price_precision(), ts_init)
1488                .map_err(|e| anyhow::anyhow!(e))?;
1489        Ok(mark_price)
1490    }
1491
1492    /// Requests the latest index price for the `instrument_id` from OKX.
1493    ///
1494    /// # Errors
1495    ///
1496    /// Returns an error if the HTTP request fails or no index price is returned.
1497    pub async fn request_index_price(
1498        &self,
1499        instrument_id: InstrumentId,
1500    ) -> anyhow::Result<IndexPriceUpdate> {
1501        let mut params = GetIndexTickerParamsBuilder::default();
1502        params.inst_id(instrument_id.symbol.inner());
1503        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1504
1505        let resp = self
1506            .inner
1507            .get_index_tickers(params)
1508            .await
1509            .map_err(|e| anyhow::anyhow!(e))?;
1510
1511        let raw = resp
1512            .first()
1513            .ok_or_else(|| anyhow::anyhow!("No index price returned from OKX"))?;
1514        let inst = self.instrument_from_cache(instrument_id.symbol.inner())?;
1515        let ts_init = self.generate_ts_init();
1516
1517        let index_price =
1518            parse_index_price_update(raw, instrument_id, inst.price_precision(), ts_init)
1519                .map_err(|e| anyhow::anyhow!(e))?;
1520        Ok(index_price)
1521    }
1522
1523    /// Requests trades for the `instrument_id` and `start` -> `end` time range.
1524    ///
1525    /// # Errors
1526    ///
1527    /// Returns an error if the HTTP request fails or trade parsing fails.
1528    ///
1529    /// # Panics
1530    ///
1531    /// Panics if the API returns an empty response when debug logging is enabled.
1532    pub async fn request_trades(
1533        &self,
1534        instrument_id: InstrumentId,
1535        start: Option<DateTime<Utc>>,
1536        end: Option<DateTime<Utc>>,
1537        limit: Option<u32>,
1538    ) -> anyhow::Result<Vec<TradeTick>> {
1539        const OKX_TRADES_MAX_LIMIT: u32 = 100;
1540
1541        let limit = if limit == Some(0) { None } else { limit };
1542
1543        if let (Some(s), Some(e)) = (start, end) {
1544            anyhow::ensure!(s < e, "Invalid time range: start={s:?} end={e:?}");
1545        }
1546
1547        let now = Utc::now();
1548
1549        if let Some(s) = start
1550            && s > now
1551        {
1552            return Ok(Vec::new());
1553        }
1554
1555        let end = if let Some(e) = end
1556            && e > now
1557        {
1558            Some(now)
1559        } else {
1560            end
1561        };
1562
1563        #[derive(Clone, Copy, Debug, PartialEq, Eq)]
1564        enum Mode {
1565            Latest,
1566            Backward,
1567            Range,
1568        }
1569
1570        let mode = match (start, end) {
1571            (None, None) => Mode::Latest,
1572            (Some(_), None) => Mode::Backward,
1573            (None, Some(_)) => Mode::Backward,
1574            (Some(_), Some(_)) => Mode::Range,
1575        };
1576
1577        let start_ms = start.map(|s| s.timestamp_millis());
1578        let end_ms = end.map(|e| e.timestamp_millis());
1579
1580        let ts_init = self.generate_ts_init();
1581        let inst = self.instrument_from_cache(instrument_id.symbol.inner())?;
1582
1583        // Historical pagination walks backwards using trade IDs, OKX does not honour timestamps for
1584        // standalone `before` requests (type=2)
1585        if matches!(mode, Mode::Backward | Mode::Range) {
1586            let mut before_trade_id: Option<String> = None;
1587            let mut pages = 0usize;
1588            let mut page_results: Vec<Vec<TradeTick>> = Vec::new();
1589            let mut seen_trades: std::collections::HashSet<(String, i64)> =
1590                std::collections::HashSet::new();
1591            let mut unique_count = 0usize;
1592            let mut consecutive_empty_pages = 0usize;
1593            const MAX_PAGES: usize = 500;
1594            const MAX_CONSECUTIVE_EMPTY: usize = 3;
1595
1596            // Only apply default limit when there's no start boundary
1597            // (start provides a natural stopping point, end alone allows infinite backward pagination)
1598            let effective_limit = if start.is_some() {
1599                limit.unwrap_or(u32::MAX)
1600            } else {
1601                limit.unwrap_or(OKX_TRADES_MAX_LIMIT)
1602            };
1603
1604            tracing::debug!(
1605                "Starting trades pagination: mode={:?}, start={:?}, end={:?}, limit={:?}, effective_limit={}",
1606                mode,
1607                start,
1608                end,
1609                limit,
1610                effective_limit
1611            );
1612
1613            loop {
1614                if pages >= MAX_PAGES {
1615                    tracing::warn!("Hit MAX_PAGES limit of {}", MAX_PAGES);
1616                    break;
1617                }
1618
1619                if effective_limit < u32::MAX && unique_count >= effective_limit as usize {
1620                    tracing::debug!("Reached effective limit: unique_count={}", unique_count);
1621                    break;
1622                }
1623
1624                let remaining = (effective_limit as usize).saturating_sub(unique_count);
1625                let page_cap = remaining.min(OKX_TRADES_MAX_LIMIT as usize) as u32;
1626
1627                tracing::debug!(
1628                    "Requesting page {}: before_id={:?}, page_cap={}, unique_count={}",
1629                    pages + 1,
1630                    before_trade_id,
1631                    page_cap,
1632                    unique_count
1633                );
1634
1635                let mut params_builder = GetTradesParamsBuilder::default();
1636                params_builder
1637                    .inst_id(instrument_id.symbol.inner())
1638                    .limit(page_cap)
1639                    .pagination_type(1);
1640
1641                // Use 'after' to get older trades (OKX API: after=cursor means < cursor)
1642                if let Some(ref before_id) = before_trade_id {
1643                    params_builder.after(before_id.clone());
1644                }
1645
1646                let params = params_builder.build().map_err(anyhow::Error::new)?;
1647                let raw = self
1648                    .inner
1649                    .get_history_trades(params)
1650                    .await
1651                    .map_err(anyhow::Error::new)?;
1652
1653                tracing::debug!("Received {} raw trades from API", raw.len());
1654
1655                if !raw.is_empty() {
1656                    let first_id = &raw.first().unwrap().trade_id;
1657                    let last_id = &raw.last().unwrap().trade_id;
1658                    tracing::debug!(
1659                        "Raw response trade ID range: first={} (newest), last={} (oldest)",
1660                        first_id,
1661                        last_id
1662                    );
1663                }
1664
1665                if raw.is_empty() {
1666                    tracing::debug!("API returned empty page, stopping pagination");
1667                    break;
1668                }
1669
1670                pages += 1;
1671
1672                let mut page_trades: Vec<TradeTick> = Vec::with_capacity(raw.len());
1673                let mut hit_start_boundary = false;
1674                let mut filtered_out = 0usize;
1675                let mut duplicates = 0usize;
1676
1677                for r in &raw {
1678                    match parse_trade_tick(
1679                        r,
1680                        instrument_id,
1681                        inst.price_precision(),
1682                        inst.size_precision(),
1683                        ts_init,
1684                    ) {
1685                        Ok(trade) => {
1686                            let ts_ms = trade.ts_event.as_i64() / 1_000_000;
1687
1688                            if let Some(e_ms) = end_ms
1689                                && ts_ms > e_ms
1690                            {
1691                                filtered_out += 1;
1692                                continue;
1693                            }
1694
1695                            if let Some(s_ms) = start_ms
1696                                && ts_ms < s_ms
1697                            {
1698                                hit_start_boundary = true;
1699                                filtered_out += 1;
1700                                break;
1701                            }
1702
1703                            let trade_key = (trade.trade_id.to_string(), trade.ts_event.as_i64());
1704                            if seen_trades.insert(trade_key) {
1705                                unique_count += 1;
1706                                page_trades.push(trade);
1707                            } else {
1708                                duplicates += 1;
1709                            }
1710                        }
1711                        Err(e) => tracing::error!("{e}"),
1712                    }
1713                }
1714
1715                tracing::debug!(
1716                    "Page {} processed: {} trades kept, {} filtered out, {} duplicates, hit_start_boundary={}",
1717                    pages,
1718                    page_trades.len(),
1719                    filtered_out,
1720                    duplicates,
1721                    hit_start_boundary
1722                );
1723
1724                // Extract oldest unique trade ID for next page cursor
1725                let oldest_trade_id = if !page_trades.is_empty() {
1726                    // Use oldest deduplicated trade ID before reversing
1727                    let oldest_id = page_trades.last().map(|t| {
1728                        let id = t.trade_id.to_string();
1729                        tracing::debug!(
1730                            "Setting cursor from deduplicated trades: oldest_id={}, ts_event={}",
1731                            id,
1732                            t.ts_event.as_i64()
1733                        );
1734                        id
1735                    });
1736                    page_trades.reverse();
1737                    page_results.push(page_trades);
1738                    consecutive_empty_pages = 0;
1739                    oldest_id
1740                } else {
1741                    // Only apply consecutive empty guard if we've already collected some trades
1742                    // This allows historical backfills to paginate through empty prelude
1743                    if unique_count > 0 {
1744                        consecutive_empty_pages += 1;
1745                        if consecutive_empty_pages >= MAX_CONSECUTIVE_EMPTY {
1746                            tracing::debug!(
1747                                "Stopping: {} consecutive pages with no trades in range after collecting {} trades",
1748                                consecutive_empty_pages,
1749                                unique_count
1750                            );
1751                            break;
1752                        }
1753                    }
1754                    // No unique trades on page, use raw response for cursor
1755                    raw.last().map(|t| {
1756                        let id = t.trade_id.to_string();
1757                        tracing::debug!(
1758                            "Setting cursor from raw response (no unique trades): oldest_id={}",
1759                            id
1760                        );
1761                        id
1762                    })
1763                };
1764
1765                if let Some(ref old_id) = before_trade_id
1766                    && oldest_trade_id.as_ref() == Some(old_id)
1767                {
1768                    break;
1769                }
1770
1771                if oldest_trade_id.is_none() {
1772                    break;
1773                }
1774
1775                before_trade_id = oldest_trade_id;
1776
1777                if hit_start_boundary {
1778                    break;
1779                }
1780
1781                tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1782            }
1783
1784            tracing::debug!(
1785                "Pagination complete: {} pages, {} unique trades collected",
1786                pages,
1787                unique_count
1788            );
1789
1790            let mut out: Vec<TradeTick> = Vec::new();
1791            for page in page_results.into_iter().rev() {
1792                out.extend(page);
1793            }
1794
1795            // Deduplicate by (trade_id, ts_event) composite key
1796            let mut dedup_keys = std::collections::HashSet::new();
1797            let pre_dedup_len = out.len();
1798            out.retain(|trade| {
1799                dedup_keys.insert((trade.trade_id.to_string(), trade.ts_event.as_i64()))
1800            });
1801
1802            if out.len() < pre_dedup_len {
1803                tracing::debug!(
1804                    "Removed {} duplicate trades during final dedup",
1805                    pre_dedup_len - out.len()
1806                );
1807            }
1808
1809            if let Some(lim) = limit
1810                && lim > 0
1811                && out.len() > lim as usize
1812            {
1813                let excess = out.len() - lim as usize;
1814                tracing::debug!("Trimming {} oldest trades to respect limit={}", excess, lim);
1815                out.drain(0..excess);
1816            }
1817
1818            tracing::debug!("Returning {} trades", out.len());
1819            return Ok(out);
1820        }
1821
1822        let req_limit = limit
1823            .unwrap_or(OKX_TRADES_MAX_LIMIT)
1824            .min(OKX_TRADES_MAX_LIMIT);
1825        let params = GetTradesParamsBuilder::default()
1826            .inst_id(instrument_id.symbol.inner())
1827            .limit(req_limit)
1828            .build()
1829            .map_err(anyhow::Error::new)?;
1830
1831        let raw = self
1832            .inner
1833            .get_history_trades(params)
1834            .await
1835            .map_err(anyhow::Error::new)?;
1836
1837        let mut trades: Vec<TradeTick> = Vec::with_capacity(raw.len());
1838        for r in &raw {
1839            match parse_trade_tick(
1840                r,
1841                instrument_id,
1842                inst.price_precision(),
1843                inst.size_precision(),
1844                ts_init,
1845            ) {
1846                Ok(trade) => trades.push(trade),
1847                Err(e) => tracing::error!("{e}"),
1848            }
1849        }
1850
1851        // OKX returns newest-first, reverse to oldest-first
1852        trades.reverse();
1853
1854        if let Some(lim) = limit
1855            && lim > 0
1856            && trades.len() > lim as usize
1857        {
1858            trades.drain(0..trades.len() - lim as usize);
1859        }
1860
1861        Ok(trades)
1862    }
1863
1864    /// Requests historical bars for the given bar type and time range.
1865    ///
1866    /// The aggregation source must be `EXTERNAL`. Time range validation ensures start < end.
1867    /// Returns bars sorted oldest to newest.
1868    ///
1869    /// # Errors
1870    ///
1871    /// Returns an error if the request fails.
1872    ///
1873    /// # Endpoint Selection
1874    ///
1875    /// The OKX API has different endpoints with different limits:
1876    /// - Regular endpoint (`/api/v5/market/candles`): ≤ 300 rows/call, ≤ 40 req/2s
1877    ///   - Used when: start is None OR age ≤ 100 days
1878    /// - History endpoint (`/api/v5/market/history-candles`): ≤ 100 rows/call, ≤ 20 req/2s
1879    ///   - Used when: start is Some AND age > 100 days
1880    ///
1881    /// Age is calculated as `Utc::now() - start` at the time of the first request.
1882    ///
1883    /// # Supported Aggregations
1884    ///
1885    /// Maps to OKX bar query parameter:
1886    /// - `Second` → `{n}s`
1887    /// - `Minute` → `{n}m`
1888    /// - `Hour` → `{n}H`
1889    /// - `Day` → `{n}D`
1890    /// - `Week` → `{n}W`
1891    /// - `Month` → `{n}M`
1892    ///
1893    /// # Pagination
1894    ///
1895    /// - Uses `before` parameter for backwards pagination
1896    /// - Pages backwards from end time (or now) to start time
1897    /// - Stops when: limit reached, time window covered, or API returns empty
1898    /// - Rate limit safety: ≥ 50ms between requests
1899    ///
1900    /// # Panics
1901    ///
1902    /// May panic if internal data structures are in an unexpected state.
1903    ///
1904    /// # References
1905    ///
1906    /// - <https://tr.okx.com/docs-v5/en/#order-book-trading-market-data-get-candlesticks>
1907    /// - <https://tr.okx.com/docs-v5/en/#order-book-trading-market-data-get-candlesticks-history>
1908    pub async fn request_bars(
1909        &self,
1910        bar_type: BarType,
1911        start: Option<DateTime<Utc>>,
1912        mut end: Option<DateTime<Utc>>,
1913        limit: Option<u32>,
1914    ) -> anyhow::Result<Vec<Bar>> {
1915        const HISTORY_SPLIT_DAYS: i64 = 100;
1916        const MAX_PAGES_SOFT: usize = 500;
1917
1918        let limit = if limit == Some(0) { None } else { limit };
1919
1920        anyhow::ensure!(
1921            bar_type.aggregation_source() == AggregationSource::External,
1922            "Only EXTERNAL aggregation is supported"
1923        );
1924
1925        if let (Some(s), Some(e)) = (start, end) {
1926            anyhow::ensure!(s < e, "Invalid time range: start={s:?} end={e:?}");
1927        }
1928
1929        let now = Utc::now();
1930
1931        if let Some(s) = start
1932            && s > now
1933        {
1934            return Ok(Vec::new());
1935        }
1936        if let Some(e) = end
1937            && e > now
1938        {
1939            end = Some(now);
1940        }
1941
1942        let spec = bar_type.spec();
1943        let step = spec.step.get();
1944        let bar_param = match spec.aggregation {
1945            BarAggregation::Second => format!("{step}s"),
1946            BarAggregation::Minute => format!("{step}m"),
1947            BarAggregation::Hour => format!("{step}H"),
1948            BarAggregation::Day => format!("{step}D"),
1949            BarAggregation::Week => format!("{step}W"),
1950            BarAggregation::Month => format!("{step}M"),
1951            a => anyhow::bail!("OKX does not support {a:?} aggregation"),
1952        };
1953
1954        let slot_ms: i64 = match spec.aggregation {
1955            BarAggregation::Second => (step as i64) * 1_000,
1956            BarAggregation::Minute => (step as i64) * 60_000,
1957            BarAggregation::Hour => (step as i64) * 3_600_000,
1958            BarAggregation::Day => (step as i64) * 86_400_000,
1959            BarAggregation::Week => (step as i64) * 7 * 86_400_000,
1960            BarAggregation::Month => (step as i64) * 30 * 86_400_000,
1961            _ => unreachable!("Unsupported aggregation should have been caught above"),
1962        };
1963        let slot_ns: i64 = slot_ms * 1_000_000;
1964
1965        #[derive(Clone, Copy, Debug, PartialEq, Eq)]
1966        enum Mode {
1967            Latest,
1968            Backward,
1969            Range,
1970        }
1971
1972        let mode = match (start, end) {
1973            (None, None) => Mode::Latest,
1974            (Some(_), None) => Mode::Backward, // Changed: when only start is provided, work backward from now
1975            (None, Some(_)) => Mode::Backward,
1976            (Some(_), Some(_)) => Mode::Range,
1977        };
1978
1979        let start_ns = start.and_then(|s| s.timestamp_nanos_opt());
1980        let end_ns = end.and_then(|e| e.timestamp_nanos_opt());
1981
1982        // Floor start and ceiling end to bar boundaries for cleaner API requests
1983        let start_ms = start.map(|s| {
1984            let ms = s.timestamp_millis();
1985            if slot_ms > 0 {
1986                (ms / slot_ms) * slot_ms // Floor to nearest bar boundary
1987            } else {
1988                ms
1989            }
1990        });
1991        let end_ms = end.map(|e| {
1992            let ms = e.timestamp_millis();
1993            if slot_ms > 0 {
1994                ((ms + slot_ms - 1) / slot_ms) * slot_ms // Ceiling to nearest bar boundary
1995            } else {
1996                ms
1997            }
1998        });
1999        let now_ms = now.timestamp_millis();
2000
2001        let symbol = bar_type.instrument_id().symbol;
2002        let inst = self.instrument_from_cache(symbol.inner())?;
2003
2004        let mut out: Vec<Bar> = Vec::new();
2005        let mut pages = 0usize;
2006
2007        // IMPORTANT: OKX API has COUNTER-INTUITIVE semantics (same for bars and trades):
2008        // - after=X returns records with timestamp < X (upper bound, despite the name!)
2009        // - before=X returns records with timestamp > X (lower bound, despite the name!)
2010        // For Range [start, end], use: before=start (lower bound), after=end (upper bound)
2011        let mut after_ms: Option<i64> = match mode {
2012            Mode::Range => end_ms.or(Some(now_ms)), // Upper bound: bars < end
2013            _ => None,
2014        };
2015        let mut before_ms: Option<i64> = match mode {
2016            Mode::Backward => end_ms.map(|v| v.saturating_sub(1)),
2017            Mode::Range => start_ms, // Lower bound: bars > start
2018            Mode::Latest => None,
2019        };
2020
2021        // For Range mode, we'll paginate backwards like Backward mode
2022        let mut forward_prepend_mode = matches!(mode, Mode::Range);
2023
2024        // Adjust before_ms to ensure we get data from the API
2025        // OKX API might not have bars for the very recent past
2026        // This handles both explicit end=now and the actor layer setting end=now when it's None
2027        if matches!(mode, Mode::Backward | Mode::Range)
2028            && let Some(b) = before_ms
2029        {
2030            // OKX endpoints have different data availability windows:
2031            // - Regular endpoint: has most recent data but limited depth
2032            // - History endpoint: has deep history but lags behind current time
2033            // Use a small buffer to avoid the "dead zone"
2034            let buffer_ms = slot_ms.max(60_000); // At least 1 minute or 1 bar
2035            if b >= now_ms.saturating_sub(buffer_ms) {
2036                before_ms = Some(now_ms.saturating_sub(buffer_ms));
2037            }
2038        }
2039
2040        let mut have_latest_first_page = false;
2041        let mut progressless_loops = 0u8;
2042
2043        loop {
2044            if let Some(lim) = limit
2045                && lim > 0
2046                && out.len() >= lim as usize
2047            {
2048                break;
2049            }
2050            if pages >= MAX_PAGES_SOFT {
2051                break;
2052            }
2053
2054            let pivot_ms = if let Some(a) = after_ms {
2055                a
2056            } else if let Some(b) = before_ms {
2057                b
2058            } else {
2059                now_ms
2060            };
2061            // Choose endpoint based on how old the data is:
2062            // - Use regular endpoint for recent data (< 1 hour old)
2063            // - Use history endpoint for older data (> 1 hour old)
2064            // This avoids the "gap" where history endpoint has no recent data
2065            // and regular endpoint has limited depth
2066            let age_ms = now_ms.saturating_sub(pivot_ms);
2067            let age_hours = age_ms / (60 * 60 * 1000);
2068            let using_history = age_hours > 1; // Use history if data is > 1 hour old
2069
2070            let page_ceiling = if using_history { 100 } else { 300 };
2071            let remaining = limit
2072                .filter(|&l| l > 0) // Treat limit=0 as no limit
2073                .map_or(page_ceiling, |l| (l as usize).saturating_sub(out.len()));
2074            let page_cap = remaining.min(page_ceiling);
2075
2076            let mut p = GetCandlesticksParamsBuilder::default();
2077            p.inst_id(symbol.as_str())
2078                .bar(&bar_param)
2079                .limit(page_cap as u32);
2080
2081            // Track whether this planned request uses BEFORE or AFTER.
2082            let mut req_used_before = false;
2083
2084            match mode {
2085                Mode::Latest => {
2086                    if have_latest_first_page && let Some(b) = before_ms {
2087                        p.before_ms(b);
2088                        req_used_before = true;
2089                    }
2090                }
2091                Mode::Backward => {
2092                    // Use 'after' to get older bars (OKX API: after=cursor means < cursor)
2093                    if let Some(b) = before_ms {
2094                        p.after_ms(b);
2095                    }
2096                }
2097                Mode::Range => {
2098                    // For Range mode, use both after and before to specify the full range
2099                    // This is much more efficient than pagination
2100                    if let Some(a) = after_ms {
2101                        p.after_ms(a);
2102                    }
2103                    if let Some(b) = before_ms {
2104                        p.before_ms(b);
2105                        req_used_before = true;
2106                    }
2107                }
2108            }
2109
2110            let params = p.build().map_err(anyhow::Error::new)?;
2111
2112            let mut raw = if using_history {
2113                self.inner
2114                    .get_history_candles(params.clone())
2115                    .await
2116                    .map_err(anyhow::Error::new)?
2117            } else {
2118                self.inner
2119                    .get_candles(params.clone())
2120                    .await
2121                    .map_err(anyhow::Error::new)?
2122            };
2123
2124            // --- Fallbacks on empty page ---
2125            if raw.is_empty() {
2126                // LATEST: retry same cursor via history, then step back a page-interval before giving up
2127                if matches!(mode, Mode::Latest)
2128                    && have_latest_first_page
2129                    && !using_history
2130                    && let Some(b) = before_ms
2131                {
2132                    let mut p2 = GetCandlesticksParamsBuilder::default();
2133                    p2.inst_id(symbol.as_str())
2134                        .bar(&bar_param)
2135                        .limit(page_cap as u32);
2136                    p2.before_ms(b);
2137                    let params2 = p2.build().map_err(anyhow::Error::new)?;
2138                    let raw2 = self
2139                        .inner
2140                        .get_history_candles(params2)
2141                        .await
2142                        .map_err(anyhow::Error::new)?;
2143                    if !raw2.is_empty() {
2144                        raw = raw2;
2145                    } else {
2146                        // Step back one page interval and retry loop
2147                        let jump = (page_cap as i64).saturating_mul(slot_ms.max(1));
2148                        before_ms = Some(b.saturating_sub(jump));
2149                        progressless_loops = progressless_loops.saturating_add(1);
2150                        if progressless_loops >= 3 {
2151                            break;
2152                        }
2153                        continue;
2154                    }
2155                }
2156
2157                // Range mode doesn't need special bootstrap - it uses the normal flow with before_ms set
2158
2159                // If still empty: for Range after first page, try a single backstep window using BEFORE
2160                if raw.is_empty() && matches!(mode, Mode::Range) && pages > 0 {
2161                    let backstep_ms = (page_cap as i64).saturating_mul(slot_ms.max(1));
2162                    let pivot_back = after_ms.unwrap_or(now_ms).saturating_sub(backstep_ms);
2163
2164                    let mut p2 = GetCandlesticksParamsBuilder::default();
2165                    p2.inst_id(symbol.as_str())
2166                        .bar(&bar_param)
2167                        .limit(page_cap as u32)
2168                        .before_ms(pivot_back);
2169                    let params2 = p2.build().map_err(anyhow::Error::new)?;
2170                    let raw2 = if (now_ms.saturating_sub(pivot_back)) / (24 * 60 * 60 * 1000)
2171                        > HISTORY_SPLIT_DAYS
2172                    {
2173                        self.inner.get_history_candles(params2).await
2174                    } else {
2175                        self.inner.get_candles(params2).await
2176                    }
2177                    .map_err(anyhow::Error::new)?;
2178                    if raw2.is_empty() {
2179                        break;
2180                    } else {
2181                        raw = raw2;
2182                        forward_prepend_mode = true;
2183                        req_used_before = true;
2184                    }
2185                }
2186
2187                // First LATEST page empty: jump back >100d to force history, then continue loop
2188                if raw.is_empty()
2189                    && matches!(mode, Mode::Latest)
2190                    && !have_latest_first_page
2191                    && !using_history
2192                {
2193                    let jump_days_ms = (HISTORY_SPLIT_DAYS + 1) * 86_400_000;
2194                    before_ms = Some(now_ms.saturating_sub(jump_days_ms));
2195                    have_latest_first_page = true;
2196                    continue;
2197                }
2198
2199                // Still empty for any other case? Just break.
2200                if raw.is_empty() {
2201                    break;
2202                }
2203            }
2204            // --- end fallbacks ---
2205
2206            pages += 1;
2207
2208            // Parse, oldest → newest
2209            let ts_init = self.generate_ts_init();
2210            let mut page: Vec<Bar> = Vec::with_capacity(raw.len());
2211            for r in &raw {
2212                page.push(parse_candlestick(
2213                    r,
2214                    bar_type,
2215                    inst.price_precision(),
2216                    inst.size_precision(),
2217                    ts_init,
2218                )?);
2219            }
2220            page.reverse();
2221
2222            let page_oldest_ms = page.first().map(|b| b.ts_event.as_i64() / 1_000_000);
2223            let page_newest_ms = page.last().map(|b| b.ts_event.as_i64() / 1_000_000);
2224
2225            // Range filter (inclusive)
2226            // For Range mode, if we have no bars yet and this is an early page,
2227            // be more tolerant with the start boundary to handle gaps in data
2228            let mut filtered: Vec<Bar> = if matches!(mode, Mode::Range)
2229                && out.is_empty()
2230                && pages < 2
2231            {
2232                // On first pages of Range mode with no data yet, include the most recent bar
2233                // even if it's slightly before our start time (within 2 bar periods)
2234                // BUT we want ALL bars in the page that are within our range
2235                let tolerance_ns = slot_ns * 2; // Allow up to 2 bar periods before start
2236
2237                // Debug: log the page range
2238                if !page.is_empty() {
2239                    tracing::debug!(
2240                        "Range mode bootstrap page: {} bars from {} to {}, filtering with start={:?} end={:?}",
2241                        page.len(),
2242                        page.first().unwrap().ts_event.as_i64() / 1_000_000,
2243                        page.last().unwrap().ts_event.as_i64() / 1_000_000,
2244                        start_ms,
2245                        end_ms
2246                    );
2247                }
2248
2249                let result: Vec<Bar> = page
2250                    .clone()
2251                    .into_iter()
2252                    .filter(|b| {
2253                        let ts = b.ts_event.as_i64();
2254                        // Accept bars from (start - tolerance) to end
2255                        let ok_after =
2256                            start_ns.is_none_or(|sns| ts >= sns.saturating_sub(tolerance_ns));
2257                        let ok_before = end_ns.is_none_or(|ens| ts <= ens);
2258                        ok_after && ok_before
2259                    })
2260                    .collect();
2261
2262                result
2263            } else {
2264                // Normal filtering
2265                page.clone()
2266                    .into_iter()
2267                    .filter(|b| {
2268                        let ts = b.ts_event.as_i64();
2269                        let ok_after = start_ns.is_none_or(|sns| ts >= sns);
2270                        let ok_before = end_ns.is_none_or(|ens| ts <= ens);
2271                        ok_after && ok_before
2272                    })
2273                    .collect()
2274            };
2275
2276            if !page.is_empty() && filtered.is_empty() {
2277                // For Range mode, if all bars are before our start time, there's no point continuing
2278                if matches!(mode, Mode::Range)
2279                    && !forward_prepend_mode
2280                    && let (Some(newest_ms), Some(start_ms)) = (page_newest_ms, start_ms)
2281                    && newest_ms < start_ms.saturating_sub(slot_ms * 2)
2282                {
2283                    // Bars are too old (more than 2 bar periods before start), stop
2284                    break;
2285                }
2286            }
2287
2288            // Track contribution for progress guard
2289            let contribution;
2290
2291            if out.is_empty() {
2292                contribution = filtered.len();
2293                out = filtered;
2294            } else {
2295                match mode {
2296                    Mode::Backward | Mode::Latest => {
2297                        if let Some(first) = out.first() {
2298                            filtered.retain(|b| b.ts_event < first.ts_event);
2299                        }
2300                        contribution = filtered.len();
2301                        if contribution != 0 {
2302                            let mut new_out = Vec::with_capacity(out.len() + filtered.len());
2303                            new_out.extend_from_slice(&filtered);
2304                            new_out.extend_from_slice(&out);
2305                            out = new_out;
2306                        }
2307                    }
2308                    Mode::Range => {
2309                        if forward_prepend_mode || req_used_before {
2310                            // We are backfilling older pages: prepend them.
2311                            if let Some(first) = out.first() {
2312                                filtered.retain(|b| b.ts_event < first.ts_event);
2313                            }
2314                            contribution = filtered.len();
2315                            if contribution != 0 {
2316                                let mut new_out = Vec::with_capacity(out.len() + filtered.len());
2317                                new_out.extend_from_slice(&filtered);
2318                                new_out.extend_from_slice(&out);
2319                                out = new_out;
2320                            }
2321                        } else {
2322                            // Normal forward: append newer pages.
2323                            if let Some(last) = out.last() {
2324                                filtered.retain(|b| b.ts_event > last.ts_event);
2325                            }
2326                            contribution = filtered.len();
2327                            out.extend(filtered);
2328                        }
2329                    }
2330                }
2331            }
2332
2333            // Duplicate-window mitigation for Latest/Backward/Range
2334            if contribution == 0
2335                && matches!(mode, Mode::Latest | Mode::Backward | Mode::Range)
2336                && let Some(b) = before_ms
2337            {
2338                let jump = (page_cap as i64).saturating_mul(slot_ms.max(1));
2339                let new_b = b.saturating_sub(jump);
2340                if new_b != b {
2341                    before_ms = Some(new_b);
2342                }
2343            }
2344
2345            if contribution == 0 {
2346                progressless_loops = progressless_loops.saturating_add(1);
2347                if progressless_loops >= 3 {
2348                    break;
2349                }
2350            } else {
2351                progressless_loops = 0;
2352
2353                // Advance cursors only when we made progress
2354                match mode {
2355                    Mode::Latest | Mode::Backward => {
2356                        if let Some(oldest) = page_oldest_ms {
2357                            before_ms = Some(oldest.saturating_sub(1));
2358                            have_latest_first_page = true;
2359                        } else {
2360                            break;
2361                        }
2362                    }
2363                    Mode::Range => {
2364                        if forward_prepend_mode || req_used_before {
2365                            if let Some(oldest) = page_oldest_ms {
2366                                // Move back by at least one bar period to avoid getting the same data
2367                                let jump_back = slot_ms.max(60_000); // At least 1 minute
2368                                before_ms = Some(oldest.saturating_sub(jump_back));
2369                                after_ms = None;
2370                            } else {
2371                                break;
2372                            }
2373                        } else if let Some(newest) = page_newest_ms {
2374                            after_ms = Some(newest.saturating_add(1));
2375                            before_ms = None;
2376                        } else {
2377                            break;
2378                        }
2379                    }
2380                }
2381            }
2382
2383            // Stop conditions
2384            if let Some(lim) = limit
2385                && lim > 0
2386                && out.len() >= lim as usize
2387            {
2388                break;
2389            }
2390            if let Some(ens) = end_ns
2391                && let Some(last) = out.last()
2392                && last.ts_event.as_i64() >= ens
2393            {
2394                break;
2395            }
2396            if let Some(sns) = start_ns
2397                && let Some(first) = out.first()
2398                && (matches!(mode, Mode::Backward) || forward_prepend_mode)
2399                && first.ts_event.as_i64() <= sns
2400            {
2401                // For Range mode, check if we have all bars up to the end time
2402                if matches!(mode, Mode::Range) {
2403                    // Don't stop if we haven't reached the end time yet
2404                    if let Some(ens) = end_ns
2405                        && let Some(last) = out.last()
2406                    {
2407                        let last_ts = last.ts_event.as_i64();
2408                        if last_ts < ens {
2409                            // We have bars before start but haven't reached end, need to continue forward
2410                            // Switch from backward to forward pagination
2411                            forward_prepend_mode = false;
2412                            after_ms = Some((last_ts / 1_000_000).saturating_add(1));
2413                            before_ms = None;
2414                            continue;
2415                        }
2416                    }
2417                }
2418                break;
2419            }
2420
2421            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
2422        }
2423
2424        // Final rescue for FORWARD/RANGE when nothing gathered
2425        if out.is_empty() && matches!(mode, Mode::Range) {
2426            let pivot = end_ms.unwrap_or(now_ms.saturating_sub(1));
2427            let hist = (now_ms.saturating_sub(pivot)) / (24 * 60 * 60 * 1000) > HISTORY_SPLIT_DAYS;
2428            let mut p = GetCandlesticksParamsBuilder::default();
2429            p.inst_id(symbol.as_str())
2430                .bar(&bar_param)
2431                .limit(300)
2432                .before_ms(pivot);
2433            let params = p.build().map_err(anyhow::Error::new)?;
2434            let raw = if hist {
2435                self.inner.get_history_candles(params).await
2436            } else {
2437                self.inner.get_candles(params).await
2438            }
2439            .map_err(anyhow::Error::new)?;
2440            if !raw.is_empty() {
2441                let ts_init = self.generate_ts_init();
2442                let mut page: Vec<Bar> = Vec::with_capacity(raw.len());
2443                for r in &raw {
2444                    page.push(parse_candlestick(
2445                        r,
2446                        bar_type,
2447                        inst.price_precision(),
2448                        inst.size_precision(),
2449                        ts_init,
2450                    )?);
2451                }
2452                page.reverse();
2453                out = page
2454                    .into_iter()
2455                    .filter(|b| {
2456                        let ts = b.ts_event.as_i64();
2457                        let ok_after = start_ns.is_none_or(|sns| ts >= sns);
2458                        let ok_before = end_ns.is_none_or(|ens| ts <= ens);
2459                        ok_after && ok_before
2460                    })
2461                    .collect();
2462            }
2463        }
2464
2465        // Trim against end bound if needed (keep ≤ end)
2466        if let Some(ens) = end_ns {
2467            while out.last().is_some_and(|b| b.ts_event.as_i64() > ens) {
2468                out.pop();
2469            }
2470        }
2471
2472        // Clamp first bar for Range when using forward pagination
2473        if matches!(mode, Mode::Range)
2474            && !forward_prepend_mode
2475            && let Some(sns) = start_ns
2476        {
2477            let lower = sns.saturating_sub(slot_ns);
2478            while out.first().is_some_and(|b| b.ts_event.as_i64() < lower) {
2479                out.remove(0);
2480            }
2481        }
2482
2483        if let Some(lim) = limit
2484            && lim > 0
2485            && out.len() > lim as usize
2486        {
2487            out.truncate(lim as usize);
2488        }
2489
2490        Ok(out)
2491    }
2492
2493    /// Requests historical order status reports for the given parameters.
2494    ///
2495    /// # Errors
2496    ///
2497    /// Returns an error if the request fails.
2498    ///
2499    /// # References
2500    ///
2501    /// - <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-order-history-last-7-days>.
2502    /// - <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-order-history-last-3-months>.
2503    #[allow(clippy::too_many_arguments)]
2504    pub async fn request_order_status_reports(
2505        &self,
2506        account_id: AccountId,
2507        instrument_type: Option<OKXInstrumentType>,
2508        instrument_id: Option<InstrumentId>,
2509        start: Option<DateTime<Utc>>,
2510        end: Option<DateTime<Utc>>,
2511        open_only: bool,
2512        limit: Option<u32>,
2513    ) -> anyhow::Result<Vec<OrderStatusReport>> {
2514        let mut history_params = GetOrderHistoryParamsBuilder::default();
2515
2516        let instrument_type = if let Some(instrument_type) = instrument_type {
2517            instrument_type
2518        } else {
2519            let instrument_id = instrument_id.ok_or_else(|| {
2520                anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
2521            })?;
2522            let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2523            okx_instrument_type(&instrument)?
2524        };
2525
2526        history_params.inst_type(instrument_type);
2527
2528        if let Some(instrument_id) = instrument_id.as_ref() {
2529            history_params.inst_id(instrument_id.symbol.inner().to_string());
2530        }
2531
2532        if let Some(limit) = limit {
2533            history_params.limit(limit);
2534        }
2535
2536        let history_params = history_params.build().map_err(|e| anyhow::anyhow!(e))?;
2537
2538        let mut pending_params = GetOrderListParamsBuilder::default();
2539        pending_params.inst_type(instrument_type);
2540
2541        if let Some(instrument_id) = instrument_id.as_ref() {
2542            pending_params.inst_id(instrument_id.symbol.inner().to_string());
2543        }
2544
2545        if let Some(limit) = limit {
2546            pending_params.limit(limit);
2547        }
2548
2549        let pending_params = pending_params.build().map_err(|e| anyhow::anyhow!(e))?;
2550
2551        let combined_resp = if open_only {
2552            // Only request pending/open orders
2553            self.inner
2554                .get_orders_pending(pending_params)
2555                .await
2556                .map_err(|e| anyhow::anyhow!(e))?
2557        } else {
2558            // Make both requests concurrently
2559            let (history_resp, pending_resp) = tokio::try_join!(
2560                self.inner.get_orders_history(history_params),
2561                self.inner.get_orders_pending(pending_params)
2562            )
2563            .map_err(|e| anyhow::anyhow!(e))?;
2564
2565            // Combine both responses
2566            let mut combined_resp = history_resp;
2567            combined_resp.extend(pending_resp);
2568            combined_resp
2569        };
2570
2571        // Prepare time range filter
2572        let start_ns = start.map(UnixNanos::from);
2573        let end_ns = end.map(UnixNanos::from);
2574
2575        let ts_init = self.generate_ts_init();
2576        let mut reports = Vec::with_capacity(combined_resp.len());
2577
2578        // Use a seen filter in case pending orders are within the histories "2hr reserve window"
2579        let mut seen: AHashSet<String> = AHashSet::new();
2580
2581        for order in combined_resp {
2582            let seen_key = if !order.cl_ord_id.is_empty() {
2583                order.cl_ord_id.as_str().to_string()
2584            } else if let Some(algo_cl_ord_id) = order
2585                .algo_cl_ord_id
2586                .as_ref()
2587                .filter(|value| !value.as_str().is_empty())
2588            {
2589                algo_cl_ord_id.as_str().to_string()
2590            } else if let Some(algo_id) = order
2591                .algo_id
2592                .as_ref()
2593                .filter(|value| !value.as_str().is_empty())
2594            {
2595                algo_id.as_str().to_string()
2596            } else {
2597                order.ord_id.as_str().to_string()
2598            };
2599
2600            if !seen.insert(seen_key) {
2601                continue; // Reserved pending already reported
2602            }
2603
2604            let Ok(inst) = self.instrument_from_cache(order.inst_id) else {
2605                tracing::debug!(
2606                    symbol = %order.inst_id,
2607                    "Skipping order report for instrument not in cache"
2608                );
2609                continue;
2610            };
2611
2612            let report = match parse_order_status_report(
2613                &order,
2614                account_id,
2615                inst.id(),
2616                inst.price_precision(),
2617                inst.size_precision(),
2618                ts_init,
2619            ) {
2620                Ok(report) => report,
2621                Err(e) => {
2622                    tracing::error!("Failed to parse order status report: {e}");
2623                    continue;
2624                }
2625            };
2626
2627            if let Some(start_ns) = start_ns
2628                && report.ts_last < start_ns
2629            {
2630                continue;
2631            }
2632            if let Some(end_ns) = end_ns
2633                && report.ts_last > end_ns
2634            {
2635                continue;
2636            }
2637
2638            reports.push(report);
2639        }
2640
2641        Ok(reports)
2642    }
2643
2644    /// Requests fill reports (transaction details) for the given parameters.
2645    ///
2646    /// # Errors
2647    ///
2648    /// Returns an error if the request fails.
2649    ///
2650    /// # References
2651    ///
2652    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-transaction-details-last-3-days>.
2653    pub async fn request_fill_reports(
2654        &self,
2655        account_id: AccountId,
2656        instrument_type: Option<OKXInstrumentType>,
2657        instrument_id: Option<InstrumentId>,
2658        start: Option<DateTime<Utc>>,
2659        end: Option<DateTime<Utc>>,
2660        limit: Option<u32>,
2661    ) -> anyhow::Result<Vec<FillReport>> {
2662        let mut params = GetTransactionDetailsParamsBuilder::default();
2663
2664        let instrument_type = if let Some(instrument_type) = instrument_type {
2665            instrument_type
2666        } else {
2667            let instrument_id = instrument_id.ok_or_else(|| {
2668                anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
2669            })?;
2670            let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2671            okx_instrument_type(&instrument)?
2672        };
2673
2674        params.inst_type(instrument_type);
2675
2676        if let Some(instrument_id) = instrument_id {
2677            let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2678            let instrument_type = okx_instrument_type(&instrument)?;
2679            params.inst_type(instrument_type);
2680            params.inst_id(instrument_id.symbol.inner().to_string());
2681        }
2682
2683        if let Some(limit) = limit {
2684            params.limit(limit);
2685        }
2686
2687        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2688
2689        let resp = self
2690            .inner
2691            .get_fills(params)
2692            .await
2693            .map_err(|e| anyhow::anyhow!(e))?;
2694
2695        // Prepare time range filter
2696        let start_ns = start.map(UnixNanos::from);
2697        let end_ns = end.map(UnixNanos::from);
2698
2699        let ts_init = self.generate_ts_init();
2700        let mut reports = Vec::with_capacity(resp.len());
2701
2702        for detail in resp {
2703            // Skip fills with zero or negative quantity (cancelled orders, etc)
2704            if detail.fill_sz.is_empty() {
2705                continue;
2706            }
2707            if let Ok(qty) = detail.fill_sz.parse::<f64>() {
2708                if qty <= 0.0 {
2709                    continue;
2710                }
2711            } else {
2712                // Skip unparsable quantities
2713                continue;
2714            }
2715
2716            let Ok(inst) = self.instrument_from_cache(detail.inst_id) else {
2717                tracing::debug!(
2718                    symbol = %detail.inst_id,
2719                    "Skipping fill report for instrument not in cache"
2720                );
2721                continue;
2722            };
2723
2724            let report = match parse_fill_report(
2725                detail,
2726                account_id,
2727                inst.id(),
2728                inst.price_precision(),
2729                inst.size_precision(),
2730                ts_init,
2731            ) {
2732                Ok(report) => report,
2733                Err(e) => {
2734                    tracing::error!("Failed to parse fill report: {e}");
2735                    continue;
2736                }
2737            };
2738
2739            if let Some(start_ns) = start_ns
2740                && report.ts_event < start_ns
2741            {
2742                continue;
2743            }
2744
2745            if let Some(end_ns) = end_ns
2746                && report.ts_event > end_ns
2747            {
2748                continue;
2749            }
2750
2751            reports.push(report);
2752        }
2753
2754        Ok(reports)
2755    }
2756
2757    /// Requests current position status reports for the given parameters.
2758    ///
2759    /// # Position Modes
2760    ///
2761    /// OKX supports two position modes, which affects how position data is returned:
2762    ///
2763    /// ## Net Mode (One-way)
2764    /// - `posSide` field will be `"net"`
2765    /// - `pos` field uses **signed quantities**:
2766    ///   - Positive value = Long position
2767    ///   - Negative value = Short position
2768    ///   - Zero = Flat/no position
2769    ///
2770    /// ## Long/Short Mode (Hedge/Dual-side)
2771    /// - `posSide` field will be `"long"` or `"short"`
2772    /// - `pos` field is **always positive** (use `posSide` to determine actual side)
2773    /// - Allows holding simultaneous long and short positions on the same instrument
2774    /// - Position IDs are suffixed with `-LONG` or `-SHORT` for uniqueness
2775    ///
2776    /// # Errors
2777    ///
2778    /// Returns an error if the request fails.
2779    ///
2780    /// # References
2781    ///
2782    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-positions>
2783    pub async fn request_position_status_reports(
2784        &self,
2785        account_id: AccountId,
2786        instrument_type: Option<OKXInstrumentType>,
2787        instrument_id: Option<InstrumentId>,
2788    ) -> anyhow::Result<Vec<PositionStatusReport>> {
2789        let mut params = GetPositionsParamsBuilder::default();
2790
2791        let instrument_type = if let Some(instrument_type) = instrument_type {
2792            instrument_type
2793        } else {
2794            let instrument_id = instrument_id.ok_or_else(|| {
2795                anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
2796            })?;
2797            let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2798            okx_instrument_type(&instrument)?
2799        };
2800
2801        params.inst_type(instrument_type);
2802
2803        instrument_id
2804            .as_ref()
2805            .map(|i| params.inst_id(i.symbol.inner()));
2806
2807        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2808
2809        let resp = self
2810            .inner
2811            .get_positions(params)
2812            .await
2813            .map_err(|e| anyhow::anyhow!(e))?;
2814
2815        let ts_init = self.generate_ts_init();
2816        let mut reports = Vec::with_capacity(resp.len());
2817
2818        for position in resp {
2819            let Ok(inst) = self.instrument_from_cache(position.inst_id) else {
2820                tracing::debug!(
2821                    symbol = %position.inst_id,
2822                    "Skipping position report for instrument not in cache"
2823                );
2824                continue;
2825            };
2826
2827            match parse_position_status_report(
2828                position,
2829                account_id,
2830                inst.id(),
2831                inst.size_precision(),
2832                ts_init,
2833            ) {
2834                Ok(report) => reports.push(report),
2835                Err(e) => {
2836                    tracing::error!("Failed to parse position status report: {e}");
2837                    continue;
2838                }
2839            };
2840        }
2841
2842        Ok(reports)
2843    }
2844
2845    /// Requests spot margin position status reports from account balance.
2846    ///
2847    /// Spot margin positions appear in `/api/v5/account/balance` as balance sheet items
2848    /// with non-zero `liab` (liability) or `spotInUseAmt` fields, rather than in the
2849    /// positions endpoint. This method fetches the balance and converts any margin
2850    /// positions into position status reports.
2851    ///
2852    /// # Errors
2853    ///
2854    /// Returns an error if the request fails or parsing fails.
2855    ///
2856    /// # References
2857    ///
2858    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-balance>
2859    pub async fn request_spot_margin_position_reports(
2860        &self,
2861        account_id: AccountId,
2862    ) -> anyhow::Result<Vec<PositionStatusReport>> {
2863        let accounts = self
2864            .inner
2865            .get_balance()
2866            .await
2867            .map_err(|e| anyhow::anyhow!(e))?;
2868
2869        let ts_init = self.generate_ts_init();
2870        let mut reports = Vec::new();
2871
2872        for account in accounts {
2873            for balance in account.details {
2874                let ccy_str = balance.ccy.as_str();
2875
2876                // Try to find instrument by constructing potential spot pair symbols
2877                let potential_symbols = [
2878                    format!("{ccy_str}-USDT"),
2879                    format!("{ccy_str}-USD"),
2880                    format!("{ccy_str}-USDC"),
2881                ];
2882
2883                let instrument_result = potential_symbols.iter().find_map(|symbol| {
2884                    self.instrument_from_cache(Ustr::from(symbol))
2885                        .ok()
2886                        .map(|inst| (inst.id(), inst.size_precision()))
2887                });
2888
2889                let (instrument_id, size_precision) = match instrument_result {
2890                    Some((id, prec)) => (id, prec),
2891                    None => {
2892                        tracing::debug!(
2893                            "Skipping balance for {} - no matching instrument in cache",
2894                            ccy_str
2895                        );
2896                        continue;
2897                    }
2898                };
2899
2900                match parse_spot_margin_position_from_balance(
2901                    &balance,
2902                    account_id,
2903                    instrument_id,
2904                    size_precision,
2905                    ts_init,
2906                ) {
2907                    Ok(Some(report)) => reports.push(report),
2908                    Ok(None) => {} // No margin position for this currency
2909                    Err(e) => {
2910                        tracing::error!(
2911                            "Failed to parse spot margin position from balance for {}: {e}",
2912                            ccy_str
2913                        );
2914                        continue;
2915                    }
2916                };
2917            }
2918        }
2919
2920        Ok(reports)
2921    }
2922
2923    /// Places an algo order via HTTP.
2924    ///
2925    /// # Errors
2926    ///
2927    /// Returns an error if the request fails.
2928    ///
2929    /// # References
2930    ///
2931    /// <https://www.okx.com/docs-v5/en/#order-book-trading-algo-trading-post-place-algo-order>
2932    pub async fn place_algo_order(
2933        &self,
2934        request: OKXPlaceAlgoOrderRequest,
2935    ) -> Result<OKXPlaceAlgoOrderResponse, OKXHttpError> {
2936        let body =
2937            serde_json::to_vec(&request).map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
2938
2939        let resp: Vec<OKXPlaceAlgoOrderResponse> = self
2940            .inner
2941            .send_request::<_, ()>(
2942                Method::POST,
2943                "/api/v5/trade/order-algo",
2944                None,
2945                Some(body),
2946                true,
2947            )
2948            .await?;
2949
2950        resp.into_iter()
2951            .next()
2952            .ok_or_else(|| OKXHttpError::ValidationError("Empty response".to_string()))
2953    }
2954
2955    /// Cancels an algo order via HTTP.
2956    ///
2957    /// # Errors
2958    ///
2959    /// Returns an error if the request fails.
2960    ///
2961    /// # References
2962    ///
2963    /// <https://www.okx.com/docs-v5/en/#order-book-trading-algo-trading-post-cancel-algo-order>
2964    pub async fn cancel_algo_order(
2965        &self,
2966        request: OKXCancelAlgoOrderRequest,
2967    ) -> Result<OKXCancelAlgoOrderResponse, OKXHttpError> {
2968        // OKX expects an array for cancel-algos endpoint
2969        // Serialize once to bytes to keep signing and sending identical
2970        let body =
2971            serde_json::to_vec(&[request]).map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
2972
2973        let resp: Vec<OKXCancelAlgoOrderResponse> = self
2974            .inner
2975            .send_request::<_, ()>(
2976                Method::POST,
2977                "/api/v5/trade/cancel-algos",
2978                None,
2979                Some(body),
2980                true,
2981            )
2982            .await?;
2983
2984        resp.into_iter()
2985            .next()
2986            .ok_or_else(|| OKXHttpError::ValidationError("Empty response".to_string()))
2987    }
2988
2989    /// Places an algo order using domain types.
2990    ///
2991    /// This is a convenience method that accepts Nautilus domain types
2992    /// and builds the appropriate OKX request structure internally.
2993    ///
2994    /// # Errors
2995    ///
2996    /// Returns an error if the request fails.
2997    #[allow(clippy::too_many_arguments)]
2998    pub async fn place_algo_order_with_domain_types(
2999        &self,
3000        instrument_id: InstrumentId,
3001        td_mode: OKXTradeMode,
3002        client_order_id: ClientOrderId,
3003        order_side: OrderSide,
3004        order_type: OrderType,
3005        quantity: Quantity,
3006        trigger_price: Price,
3007        trigger_type: Option<TriggerType>,
3008        limit_price: Option<Price>,
3009        reduce_only: Option<bool>,
3010    ) -> Result<OKXPlaceAlgoOrderResponse, OKXHttpError> {
3011        if !matches!(order_side, OrderSide::Buy | OrderSide::Sell) {
3012            return Err(OKXHttpError::ValidationError(
3013                "Invalid order side".to_string(),
3014            ));
3015        }
3016        let okx_side: OKXSide = order_side.into();
3017
3018        // Map trigger type to OKX format
3019        let trigger_px_type_enum = trigger_type.map_or(OKXTriggerType::Last, Into::into);
3020
3021        // Determine order price based on order type
3022        let order_px = if matches!(order_type, OrderType::StopLimit | OrderType::LimitIfTouched) {
3023            limit_price.map(|p| p.to_string())
3024        } else {
3025            // Market orders use -1 to indicate market execution
3026            Some("-1".to_string())
3027        };
3028
3029        let request = OKXPlaceAlgoOrderRequest {
3030            inst_id: instrument_id.symbol.as_str().to_string(),
3031            td_mode,
3032            side: okx_side,
3033            ord_type: OKXAlgoOrderType::Trigger, // All conditional orders use 'trigger' type
3034            sz: quantity.to_string(),
3035            algo_cl_ord_id: Some(client_order_id.as_str().to_string()),
3036            trigger_px: Some(trigger_price.to_string()),
3037            order_px,
3038            trigger_px_type: Some(trigger_px_type_enum),
3039            tgt_ccy: None,  // Let OKX determine based on instrument
3040            pos_side: None, // Use default position side
3041            close_position: None,
3042            tag: Some(OKX_NAUTILUS_BROKER_ID.to_string()),
3043            reduce_only,
3044        };
3045
3046        self.place_algo_order(request).await
3047    }
3048
3049    /// Cancels an algo order using domain types.
3050    ///
3051    /// This is a convenience method that accepts Nautilus domain types
3052    /// and builds the appropriate OKX request structure internally.
3053    ///
3054    /// # Errors
3055    ///
3056    /// Returns an error if the request fails.
3057    pub async fn cancel_algo_order_with_domain_types(
3058        &self,
3059        instrument_id: InstrumentId,
3060        algo_id: String,
3061    ) -> Result<OKXCancelAlgoOrderResponse, OKXHttpError> {
3062        let request = OKXCancelAlgoOrderRequest {
3063            inst_id: instrument_id.symbol.to_string(),
3064            algo_id: Some(algo_id),
3065            algo_cl_ord_id: None,
3066        };
3067
3068        self.cancel_algo_order(request).await
3069    }
3070
3071    /// Requests algo order status reports.
3072    ///
3073    /// # Errors
3074    ///
3075    /// Returns an error if the request fails.
3076    #[allow(clippy::too_many_arguments)]
3077    pub async fn request_algo_order_status_reports(
3078        &self,
3079        account_id: AccountId,
3080        instrument_type: Option<OKXInstrumentType>,
3081        instrument_id: Option<InstrumentId>,
3082        algo_id: Option<String>,
3083        algo_client_order_id: Option<ClientOrderId>,
3084        state: Option<OKXOrderStatus>,
3085        limit: Option<u32>,
3086    ) -> anyhow::Result<Vec<OrderStatusReport>> {
3087        let mut instruments_cache: AHashMap<Ustr, InstrumentAny> = AHashMap::new();
3088
3089        let inst_type = if let Some(inst_type) = instrument_type {
3090            inst_type
3091        } else if let Some(inst_id) = instrument_id {
3092            let instrument = self.instrument_from_cache(inst_id.symbol.inner())?;
3093            let inst_type = okx_instrument_type(&instrument)?;
3094            instruments_cache.insert(inst_id.symbol.inner(), instrument);
3095            inst_type
3096        } else {
3097            anyhow::bail!("instrument_type or instrument_id required for algo order query")
3098        };
3099
3100        let mut params_builder = GetAlgoOrdersParamsBuilder::default();
3101        params_builder.inst_type(inst_type);
3102        if let Some(inst_id) = instrument_id {
3103            params_builder.inst_id(inst_id.symbol.inner().to_string());
3104        }
3105        if let Some(algo_id) = algo_id.as_ref() {
3106            params_builder.algo_id(algo_id.clone());
3107        }
3108        if let Some(client_order_id) = algo_client_order_id.as_ref() {
3109            params_builder.algo_cl_ord_id(client_order_id.as_str().to_string());
3110        }
3111        if let Some(state) = state {
3112            params_builder.state(state);
3113        }
3114        if let Some(limit) = limit {
3115            params_builder.limit(limit);
3116        }
3117
3118        let params = params_builder
3119            .build()
3120            .map_err(|e| anyhow::anyhow!(format!("Failed to build algo order params: {e}")))?;
3121
3122        let ts_init = self.generate_ts_init();
3123        let mut reports = Vec::new();
3124        let mut seen: AHashSet<(String, String)> = AHashSet::new();
3125
3126        let pending = match self.inner.get_order_algo_pending(params.clone()).await {
3127            Ok(result) => result,
3128            Err(OKXHttpError::UnexpectedStatus { status, .. })
3129                if status == StatusCode::NOT_FOUND =>
3130            {
3131                Vec::new()
3132            }
3133            Err(e) => return Err(e.into()),
3134        };
3135        self.collect_algo_reports(
3136            account_id,
3137            &pending,
3138            &mut instruments_cache,
3139            ts_init,
3140            &mut seen,
3141            &mut reports,
3142        )
3143        .await?;
3144
3145        let history = match self.inner.get_order_algo_history(params).await {
3146            Ok(result) => result,
3147            Err(OKXHttpError::UnexpectedStatus { status, .. })
3148                if status == StatusCode::NOT_FOUND =>
3149            {
3150                Vec::new()
3151            }
3152            Err(e) => return Err(e.into()),
3153        };
3154        self.collect_algo_reports(
3155            account_id,
3156            &history,
3157            &mut instruments_cache,
3158            ts_init,
3159            &mut seen,
3160            &mut reports,
3161        )
3162        .await?;
3163
3164        Ok(reports)
3165    }
3166
3167    /// Requests an algo order status report by client order identifier.
3168    ///
3169    /// # Errors
3170    ///
3171    /// Returns an error if the request fails.
3172    pub async fn request_algo_order_status_report(
3173        &self,
3174        account_id: AccountId,
3175        instrument_id: InstrumentId,
3176        algo_client_order_id: ClientOrderId,
3177    ) -> anyhow::Result<Option<OrderStatusReport>> {
3178        let reports = self
3179            .request_algo_order_status_reports(
3180                account_id,
3181                None,
3182                Some(instrument_id),
3183                None,
3184                Some(algo_client_order_id),
3185                None,
3186                Some(50_u32),
3187            )
3188            .await?;
3189
3190        Ok(reports.into_iter().next())
3191    }
3192
3193    /// Exposes raw HTTP client for testing purposes
3194    pub fn raw_client(&self) -> &Arc<OKXRawHttpClient> {
3195        &self.inner
3196    }
3197
3198    async fn collect_algo_reports(
3199        &self,
3200        account_id: AccountId,
3201        orders: &[OKXOrderAlgo],
3202        instruments_cache: &mut AHashMap<Ustr, InstrumentAny>,
3203        ts_init: UnixNanos,
3204        seen: &mut AHashSet<(String, String)>,
3205        reports: &mut Vec<OrderStatusReport>,
3206    ) -> anyhow::Result<()> {
3207        for order in orders {
3208            let key = (order.algo_id.clone(), order.algo_cl_ord_id.clone());
3209            if !seen.insert(key) {
3210                continue;
3211            }
3212
3213            let instrument = if let Some(instrument) = instruments_cache.get(&order.inst_id) {
3214                instrument.clone()
3215            } else {
3216                let Ok(instrument) = self.instrument_from_cache(order.inst_id) else {
3217                    tracing::debug!(
3218                        symbol = %order.inst_id,
3219                        "Skipping algo order report for instrument not in cache"
3220                    );
3221                    continue;
3222                };
3223                instruments_cache.insert(order.inst_id, instrument.clone());
3224                instrument
3225            };
3226
3227            match parse_http_algo_order(order, account_id, &instrument, ts_init) {
3228                Ok(report) => reports.push(report),
3229                Err(e) => {
3230                    tracing::error!("Failed to parse algo order report: {e}");
3231                }
3232            }
3233        }
3234
3235        Ok(())
3236    }
3237}
3238
3239fn parse_http_algo_order(
3240    order: &OKXOrderAlgo,
3241    account_id: AccountId,
3242    instrument: &InstrumentAny,
3243    ts_init: UnixNanos,
3244) -> anyhow::Result<OrderStatusReport> {
3245    let ord_px = if order.ord_px.is_empty() {
3246        "-1".to_string()
3247    } else {
3248        order.ord_px.clone()
3249    };
3250
3251    let reduce_only = if order.reduce_only.is_empty() {
3252        "false".to_string()
3253    } else {
3254        order.reduce_only.clone()
3255    };
3256
3257    let msg = OKXAlgoOrderMsg {
3258        algo_id: order.algo_id.clone(),
3259        algo_cl_ord_id: order.algo_cl_ord_id.clone(),
3260        cl_ord_id: order.cl_ord_id.clone(),
3261        ord_id: order.ord_id.clone(),
3262        inst_id: order.inst_id,
3263        inst_type: order.inst_type,
3264        ord_type: order.ord_type,
3265        state: order.state,
3266        side: order.side,
3267        pos_side: order.pos_side,
3268        sz: order.sz.clone(),
3269        trigger_px: order.trigger_px.clone(),
3270        trigger_px_type: order.trigger_px_type.unwrap_or(OKXTriggerType::None),
3271        ord_px,
3272        td_mode: order.td_mode,
3273        lever: order.lever.clone(),
3274        reduce_only,
3275        actual_px: order.actual_px.clone(),
3276        actual_sz: order.actual_sz.clone(),
3277        notional_usd: order.notional_usd.clone(),
3278        c_time: order.c_time,
3279        u_time: order.u_time,
3280        trigger_time: order.trigger_time.clone(),
3281        tag: order.tag.clone(),
3282    };
3283
3284    parse_algo_order_status_report(&msg, instrument, account_id, ts_init)
3285}