nautilus_okx/http/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides an ergonomic wrapper around the **OKX v5 REST API** –
17//! <https://www.okx.com/docs-v5/en/>.
18//!
19//! The core type exported by this module is [`OKXHttpClient`].  It offers an
20//! interface to all exchange endpoints currently required by NautilusTrader.
21//!
//! Key responsibilities handled internally:
//! - Request signing and header composition for private routes (HMAC-SHA256).
//! - Rate-limiting based on the public OKX specification.
//! - Deserialization of large JSON payloads into owned, typed domain models.
//! - Conversion of raw exchange errors into the rich [`OKXHttpError`] enum.
27//!
28//! # Official documentation
29//!
30//! | Endpoint                             | Reference                                              |
31//! |--------------------------------------|--------------------------------------------------------|
32//! | Market data                          | <https://www.okx.com/docs-v5/en/#rest-api-market-data> |
33//! | Account & positions                  | <https://www.okx.com/docs-v5/en/#rest-api-account>     |
34//! | Funding & asset balances             | <https://www.okx.com/docs-v5/en/#rest-api-funding>     |
35
36use std::{
37    collections::HashMap,
38    fmt::{Debug, Formatter},
39    num::NonZeroU32,
40    str::FromStr,
41    sync::{
42        Arc, LazyLock,
43        atomic::{AtomicBool, Ordering},
44    },
45};
46
47use ahash::{AHashMap, AHashSet};
48use chrono::{DateTime, Utc};
49use dashmap::DashMap;
50use nautilus_core::{
51    UnixNanos, consts::NAUTILUS_USER_AGENT, env::get_or_env_var, time::get_atomic_clock_realtime,
52};
53use nautilus_model::{
54    data::{Bar, BarType, IndexPriceUpdate, MarkPriceUpdate, TradeTick},
55    enums::{AggregationSource, BarAggregation, OrderSide, OrderType, TriggerType},
56    events::AccountState,
57    identifiers::{AccountId, ClientOrderId, InstrumentId},
58    instruments::{Instrument, InstrumentAny},
59    reports::{FillReport, OrderStatusReport, PositionStatusReport},
60    types::{Price, Quantity},
61};
62use nautilus_network::{
63    http::HttpClient,
64    ratelimiter::quota::Quota,
65    retry::{RetryConfig, RetryManager},
66};
67use reqwest::{Method, StatusCode, header::USER_AGENT};
68use rust_decimal::Decimal;
69use serde::{Deserialize, Serialize, de::DeserializeOwned};
70use tokio_util::sync::CancellationToken;
71use ustr::Ustr;
72
73use super::{
74    error::OKXHttpError,
75    models::{
76        OKXAccount, OKXCancelAlgoOrderRequest, OKXCancelAlgoOrderResponse, OKXFeeRate,
77        OKXIndexTicker, OKXMarkPrice, OKXOrderAlgo, OKXOrderHistory, OKXPlaceAlgoOrderRequest,
78        OKXPlaceAlgoOrderResponse, OKXPosition, OKXPositionHistory, OKXPositionTier, OKXServerTime,
79        OKXTransactionDetail,
80    },
81    query::{
82        GetAlgoOrdersParams, GetAlgoOrdersParamsBuilder, GetCandlesticksParams,
83        GetCandlesticksParamsBuilder, GetIndexTickerParams, GetIndexTickerParamsBuilder,
84        GetInstrumentsParams, GetInstrumentsParamsBuilder, GetMarkPriceParams,
85        GetMarkPriceParamsBuilder, GetOrderHistoryParams, GetOrderHistoryParamsBuilder,
86        GetOrderListParams, GetOrderListParamsBuilder, GetPositionTiersParams,
87        GetPositionsHistoryParams, GetPositionsParams, GetPositionsParamsBuilder,
88        GetTradeFeeParams, GetTradesParams, GetTradesParamsBuilder, GetTransactionDetailsParams,
89        GetTransactionDetailsParamsBuilder, SetPositionModeParams, SetPositionModeParamsBuilder,
90    },
91};
92use crate::{
93    common::{
94        consts::{OKX_HTTP_URL, OKX_NAUTILUS_BROKER_ID, should_retry_error_code},
95        credential::Credential,
96        enums::{
97            OKXAlgoOrderType, OKXContractType, OKXInstrumentStatus, OKXInstrumentType,
98            OKXOrderStatus, OKXPositionMode, OKXSide, OKXTradeMode, OKXTriggerType,
99        },
100        models::OKXInstrument,
101        parse::{
102            okx_instrument_type, okx_instrument_type_from_symbol, parse_account_state,
103            parse_candlestick, parse_fill_report, parse_index_price_update, parse_instrument_any,
104            parse_mark_price_update, parse_order_status_report, parse_position_status_report,
105            parse_spot_margin_position_from_balance, parse_trade_tick,
106        },
107    },
108    http::{
109        models::{OKXCandlestick, OKXTrade},
110        query::GetOrderParams,
111    },
112    websocket::{messages::OKXAlgoOrderMsg, parse::parse_algo_order_status_report},
113};
114
115const OKX_SUCCESS_CODE: &str = "0";
116
117/// Default OKX REST API rate limit: 500 requests per 2 seconds.
118///
119/// - Sub-account order limit: 1000 requests per 2 seconds.
120/// - Account balance: 10 requests per 2 seconds.
121/// - Account instruments: 20 requests per 2 seconds.
122///
123/// We use a conservative 250 requests per second (500 per 2 seconds) as a general limit
124/// that should accommodate most use cases while respecting OKX's documented limits.
125pub static OKX_REST_QUOTA: LazyLock<Quota> =
126    LazyLock::new(|| Quota::per_second(NonZeroU32::new(250).unwrap()));
127
128const OKX_GLOBAL_RATE_KEY: &str = "okx:global";
129
130/// Represents an OKX HTTP response.
131#[derive(Debug, Serialize, Deserialize)]
132pub struct OKXResponse<T> {
133    /// The OKX response code, which is `"0"` for success.
134    pub code: String,
135    /// A message string which can be informational or describe an error cause.
136    pub msg: String,
137    /// The typed data returned by the OKX endpoint.
138    pub data: Vec<T>,
139}
140
141/// Provides a raw HTTP client for interacting with the [OKX](https://okx.com) REST API.
142///
143/// This client wraps the underlying [`HttpClient`] to handle functionality
144/// specific to OKX, such as request signing (for authenticated endpoints),
145/// forming request URLs, and deserializing responses into OKX specific data models.
146pub struct OKXRawHttpClient {
147    base_url: String,
148    client: HttpClient,
149    credential: Option<Credential>,
150    retry_manager: RetryManager<OKXHttpError>,
151    cancellation_token: CancellationToken,
152    is_demo: bool,
153}
154
155impl Default for OKXRawHttpClient {
156    fn default() -> Self {
157        Self::new(None, Some(60), None, None, None, false, None)
158            .expect("Failed to create default OKXRawHttpClient")
159    }
160}
161
162impl Debug for OKXRawHttpClient {
163    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
164        let credential = self.credential.as_ref().map(|_| "<redacted>");
165        f.debug_struct(stringify!(OKXRawHttpClient))
166            .field("base_url", &self.base_url)
167            .field("credential", &credential)
168            .finish_non_exhaustive()
169    }
170}
171
172impl OKXRawHttpClient {
173    fn rate_limiter_quotas() -> Vec<(String, Quota)> {
174        vec![
175            (OKX_GLOBAL_RATE_KEY.to_string(), *OKX_REST_QUOTA),
176            (
177                "okx:/api/v5/account/balance".to_string(),
178                Quota::per_second(NonZeroU32::new(5).unwrap()),
179            ),
180            (
181                "okx:/api/v5/public/instruments".to_string(),
182                Quota::per_second(NonZeroU32::new(10).unwrap()),
183            ),
184            (
185                "okx:/api/v5/market/candles".to_string(),
186                Quota::per_second(NonZeroU32::new(50).unwrap()),
187            ),
188            (
189                "okx:/api/v5/market/history-candles".to_string(),
190                Quota::per_second(NonZeroU32::new(20).unwrap()),
191            ),
192            (
193                "okx:/api/v5/market/history-trades".to_string(),
194                Quota::per_second(NonZeroU32::new(30).unwrap()),
195            ),
196            (
197                "okx:/api/v5/trade/order".to_string(),
198                Quota::per_second(NonZeroU32::new(30).unwrap()), // 60 requests / 2 seconds (per instrument)
199            ),
200            (
201                "okx:/api/v5/trade/orders-pending".to_string(),
202                Quota::per_second(NonZeroU32::new(20).unwrap()),
203            ),
204            (
205                "okx:/api/v5/trade/orders-history".to_string(),
206                Quota::per_second(NonZeroU32::new(20).unwrap()),
207            ),
208            (
209                "okx:/api/v5/trade/fills".to_string(),
210                Quota::per_second(NonZeroU32::new(30).unwrap()),
211            ),
212            (
213                "okx:/api/v5/trade/order-algo".to_string(),
214                Quota::per_second(NonZeroU32::new(10).unwrap()),
215            ),
216            (
217                "okx:/api/v5/trade/cancel-algos".to_string(),
218                Quota::per_second(NonZeroU32::new(10).unwrap()),
219            ),
220        ]
221    }
222
223    fn rate_limit_keys(endpoint: &str) -> Vec<Ustr> {
224        let normalized = endpoint.split('?').next().unwrap_or(endpoint);
225        let route = format!("okx:{normalized}");
226
227        vec![Ustr::from(OKX_GLOBAL_RATE_KEY), Ustr::from(route.as_str())]
228    }
229
230    /// Cancel all pending HTTP requests.
231    pub fn cancel_all_requests(&self) {
232        self.cancellation_token.cancel();
233    }
234
235    /// Get the cancellation token for this client.
236    pub fn cancellation_token(&self) -> &CancellationToken {
237        &self.cancellation_token
238    }
239
240    /// Creates a new [`OKXHttpClient`] using the default OKX HTTP URL,
241    /// optionally overridden with a custom base URL.
242    ///
243    /// This version of the client has **no credentials**, so it can only
244    /// call publicly accessible endpoints.
245    ///
246    /// # Errors
247    ///
248    /// Returns an error if the retry manager cannot be created.
249    pub fn new(
250        base_url: Option<String>,
251        timeout_secs: Option<u64>,
252        max_retries: Option<u32>,
253        retry_delay_ms: Option<u64>,
254        retry_delay_max_ms: Option<u64>,
255        is_demo: bool,
256        proxy_url: Option<String>,
257    ) -> Result<Self, OKXHttpError> {
258        let retry_config = RetryConfig {
259            max_retries: max_retries.unwrap_or(3),
260            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
261            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
262            backoff_factor: 2.0,
263            jitter_ms: 1000,
264            operation_timeout_ms: Some(60_000),
265            immediate_first: false,
266            max_elapsed_ms: Some(180_000),
267        };
268
269        let retry_manager = RetryManager::new(retry_config);
270
271        Ok(Self {
272            base_url: base_url.unwrap_or(OKX_HTTP_URL.to_string()),
273            client: HttpClient::new(
274                Self::default_headers(is_demo),
275                vec![],
276                Self::rate_limiter_quotas(),
277                Some(*OKX_REST_QUOTA),
278                timeout_secs,
279                proxy_url,
280            )
281            .map_err(|e| {
282                OKXHttpError::ValidationError(format!("Failed to create HTTP client: {e}"))
283            })?,
284            credential: None,
285            retry_manager,
286            cancellation_token: CancellationToken::new(),
287            is_demo,
288        })
289    }
290
291    /// Creates a new [`OKXHttpClient`] configured with credentials
292    /// for authenticated requests, optionally using a custom base URL.
293    ///
294    /// # Errors
295    ///
296    /// Returns an error if the retry manager cannot be created.
297    #[allow(clippy::too_many_arguments)]
298    pub fn with_credentials(
299        api_key: String,
300        api_secret: String,
301        api_passphrase: String,
302        base_url: String,
303        timeout_secs: Option<u64>,
304        max_retries: Option<u32>,
305        retry_delay_ms: Option<u64>,
306        retry_delay_max_ms: Option<u64>,
307        is_demo: bool,
308        proxy_url: Option<String>,
309    ) -> Result<Self, OKXHttpError> {
310        let retry_config = RetryConfig {
311            max_retries: max_retries.unwrap_or(3),
312            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
313            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
314            backoff_factor: 2.0,
315            jitter_ms: 1000,
316            operation_timeout_ms: Some(60_000),
317            immediate_first: false,
318            max_elapsed_ms: Some(180_000),
319        };
320
321        let retry_manager = RetryManager::new(retry_config);
322
323        Ok(Self {
324            base_url,
325            client: HttpClient::new(
326                Self::default_headers(is_demo),
327                vec![],
328                Self::rate_limiter_quotas(),
329                Some(*OKX_REST_QUOTA),
330                timeout_secs,
331                proxy_url,
332            )
333            .map_err(|e| {
334                OKXHttpError::ValidationError(format!("Failed to create HTTP client: {e}"))
335            })?,
336            credential: Some(Credential::new(api_key, api_secret, api_passphrase)),
337            retry_manager,
338            cancellation_token: CancellationToken::new(),
339            is_demo,
340        })
341    }
342
343    /// Builds the default headers to include with each request (e.g., `User-Agent`).
344    fn default_headers(is_demo: bool) -> HashMap<String, String> {
345        let mut headers =
346            HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())]);
347
348        if is_demo {
349            headers.insert("x-simulated-trading".to_string(), "1".to_string());
350        }
351
352        headers
353    }
354
355    /// Signs an OKX request with timestamp, API key, passphrase, and signature.
356    ///
357    /// # Errors
358    ///
359    /// Returns [`OKXHttpError::MissingCredentials`] if no credentials are set
360    /// but the request requires authentication.
361    fn sign_request(
362        &self,
363        method: &Method,
364        path: &str,
365        body: Option<&[u8]>,
366    ) -> Result<HashMap<String, String>, OKXHttpError> {
367        let credential = match self.credential.as_ref() {
368            Some(c) => c,
369            None => return Err(OKXHttpError::MissingCredentials),
370        };
371
372        let api_key = credential.api_key.to_string();
373        let api_passphrase = credential.api_passphrase.clone();
374
375        // OKX requires milliseconds in the timestamp (ISO 8601 with milliseconds)
376        let now = Utc::now();
377        let millis = now.timestamp_subsec_millis();
378        let timestamp = now.format("%Y-%m-%dT%H:%M:%S").to_string() + &format!(".{:03}Z", millis);
379        let signature = credential.sign_bytes(&timestamp, method.as_str(), path, body);
380
381        let mut headers = HashMap::new();
382        headers.insert("OK-ACCESS-KEY".to_string(), api_key);
383        headers.insert("OK-ACCESS-PASSPHRASE".to_string(), api_passphrase);
384        headers.insert("OK-ACCESS-TIMESTAMP".to_string(), timestamp);
385        headers.insert("OK-ACCESS-SIGN".to_string(), signature);
386
387        Ok(headers)
388    }
389
390    /// Sends an HTTP request to OKX and parses the response into `Vec<T>`.
391    ///
392    /// Internally, this method handles:
393    /// - Building the URL from `base_url` + `path`.
394    /// - Optionally signing the request.
395    /// - Deserializing JSON responses into typed models, or returning a [`OKXHttpError`].
396    /// - Retrying with exponential backoff on transient errors.
397    ///
398    /// # Errors
399    ///
400    /// Returns an error if:
401    /// - The HTTP request fails.
402    /// - Authentication is required but credentials are missing.
403    /// - The response cannot be deserialized into the expected type.
404    /// - The OKX API returns an error response.
405    async fn send_request<T: DeserializeOwned, P: Serialize>(
406        &self,
407        method: Method,
408        path: &str,
409        params: Option<&P>,
410        body: Option<Vec<u8>>,
411        authenticate: bool,
412    ) -> Result<Vec<T>, OKXHttpError> {
413        let url = format!("{}{path}", self.base_url);
414        let endpoint = path.to_string();
415        let method_clone = method.clone();
416        let body_clone = body.clone();
417
418        let operation = || {
419            let url = url.clone();
420            let method = method_clone.clone();
421            let body = body_clone.clone();
422            let endpoint = endpoint.clone();
423
424            async move {
425                // Serialize params to query string for signing (if needed)
426                let query_string = if let Some(p) = params {
427                    serde_urlencoded::to_string(p).map_err(|e| {
428                        OKXHttpError::JsonError(format!("Failed to serialize params: {e}"))
429                    })?
430                } else {
431                    String::new()
432                };
433
434                // Build full path with query string for signing
435                let full_path = if query_string.is_empty() {
436                    endpoint.clone()
437                } else {
438                    format!("{}?{}", endpoint, query_string)
439                };
440
441                let mut headers = if authenticate {
442                    self.sign_request(&method, &full_path, body.as_deref())?
443                } else {
444                    HashMap::new()
445                };
446
447                // Always set Content-Type header when body is present
448                if body.is_some() {
449                    headers.insert("Content-Type".to_string(), "application/json".to_string());
450                }
451
452                let rate_keys = Self::rate_limit_keys(endpoint.as_str())
453                    .into_iter()
454                    .map(|k| k.to_string())
455                    .collect();
456                let resp = self
457                    .client
458                    .request_with_params(
459                        method.clone(),
460                        url,
461                        params,
462                        Some(headers),
463                        body,
464                        None,
465                        Some(rate_keys),
466                    )
467                    .await?;
468
469                tracing::trace!("Response: {resp:?}");
470
471                if resp.status.is_success() {
472                    let okx_response: OKXResponse<T> =
473                        serde_json::from_slice(&resp.body).map_err(|e| {
474                            tracing::error!("Failed to deserialize OKXResponse: {e}");
475                            OKXHttpError::JsonError(e.to_string())
476                        })?;
477
478                    if okx_response.code != OKX_SUCCESS_CODE {
479                        return Err(OKXHttpError::OkxError {
480                            error_code: okx_response.code,
481                            message: okx_response.msg,
482                        });
483                    }
484
485                    Ok(okx_response.data)
486                } else {
487                    let error_body = String::from_utf8_lossy(&resp.body);
488                    if resp.status.as_u16() == StatusCode::NOT_FOUND.as_u16() {
489                        tracing::debug!("HTTP 404 with body: {error_body}");
490                    } else {
491                        tracing::error!(
492                            "HTTP error {} with body: {error_body}",
493                            resp.status.as_str()
494                        );
495                    }
496
497                    if let Ok(parsed_error) = serde_json::from_slice::<OKXResponse<T>>(&resp.body) {
498                        return Err(OKXHttpError::OkxError {
499                            error_code: parsed_error.code,
500                            message: parsed_error.msg,
501                        });
502                    }
503
504                    Err(OKXHttpError::UnexpectedStatus {
505                        status: StatusCode::from_u16(resp.status.as_u16()).unwrap(),
506                        body: error_body.to_string(),
507                    })
508                }
509            }
510        };
511
512        // Retry strategy based on OKX error responses and HTTP status codes:
513        //
514        // 1. Network errors: always retry (transient connection issues)
515        // 2. HTTP 5xx/429: server errors and rate limiting should be retried
516        // 3. OKX specific retryable error codes (defined in common::consts)
517        //
518        // Note: OKX returns many permanent errors which should NOT be retried
519        // (e.g., "Invalid instrument", "Insufficient balance", "Invalid API Key")
520        let should_retry = |error: &OKXHttpError| -> bool {
521            match error {
522                OKXHttpError::HttpClientError(_) => true,
523                OKXHttpError::UnexpectedStatus { status, .. } => {
524                    status.as_u16() >= 500 || status.as_u16() == 429
525                }
526                OKXHttpError::OkxError { error_code, .. } => should_retry_error_code(error_code),
527                _ => false,
528            }
529        };
530
531        let create_error = |msg: String| -> OKXHttpError {
532            if msg == "canceled" {
533                OKXHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
534            } else {
535                OKXHttpError::ValidationError(msg)
536            }
537        };
538
539        self.retry_manager
540            .execute_with_retry_with_cancel(
541                endpoint.as_str(),
542                operation,
543                should_retry,
544                create_error,
545                &self.cancellation_token,
546            )
547            .await
548    }
549
550    /// Sets the position mode for an account.
551    ///
552    /// # Errors
553    ///
554    /// Returns an error if JSON serialization of `params` fails, if the HTTP
555    /// request fails, or if the response body cannot be deserialized.
556    ///
557    /// # References
558    ///
559    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-set-position-mode>
560    pub async fn set_position_mode(
561        &self,
562        params: SetPositionModeParams,
563    ) -> Result<Vec<serde_json::Value>, OKXHttpError> {
564        let path = "/api/v5/account/set-position-mode";
565        let body = serde_json::to_vec(&params)?;
566        self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
567            .await
568    }
569
570    /// Requests position tiers information, maximum leverage depends on your borrowings and margin ratio.
571    ///
572    /// # Errors
573    ///
574    /// Returns an error if the HTTP request fails, authentication is rejected
575    /// or the response cannot be deserialized.
576    ///
577    /// # References
578    ///
579    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-position-tiers>
580    pub async fn get_position_tiers(
581        &self,
582        params: GetPositionTiersParams,
583    ) -> Result<Vec<OKXPositionTier>, OKXHttpError> {
584        self.send_request(
585            Method::GET,
586            "/api/v5/public/position-tiers",
587            Some(&params),
588            None,
589            false,
590        )
591        .await
592    }
593
594    /// Requests a list of instruments with open contracts.
595    ///
596    /// # Errors
597    ///
598    /// Returns an error if JSON serialization of `params` fails, if the HTTP
599    /// request fails, or if the response body cannot be deserialized.
600    ///
601    /// # References
602    ///
603    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-instruments>
604    pub async fn get_instruments(
605        &self,
606        params: GetInstrumentsParams,
607    ) -> Result<Vec<OKXInstrument>, OKXHttpError> {
608        self.send_request(
609            Method::GET,
610            "/api/v5/public/instruments",
611            Some(&params),
612            None,
613            false,
614        )
615        .await
616    }
617
618    /// Requests the current server time from OKX.
619    ///
620    /// Retrieves the OKX system time in Unix timestamp (milliseconds). This is useful for
621    /// synchronizing local clocks with the exchange server and logging time drift.
622    ///
623    /// # Errors
624    ///
625    /// Returns an error if the HTTP request fails or if the response body
626    /// cannot be parsed into [`OKXServerTime`].
627    ///
628    /// # References
629    ///
630    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-system-time>
631    pub async fn get_server_time(&self) -> Result<u64, OKXHttpError> {
632        let response: Vec<OKXServerTime> = self
633            .send_request::<_, ()>(Method::GET, "/api/v5/public/time", None, None, false)
634            .await?;
635        response
636            .first()
637            .map(|t| t.ts)
638            .ok_or_else(|| OKXHttpError::JsonError("Empty server time response".to_string()))
639    }
640
641    /// Requests a mark price.
642    ///
643    /// We set the mark price based on the SPOT index and at a reasonable basis to prevent individual
644    /// users from manipulating the market and causing the contract price to fluctuate.
645    ///
646    /// # Errors
647    ///
648    /// Returns an error if the HTTP request fails or if the response body
649    /// cannot be parsed into [`OKXMarkPrice`].
650    ///
651    /// # References
652    ///
653    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-mark-price>
654    pub async fn get_mark_price(
655        &self,
656        params: GetMarkPriceParams,
657    ) -> Result<Vec<OKXMarkPrice>, OKXHttpError> {
658        self.send_request(
659            Method::GET,
660            "/api/v5/public/mark-price",
661            Some(&params),
662            None,
663            false,
664        )
665        .await
666    }
667
668    /// Requests the latest index price.
669    ///
670    /// # Errors
671    ///
672    /// Returns an error if the operation fails.
673    ///
674    /// # References
675    ///
676    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-index-tickers>
677    pub async fn get_index_tickers(
678        &self,
679        params: GetIndexTickerParams,
680    ) -> Result<Vec<OKXIndexTicker>, OKXHttpError> {
681        self.send_request(
682            Method::GET,
683            "/api/v5/market/index-tickers",
684            Some(&params),
685            None,
686            false,
687        )
688        .await
689    }
690
691    /// Requests trades history.
692    ///
693    /// # Errors
694    ///
695    /// Returns an error if the operation fails.
696    ///
697    /// # References
698    ///
699    /// <https://www.okx.com/docs-v5/en/#order-book-trading-market-data-get-trades-history>
700    pub async fn get_history_trades(
701        &self,
702        params: GetTradesParams,
703    ) -> Result<Vec<OKXTrade>, OKXHttpError> {
704        self.send_request(
705            Method::GET,
706            "/api/v5/market/history-trades",
707            Some(&params),
708            None,
709            false,
710        )
711        .await
712    }
713
714    /// Requests recent candlestick data.
715    ///
716    /// # Errors
717    ///
718    /// Returns an error if the operation fails.
719    ///
720    /// # References
721    ///
722    /// <https://www.okx.com/docs-v5/en/#order-book-trading-market-data-get-candlesticks>
723    pub async fn get_candles(
724        &self,
725        params: GetCandlesticksParams,
726    ) -> Result<Vec<OKXCandlestick>, OKXHttpError> {
727        self.send_request(
728            Method::GET,
729            "/api/v5/market/candles",
730            Some(&params),
731            None,
732            false,
733        )
734        .await
735    }
736
737    /// Requests historical candlestick data.
738    ///
739    /// # Errors
740    ///
741    /// Returns an error if the operation fails.
742    ///
743    /// # References
744    ///
745    /// <https://www.okx.com/docs-v5/en/#order-book-trading-market-data-get-candlesticks-history>
746    pub async fn get_history_candles(
747        &self,
748        params: GetCandlesticksParams,
749    ) -> Result<Vec<OKXCandlestick>, OKXHttpError> {
750        self.send_request(
751            Method::GET,
752            "/api/v5/market/history-candles",
753            Some(&params),
754            None,
755            false,
756        )
757        .await
758    }
759
760    /// Requests a list of assets (with non-zero balance), remaining balance, and available amount
761    /// in the trading account.
762    ///
763    /// # Errors
764    ///
765    /// Returns an error if the operation fails.
766    ///
767    /// # References
768    ///
769    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-balance>
770    pub async fn get_balance(&self) -> Result<Vec<OKXAccount>, OKXHttpError> {
771        let path = "/api/v5/account/balance";
772        self.send_request::<_, ()>(Method::GET, path, None, None, true)
773            .await
774    }
775
776    /// Requests fee rates for the account.
777    ///
778    /// Returns fee rates for the specified instrument type and the user's VIP level.
779    ///
780    /// # Errors
781    ///
782    /// Returns an error if the operation fails.
783    ///
784    /// # References
785    ///
786    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-fee-rates>
787    pub async fn get_trade_fee(
788        &self,
789        params: GetTradeFeeParams,
790    ) -> Result<Vec<OKXFeeRate>, OKXHttpError> {
791        self.send_request(
792            Method::GET,
793            "/api/v5/account/trade-fee",
794            Some(&params),
795            None,
796            true,
797        )
798        .await
799    }
800
801    /// Retrieves a single order’s details.
802    ///
803    /// # Errors
804    ///
805    /// Returns an error if the operation fails.
806    ///
807    /// # References
808    ///
809    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-order>
810    pub async fn get_order(
811        &self,
812        params: GetOrderParams,
813    ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
814        self.send_request(
815            Method::GET,
816            "/api/v5/trade/order",
817            Some(&params),
818            None,
819            true,
820        )
821        .await
822    }
823
824    /// Requests order list (pending orders).
825    ///
826    /// # Errors
827    ///
828    /// Returns an error if the operation fails.
829    ///
830    /// # References
831    ///
832    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-order-list>
833    pub async fn get_orders_pending(
834        &self,
835        params: GetOrderListParams,
836    ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
837        self.send_request(
838            Method::GET,
839            "/api/v5/trade/orders-pending",
840            Some(&params),
841            None,
842            true,
843        )
844        .await
845    }
846
847    /// Requests historical order records.
848    ///
849    /// # Errors
850    ///
851    /// Returns an error if the operation fails.
852    ///
853    /// # References
854    ///
855    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-orders-history>
856    pub async fn get_orders_history(
857        &self,
858        params: GetOrderHistoryParams,
859    ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
860        self.send_request(
861            Method::GET,
862            "/api/v5/trade/orders-history",
863            Some(&params),
864            None,
865            true,
866        )
867        .await
868    }
869
870    /// Requests pending algo orders.
871    ///
872    /// # Errors
873    ///
874    /// Returns an error if the operation fails.
875    pub async fn get_order_algo_pending(
876        &self,
877        params: GetAlgoOrdersParams,
878    ) -> Result<Vec<OKXOrderAlgo>, OKXHttpError> {
879        self.send_request(
880            Method::GET,
881            "/api/v5/trade/order-algo-pending",
882            Some(&params),
883            None,
884            true,
885        )
886        .await
887    }
888
889    /// Requests historical algo orders.
890    ///
891    /// # Errors
892    ///
893    /// Returns an error if the operation fails.
894    pub async fn get_order_algo_history(
895        &self,
896        params: GetAlgoOrdersParams,
897    ) -> Result<Vec<OKXOrderAlgo>, OKXHttpError> {
898        self.send_request(
899            Method::GET,
900            "/api/v5/trade/order-algo-history",
901            Some(&params),
902            None,
903            true,
904        )
905        .await
906    }
907
908    /// Requests transaction details (fills) for the given parameters.
909    ///
910    /// # Errors
911    ///
912    /// Returns an error if the operation fails.
913    ///
914    /// # References
915    ///
916    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-transaction-details-last-3-days>
917    pub async fn get_fills(
918        &self,
919        params: GetTransactionDetailsParams,
920    ) -> Result<Vec<OKXTransactionDetail>, OKXHttpError> {
921        self.send_request(
922            Method::GET,
923            "/api/v5/trade/fills",
924            Some(&params),
925            None,
926            true,
927        )
928        .await
929    }
930
931    /// Requests information on your positions. When the account is in net mode, net positions will
932    /// be displayed, and when the account is in long/short mode, long or short positions will be
933    /// displayed. Returns in reverse chronological order using ctime.
934    ///
935    /// # Errors
936    ///
937    /// Returns an error if the operation fails.
938    ///
939    /// # References
940    ///
941    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-positions>
942    pub async fn get_positions(
943        &self,
944        params: GetPositionsParams,
945    ) -> Result<Vec<OKXPosition>, OKXHttpError> {
946        self.send_request(
947            Method::GET,
948            "/api/v5/account/positions",
949            Some(&params),
950            None,
951            true,
952        )
953        .await
954    }
955
956    /// Requests closed or historical position data.
957    ///
958    /// # Errors
959    ///
960    /// Returns an error if the operation fails.
961    ///
962    /// # References
963    ///
964    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-positions-history>
965    pub async fn get_positions_history(
966        &self,
967        params: GetPositionsHistoryParams,
968    ) -> Result<Vec<OKXPositionHistory>, OKXHttpError> {
969        self.send_request(
970            Method::GET,
971            "/api/v5/account/positions-history",
972            Some(&params),
973            None,
974            true,
975        )
976        .await
977    }
978}
979
/// Provides a higher-level HTTP client for the [OKX](https://okx.com) REST API.
///
/// This client wraps the underlying [`OKXRawHttpClient`] to handle conversions
/// into the Nautilus domain model.
///
/// Instruments are kept in an internal cache keyed by their raw symbol; several request
/// methods look up the instrument there (for example to obtain its price precision) and fail
/// when it is missing.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
)]
pub struct OKXHttpClient {
    /// Shared raw HTTP client that performs the actual REST requests.
    pub(crate) inner: Arc<OKXRawHttpClient>,
    /// Shared instrument cache, keyed by the instrument's raw symbol.
    pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Set to `true` once instruments have been cached through `cache_instrument` or
    /// `cache_instruments`; reported by `is_initialized`.
    cache_initialized: AtomicBool,
}
994
995impl Clone for OKXHttpClient {
996    fn clone(&self) -> Self {
997        let cache_initialized = AtomicBool::new(false);
998
999        let is_initialized = self.cache_initialized.load(Ordering::Acquire);
1000        if is_initialized {
1001            cache_initialized.store(true, Ordering::Release);
1002        }
1003
1004        Self {
1005            inner: self.inner.clone(),
1006            instruments_cache: self.instruments_cache.clone(),
1007            cache_initialized,
1008        }
1009    }
1010}
1011
1012impl Default for OKXHttpClient {
1013    fn default() -> Self {
1014        Self::new(None, Some(60), None, None, None, false, None)
1015            .expect("Failed to create default OKXHttpClient")
1016    }
1017}
1018
1019impl OKXHttpClient {
1020    /// Creates a new [`OKXHttpClient`] using the default OKX HTTP URL,
1021    /// optionally overridden with a custom base url.
1022    ///
1023    /// This version of the client has **no credentials**, so it can only
1024    /// call publicly accessible endpoints.
1025    ///
1026    /// # Errors
1027    ///
1028    /// Returns an error if the retry manager cannot be created.
1029    pub fn new(
1030        base_url: Option<String>,
1031        timeout_secs: Option<u64>,
1032        max_retries: Option<u32>,
1033        retry_delay_ms: Option<u64>,
1034        retry_delay_max_ms: Option<u64>,
1035        is_demo: bool,
1036        proxy_url: Option<String>,
1037    ) -> anyhow::Result<Self> {
1038        Ok(Self {
1039            inner: Arc::new(OKXRawHttpClient::new(
1040                base_url,
1041                timeout_secs,
1042                max_retries,
1043                retry_delay_ms,
1044                retry_delay_max_ms,
1045                is_demo,
1046                proxy_url,
1047            )?),
1048            instruments_cache: Arc::new(DashMap::new()),
1049            cache_initialized: AtomicBool::new(false),
1050        })
1051    }
1052
1053    /// Generates a timestamp for initialization.
1054    fn generate_ts_init(&self) -> UnixNanos {
1055        get_atomic_clock_realtime().get_time_ns()
1056    }
1057
1058    /// Creates a new authenticated [`OKXHttpClient`] using environment variables and
1059    /// the default OKX HTTP base url.
1060    ///
1061    /// # Errors
1062    ///
1063    /// Returns an error if the operation fails.
1064    pub fn from_env() -> anyhow::Result<Self> {
1065        Self::with_credentials(None, None, None, None, None, None, None, None, false, None)
1066    }
1067
1068    /// Creates a new [`OKXHttpClient`] configured with credentials
1069    /// for authenticated requests, optionally using a custom base url.
1070    ///
1071    /// # Errors
1072    ///
1073    /// Returns an error if the operation fails.
1074    #[allow(clippy::too_many_arguments)]
1075    pub fn with_credentials(
1076        api_key: Option<String>,
1077        api_secret: Option<String>,
1078        api_passphrase: Option<String>,
1079        base_url: Option<String>,
1080        timeout_secs: Option<u64>,
1081        max_retries: Option<u32>,
1082        retry_delay_ms: Option<u64>,
1083        retry_delay_max_ms: Option<u64>,
1084        is_demo: bool,
1085        proxy_url: Option<String>,
1086    ) -> anyhow::Result<Self> {
1087        let api_key = get_or_env_var(api_key, "OKX_API_KEY")?;
1088        let api_secret = get_or_env_var(api_secret, "OKX_API_SECRET")?;
1089        let api_passphrase = get_or_env_var(api_passphrase, "OKX_API_PASSPHRASE")?;
1090        let base_url = base_url.unwrap_or(OKX_HTTP_URL.to_string());
1091
1092        Ok(Self {
1093            inner: Arc::new(OKXRawHttpClient::with_credentials(
1094                api_key,
1095                api_secret,
1096                api_passphrase,
1097                base_url,
1098                timeout_secs,
1099                max_retries,
1100                retry_delay_ms,
1101                retry_delay_max_ms,
1102                is_demo,
1103                proxy_url,
1104            )?),
1105            instruments_cache: Arc::new(DashMap::new()),
1106            cache_initialized: AtomicBool::new(false),
1107        })
1108    }
1109
1110    /// Retrieves an instrument from the cache.
1111    ///
1112    /// # Errors
1113    ///
1114    /// Returns an error if the instrument is not found in the cache.
1115    fn instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
1116        self.instruments_cache
1117            .get(&symbol)
1118            .map(|entry| entry.value().clone())
1119            .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not in cache"))
1120    }
1121
1122    /// Cancel all pending HTTP requests.
1123    pub fn cancel_all_requests(&self) {
1124        self.inner.cancel_all_requests();
1125    }
1126
1127    /// Get the cancellation token for this client.
1128    pub fn cancellation_token(&self) -> &CancellationToken {
1129        self.inner.cancellation_token()
1130    }
1131
1132    /// Returns the base url being used by the client.
1133    pub fn base_url(&self) -> &str {
1134        self.inner.base_url.as_str()
1135    }
1136
1137    /// Returns the public API key being used by the client.
1138    pub fn api_key(&self) -> Option<&str> {
1139        self.inner.credential.as_ref().map(|c| c.api_key.as_str())
1140    }
1141
1142    /// Returns a masked version of the API key for logging purposes.
1143    #[must_use]
1144    pub fn api_key_masked(&self) -> Option<String> {
1145        self.inner.credential.as_ref().map(|c| c.api_key_masked())
1146    }
1147
1148    /// Returns whether the client is configured for demo trading.
1149    #[must_use]
1150    pub fn is_demo(&self) -> bool {
1151        self.inner.is_demo
1152    }
1153
1154    /// Requests the current server time from OKX.
1155    ///
1156    /// Returns the OKX system time as a Unix timestamp in milliseconds.
1157    ///
1158    /// # Errors
1159    ///
1160    /// Returns an error if the HTTP request fails or if the response cannot be parsed.
1161    pub async fn get_server_time(&self) -> Result<u64, OKXHttpError> {
1162        self.inner.get_server_time().await
1163    }
1164
1165    /// Checks if the client is initialized.
1166    ///
1167    /// The client is considered initialized if any instruments have been cached from the venue.
1168    #[must_use]
1169    pub fn is_initialized(&self) -> bool {
1170        self.cache_initialized.load(Ordering::Acquire)
1171    }
1172
1173    /// Returns a snapshot of all instrument symbols currently held in the
1174    /// internal cache.
1175    #[must_use]
1176    pub fn get_cached_symbols(&self) -> Vec<String> {
1177        self.instruments_cache
1178            .iter()
1179            .map(|entry| entry.key().to_string())
1180            .collect()
1181    }
1182
1183    /// Caches multiple instruments.
1184    ///
1185    /// Any existing instruments with the same symbols will be replaced.
1186    pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
1187        for inst in instruments {
1188            self.instruments_cache
1189                .insert(inst.raw_symbol().inner(), inst);
1190        }
1191        self.cache_initialized.store(true, Ordering::Release);
1192    }
1193
1194    /// Caches a single instrument.
1195    ///
1196    /// Any existing instrument with the same symbol will be replaced.
1197    pub fn cache_instrument(&self, instrument: InstrumentAny) {
1198        self.instruments_cache
1199            .insert(instrument.raw_symbol().inner(), instrument);
1200        self.cache_initialized.store(true, Ordering::Release);
1201    }
1202
1203    /// Gets an instrument from the cache by symbol.
1204    pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1205        self.instruments_cache
1206            .get(symbol)
1207            .map(|entry| entry.value().clone())
1208    }
1209
1210    /// Requests the account state for the `account_id` from OKX.
1211    ///
1212    /// # Errors
1213    ///
1214    /// Returns an error if the HTTP request fails or no account state is returned.
1215    pub async fn request_account_state(
1216        &self,
1217        account_id: AccountId,
1218    ) -> anyhow::Result<AccountState> {
1219        let resp = self
1220            .inner
1221            .get_balance()
1222            .await
1223            .map_err(|e| anyhow::anyhow!(e))?;
1224
1225        let ts_init = self.generate_ts_init();
1226        let raw = resp
1227            .first()
1228            .ok_or_else(|| anyhow::anyhow!("No account state returned from OKX"))?;
1229        let account_state = parse_account_state(raw, account_id, ts_init)?;
1230
1231        Ok(account_state)
1232    }
1233
1234    /// Sets the position mode for the account.
1235    ///
1236    /// Defaults to NetMode if no position mode is provided.
1237    ///
1238    /// # Errors
1239    ///
1240    /// Returns an error if the HTTP request fails or the position mode cannot be set.
1241    ///
1242    /// # Note
1243    ///
1244    /// This endpoint only works for accounts with derivatives trading enabled.
1245    /// If the account only has spot trading, this will return an error.
1246    pub async fn set_position_mode(&self, position_mode: OKXPositionMode) -> anyhow::Result<()> {
1247        let mut params = SetPositionModeParamsBuilder::default();
1248        params.pos_mode(position_mode);
1249        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1250
1251        match self.inner.set_position_mode(params).await {
1252            Ok(_) => Ok(()),
1253            Err(e) => {
1254                if let OKXHttpError::OkxError {
1255                    error_code,
1256                    message,
1257                } = &e
1258                    && error_code == "50115"
1259                {
1260                    tracing::warn!(
1261                        "Account does not support position mode setting (derivatives trading not enabled): {message}"
1262                    );
1263                    return Ok(()); // Gracefully handle this case
1264                }
1265                anyhow::bail!(e)
1266            }
1267        }
1268    }
1269
1270    /// Requests all instruments for the `instrument_type` from OKX.
1271    ///
1272    /// # Errors
1273    ///
1274    /// Returns an error if the HTTP request fails or instrument parsing fails.
1275    pub async fn request_instruments(
1276        &self,
1277        instrument_type: OKXInstrumentType,
1278        instrument_family: Option<String>,
1279    ) -> anyhow::Result<Vec<InstrumentAny>> {
1280        let mut params = GetInstrumentsParamsBuilder::default();
1281        params.inst_type(instrument_type);
1282
1283        if let Some(family) = instrument_family.clone() {
1284            params.inst_family(family);
1285        }
1286
1287        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1288
1289        let resp = self
1290            .inner
1291            .get_instruments(params)
1292            .await
1293            .map_err(|e| anyhow::anyhow!(e))?;
1294
1295        let fee_rate_opt = {
1296            let fee_params = GetTradeFeeParams {
1297                inst_type: instrument_type,
1298                uly: None,
1299                inst_family: instrument_family,
1300            };
1301
1302            match self.inner.get_trade_fee(fee_params).await {
1303                Ok(rates) => rates.into_iter().next(),
1304                Err(OKXHttpError::MissingCredentials) => {
1305                    tracing::debug!("Missing credentials for fee rates, using None");
1306                    None
1307                }
1308                Err(e) => {
1309                    tracing::warn!("Failed to fetch fee rates for {instrument_type}: {e}");
1310                    None
1311                }
1312            }
1313        };
1314
1315        let ts_init = self.generate_ts_init();
1316
1317        let mut instruments: Vec<InstrumentAny> = Vec::new();
1318        for inst in &resp {
1319            // Skip pre-open instruments which have incomplete/empty field values
1320            // Keep suspended instruments as they have valid metadata and may return to live
1321            if inst.state == OKXInstrumentStatus::Preopen {
1322                continue;
1323            }
1324
1325            // Determine which fee fields to use based on contract type
1326            let (maker_fee, taker_fee) = if let Some(ref fee_rate) = fee_rate_opt {
1327                let is_usdt_margined = inst.ct_type == OKXContractType::Linear;
1328                let (maker_str, taker_str) = if is_usdt_margined {
1329                    (&fee_rate.maker_u, &fee_rate.taker_u)
1330                } else {
1331                    (&fee_rate.maker, &fee_rate.taker)
1332                };
1333
1334                let maker = if !maker_str.is_empty() {
1335                    Decimal::from_str(maker_str).ok()
1336                } else {
1337                    None
1338                };
1339                let taker = if !taker_str.is_empty() {
1340                    Decimal::from_str(taker_str).ok()
1341                } else {
1342                    None
1343                };
1344
1345                (maker, taker)
1346            } else {
1347                (None, None)
1348            };
1349
1350            match parse_instrument_any(inst, None, None, maker_fee, taker_fee, ts_init) {
1351                Ok(Some(instrument_any)) => {
1352                    instruments.push(instrument_any);
1353                }
1354                Ok(None) => {
1355                    // Unsupported instrument type, skip silently
1356                }
1357                Err(e) => {
1358                    tracing::warn!("Failed to parse instrument {}: {e}", inst.inst_id);
1359                }
1360            }
1361        }
1362
1363        Ok(instruments)
1364    }
1365
1366    /// Requests a single instrument by `instrument_id` from OKX.
1367    ///
1368    /// Fetches the instrument from the API, caches it, and returns it.
1369    ///
1370    /// # Errors
1371    ///
1372    /// This function will return an error if:
1373    /// - The API request fails.
1374    /// - The instrument is not found.
1375    /// - Failed to parse instrument data.
1376    pub async fn request_instrument(
1377        &self,
1378        instrument_id: InstrumentId,
1379    ) -> anyhow::Result<InstrumentAny> {
1380        let symbol = instrument_id.symbol.as_str();
1381        let instrument_type = okx_instrument_type_from_symbol(symbol);
1382
1383        let mut params = GetInstrumentsParamsBuilder::default();
1384        params.inst_type(instrument_type);
1385        params.inst_id(symbol);
1386
1387        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1388
1389        let resp = self
1390            .inner
1391            .get_instruments(params)
1392            .await
1393            .map_err(|e| anyhow::anyhow!(e))?;
1394
1395        let raw_inst = resp
1396            .first()
1397            .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not found"))?;
1398
1399        // Skip pre-open instruments which have incomplete/empty field values
1400        if raw_inst.state == OKXInstrumentStatus::Preopen {
1401            anyhow::bail!("Instrument {symbol} is in pre-open state");
1402        }
1403
1404        let fee_rate_opt = {
1405            let fee_params = GetTradeFeeParams {
1406                inst_type: instrument_type,
1407                uly: None,
1408                inst_family: None,
1409            };
1410
1411            match self.inner.get_trade_fee(fee_params).await {
1412                Ok(rates) => rates.into_iter().next(),
1413                Err(OKXHttpError::MissingCredentials) => {
1414                    tracing::debug!("Missing credentials for fee rates, using None");
1415                    None
1416                }
1417                Err(e) => {
1418                    tracing::warn!("Failed to fetch fee rates for {symbol}: {e}");
1419                    None
1420                }
1421            }
1422        };
1423
1424        let (maker_fee, taker_fee) = if let Some(ref fee_rate) = fee_rate_opt {
1425            let is_usdt_margined = raw_inst.ct_type == OKXContractType::Linear;
1426            let (maker_str, taker_str) = if is_usdt_margined {
1427                (&fee_rate.maker_u, &fee_rate.taker_u)
1428            } else {
1429                (&fee_rate.maker, &fee_rate.taker)
1430            };
1431
1432            let maker = if !maker_str.is_empty() {
1433                Decimal::from_str(maker_str).ok()
1434            } else {
1435                None
1436            };
1437            let taker = if !taker_str.is_empty() {
1438                Decimal::from_str(taker_str).ok()
1439            } else {
1440                None
1441            };
1442
1443            (maker, taker)
1444        } else {
1445            (None, None)
1446        };
1447
1448        let ts_init = self.generate_ts_init();
1449        let instrument = parse_instrument_any(raw_inst, None, None, maker_fee, taker_fee, ts_init)?
1450            .ok_or_else(|| anyhow::anyhow!("Unsupported instrument type for {symbol}"))?;
1451
1452        self.cache_instrument(instrument.clone());
1453
1454        Ok(instrument)
1455    }
1456
1457    /// Requests the latest mark price for the `instrument_type` from OKX.
1458    ///
1459    /// # Errors
1460    ///
1461    /// Returns an error if the HTTP request fails or no mark price is returned.
1462    pub async fn request_mark_price(
1463        &self,
1464        instrument_id: InstrumentId,
1465    ) -> anyhow::Result<MarkPriceUpdate> {
1466        let mut params = GetMarkPriceParamsBuilder::default();
1467        params.inst_id(instrument_id.symbol.inner());
1468        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1469
1470        let resp = self
1471            .inner
1472            .get_mark_price(params)
1473            .await
1474            .map_err(|e| anyhow::anyhow!(e))?;
1475
1476        let raw = resp
1477            .first()
1478            .ok_or_else(|| anyhow::anyhow!("No mark price returned from OKX"))?;
1479        let inst = self.instrument_from_cache(instrument_id.symbol.inner())?;
1480        let ts_init = self.generate_ts_init();
1481
1482        let mark_price =
1483            parse_mark_price_update(raw, instrument_id, inst.price_precision(), ts_init)
1484                .map_err(|e| anyhow::anyhow!(e))?;
1485        Ok(mark_price)
1486    }
1487
1488    /// Requests the latest index price for the `instrument_id` from OKX.
1489    ///
1490    /// # Errors
1491    ///
1492    /// Returns an error if the HTTP request fails or no index price is returned.
1493    pub async fn request_index_price(
1494        &self,
1495        instrument_id: InstrumentId,
1496    ) -> anyhow::Result<IndexPriceUpdate> {
1497        let mut params = GetIndexTickerParamsBuilder::default();
1498        params.inst_id(instrument_id.symbol.inner());
1499        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1500
1501        let resp = self
1502            .inner
1503            .get_index_tickers(params)
1504            .await
1505            .map_err(|e| anyhow::anyhow!(e))?;
1506
1507        let raw = resp
1508            .first()
1509            .ok_or_else(|| anyhow::anyhow!("No index price returned from OKX"))?;
1510        let inst = self.instrument_from_cache(instrument_id.symbol.inner())?;
1511        let ts_init = self.generate_ts_init();
1512
1513        let index_price =
1514            parse_index_price_update(raw, instrument_id, inst.price_precision(), ts_init)
1515                .map_err(|e| anyhow::anyhow!(e))?;
1516        Ok(index_price)
1517    }
1518
1519    /// Requests trades for the `instrument_id` and `start` -> `end` time range.
1520    ///
1521    /// # Errors
1522    ///
1523    /// Returns an error if the HTTP request fails or trade parsing fails.
1524    ///
1525    /// # Panics
1526    ///
1527    /// Panics if the API returns an empty response when debug logging is enabled.
1528    pub async fn request_trades(
1529        &self,
1530        instrument_id: InstrumentId,
1531        start: Option<DateTime<Utc>>,
1532        end: Option<DateTime<Utc>>,
1533        limit: Option<u32>,
1534    ) -> anyhow::Result<Vec<TradeTick>> {
1535        const OKX_TRADES_MAX_LIMIT: u32 = 100;
1536
1537        let limit = if limit == Some(0) { None } else { limit };
1538
1539        if let (Some(s), Some(e)) = (start, end) {
1540            anyhow::ensure!(s < e, "Invalid time range: start={s:?} end={e:?}");
1541        }
1542
1543        let now = Utc::now();
1544
1545        if let Some(s) = start
1546            && s > now
1547        {
1548            return Ok(Vec::new());
1549        }
1550
1551        let end = if let Some(e) = end
1552            && e > now
1553        {
1554            Some(now)
1555        } else {
1556            end
1557        };
1558
1559        #[derive(Clone, Copy, Debug, PartialEq, Eq)]
1560        enum Mode {
1561            Latest,
1562            Backward,
1563            Range,
1564        }
1565
1566        let mode = match (start, end) {
1567            (None, None) => Mode::Latest,
1568            (Some(_), None) => Mode::Backward,
1569            (None, Some(_)) => Mode::Backward,
1570            (Some(_), Some(_)) => Mode::Range,
1571        };
1572
1573        let start_ms = start.map(|s| s.timestamp_millis());
1574        let end_ms = end.map(|e| e.timestamp_millis());
1575
1576        let ts_init = self.generate_ts_init();
1577        let inst = self.instrument_from_cache(instrument_id.symbol.inner())?;
1578
1579        // Historical pagination walks backwards using trade IDs, OKX does not honour timestamps for
1580        // standalone `before` requests (type=2)
1581        if matches!(mode, Mode::Backward | Mode::Range) {
1582            let mut before_trade_id: Option<String> = None;
1583            let mut pages = 0usize;
1584            let mut page_results: Vec<Vec<TradeTick>> = Vec::new();
1585            let mut seen_trades: std::collections::HashSet<(String, i64)> =
1586                std::collections::HashSet::new();
1587            let mut unique_count = 0usize;
1588            let mut consecutive_empty_pages = 0usize;
1589            const MAX_PAGES: usize = 500;
1590            const MAX_CONSECUTIVE_EMPTY: usize = 3;
1591
1592            // Only apply default limit when there's no start boundary
1593            // (start provides a natural stopping point, end alone allows infinite backward pagination)
1594            let effective_limit = if start.is_some() {
1595                limit.unwrap_or(u32::MAX)
1596            } else {
1597                limit.unwrap_or(OKX_TRADES_MAX_LIMIT)
1598            };
1599
1600            tracing::debug!(
1601                "Starting trades pagination: mode={:?}, start={:?}, end={:?}, limit={:?}, effective_limit={}",
1602                mode,
1603                start,
1604                end,
1605                limit,
1606                effective_limit
1607            );
1608
1609            loop {
1610                if pages >= MAX_PAGES {
1611                    tracing::warn!("Hit MAX_PAGES limit of {}", MAX_PAGES);
1612                    break;
1613                }
1614
1615                if effective_limit < u32::MAX && unique_count >= effective_limit as usize {
1616                    tracing::debug!("Reached effective limit: unique_count={}", unique_count);
1617                    break;
1618                }
1619
1620                let remaining = (effective_limit as usize).saturating_sub(unique_count);
1621                let page_cap = remaining.min(OKX_TRADES_MAX_LIMIT as usize) as u32;
1622
1623                tracing::debug!(
1624                    "Requesting page {}: before_id={:?}, page_cap={}, unique_count={}",
1625                    pages + 1,
1626                    before_trade_id,
1627                    page_cap,
1628                    unique_count
1629                );
1630
1631                let mut params_builder = GetTradesParamsBuilder::default();
1632                params_builder
1633                    .inst_id(instrument_id.symbol.inner())
1634                    .limit(page_cap)
1635                    .pagination_type(1);
1636
1637                // Use 'after' to get older trades (OKX API: after=cursor means < cursor)
1638                if let Some(ref before_id) = before_trade_id {
1639                    params_builder.after(before_id.clone());
1640                }
1641
1642                let params = params_builder.build().map_err(anyhow::Error::new)?;
1643                let raw = self
1644                    .inner
1645                    .get_history_trades(params)
1646                    .await
1647                    .map_err(anyhow::Error::new)?;
1648
1649                tracing::debug!("Received {} raw trades from API", raw.len());
1650
1651                if !raw.is_empty() {
1652                    let first_id = &raw.first().unwrap().trade_id;
1653                    let last_id = &raw.last().unwrap().trade_id;
1654                    tracing::debug!(
1655                        "Raw response trade ID range: first={} (newest), last={} (oldest)",
1656                        first_id,
1657                        last_id
1658                    );
1659                }
1660
1661                if raw.is_empty() {
1662                    tracing::debug!("API returned empty page, stopping pagination");
1663                    break;
1664                }
1665
1666                pages += 1;
1667
1668                let mut page_trades: Vec<TradeTick> = Vec::with_capacity(raw.len());
1669                let mut hit_start_boundary = false;
1670                let mut filtered_out = 0usize;
1671                let mut duplicates = 0usize;
1672
1673                for r in &raw {
1674                    match parse_trade_tick(
1675                        r,
1676                        instrument_id,
1677                        inst.price_precision(),
1678                        inst.size_precision(),
1679                        ts_init,
1680                    ) {
1681                        Ok(trade) => {
1682                            let ts_ms = trade.ts_event.as_i64() / 1_000_000;
1683
1684                            if let Some(e_ms) = end_ms
1685                                && ts_ms > e_ms
1686                            {
1687                                filtered_out += 1;
1688                                continue;
1689                            }
1690
1691                            if let Some(s_ms) = start_ms
1692                                && ts_ms < s_ms
1693                            {
1694                                hit_start_boundary = true;
1695                                filtered_out += 1;
1696                                break;
1697                            }
1698
1699                            let trade_key = (trade.trade_id.to_string(), trade.ts_event.as_i64());
1700                            if seen_trades.insert(trade_key) {
1701                                unique_count += 1;
1702                                page_trades.push(trade);
1703                            } else {
1704                                duplicates += 1;
1705                            }
1706                        }
1707                        Err(e) => tracing::error!("{e}"),
1708                    }
1709                }
1710
1711                tracing::debug!(
1712                    "Page {} processed: {} trades kept, {} filtered out, {} duplicates, hit_start_boundary={}",
1713                    pages,
1714                    page_trades.len(),
1715                    filtered_out,
1716                    duplicates,
1717                    hit_start_boundary
1718                );
1719
1720                // Extract oldest unique trade ID for next page cursor
1721                let oldest_trade_id = if !page_trades.is_empty() {
1722                    // Use oldest deduplicated trade ID before reversing
1723                    let oldest_id = page_trades.last().map(|t| {
1724                        let id = t.trade_id.to_string();
1725                        tracing::debug!(
1726                            "Setting cursor from deduplicated trades: oldest_id={}, ts_event={}",
1727                            id,
1728                            t.ts_event.as_i64()
1729                        );
1730                        id
1731                    });
1732                    page_trades.reverse();
1733                    page_results.push(page_trades);
1734                    consecutive_empty_pages = 0;
1735                    oldest_id
1736                } else {
1737                    // Only apply consecutive empty guard if we've already collected some trades
1738                    // This allows historical backfills to paginate through empty prelude
1739                    if unique_count > 0 {
1740                        consecutive_empty_pages += 1;
1741                        if consecutive_empty_pages >= MAX_CONSECUTIVE_EMPTY {
1742                            tracing::debug!(
1743                                "Stopping: {} consecutive pages with no trades in range after collecting {} trades",
1744                                consecutive_empty_pages,
1745                                unique_count
1746                            );
1747                            break;
1748                        }
1749                    }
1750                    // No unique trades on page, use raw response for cursor
1751                    raw.last().map(|t| {
1752                        let id = t.trade_id.to_string();
1753                        tracing::debug!(
1754                            "Setting cursor from raw response (no unique trades): oldest_id={}",
1755                            id
1756                        );
1757                        id
1758                    })
1759                };
1760
1761                if let Some(ref old_id) = before_trade_id
1762                    && oldest_trade_id.as_ref() == Some(old_id)
1763                {
1764                    break;
1765                }
1766
1767                if oldest_trade_id.is_none() {
1768                    break;
1769                }
1770
1771                before_trade_id = oldest_trade_id;
1772
1773                if hit_start_boundary {
1774                    break;
1775                }
1776
1777                tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1778            }
1779
1780            tracing::debug!(
1781                "Pagination complete: {} pages, {} unique trades collected",
1782                pages,
1783                unique_count
1784            );
1785
1786            let mut out: Vec<TradeTick> = Vec::new();
1787            for page in page_results.into_iter().rev() {
1788                out.extend(page);
1789            }
1790
1791            // Deduplicate by (trade_id, ts_event) composite key
1792            let mut dedup_keys = std::collections::HashSet::new();
1793            let pre_dedup_len = out.len();
1794            out.retain(|trade| {
1795                dedup_keys.insert((trade.trade_id.to_string(), trade.ts_event.as_i64()))
1796            });
1797
1798            if out.len() < pre_dedup_len {
1799                tracing::debug!(
1800                    "Removed {} duplicate trades during final dedup",
1801                    pre_dedup_len - out.len()
1802                );
1803            }
1804
1805            if let Some(lim) = limit
1806                && lim > 0
1807                && out.len() > lim as usize
1808            {
1809                let excess = out.len() - lim as usize;
1810                tracing::debug!("Trimming {} oldest trades to respect limit={}", excess, lim);
1811                out.drain(0..excess);
1812            }
1813
1814            tracing::debug!("Returning {} trades", out.len());
1815            return Ok(out);
1816        }
1817
1818        let req_limit = limit
1819            .unwrap_or(OKX_TRADES_MAX_LIMIT)
1820            .min(OKX_TRADES_MAX_LIMIT);
1821        let params = GetTradesParamsBuilder::default()
1822            .inst_id(instrument_id.symbol.inner())
1823            .limit(req_limit)
1824            .build()
1825            .map_err(anyhow::Error::new)?;
1826
1827        let raw = self
1828            .inner
1829            .get_history_trades(params)
1830            .await
1831            .map_err(anyhow::Error::new)?;
1832
1833        let mut trades: Vec<TradeTick> = Vec::with_capacity(raw.len());
1834        for r in &raw {
1835            match parse_trade_tick(
1836                r,
1837                instrument_id,
1838                inst.price_precision(),
1839                inst.size_precision(),
1840                ts_init,
1841            ) {
1842                Ok(trade) => trades.push(trade),
1843                Err(e) => tracing::error!("{e}"),
1844            }
1845        }
1846
1847        // OKX returns newest-first, reverse to oldest-first
1848        trades.reverse();
1849
1850        if let Some(lim) = limit
1851            && lim > 0
1852            && trades.len() > lim as usize
1853        {
1854            trades.drain(0..trades.len() - lim as usize);
1855        }
1856
1857        Ok(trades)
1858    }
1859
1860    /// Requests historical bars for the given bar type and time range.
1861    ///
1862    /// The aggregation source must be `EXTERNAL`. Time range validation ensures start < end.
1863    /// Returns bars sorted oldest to newest.
1864    ///
1865    /// # Errors
1866    ///
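1867    /// Returns an error if the aggregation source is not `EXTERNAL`, if `start >= end`, if the aggregation is unsupported, if the instrument cannot be loaded from the cache, or if a request or candlestick parse fails.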
1868    ///
1869    /// # Endpoint Selection
1870    ///
1871    /// The OKX API has different endpoints with different limits:
1872    /// - Regular endpoint (`/api/v5/market/candles`): ≤ 300 rows/call, ≤ 40 req/2s
1873    ///   - Used when: the current page cursor is less than 2 hours old, or no cursor is set yet
1874    /// - History endpoint (`/api/v5/market/history-candles`): ≤ 100 rows/call, ≤ 20 req/2s
1875    ///   - Used when: the current page cursor is 2 or more hours old
1876    ///
1877    /// Age is `now - cursor` in whole hours, re-evaluated for every page (`now` is read once on entry). Range-mode fallback requests split at 100 days instead.
1878    ///
1879    /// # Supported Aggregations
1880    ///
1881    /// Maps to OKX bar query parameter:
1882    /// - `Second` → `{n}s`
1883    /// - `Minute` → `{n}m`
1884    /// - `Hour` → `{n}H`
1885    /// - `Day` → `{n}D`
1886    /// - `Week` → `{n}W`
1887    /// - `Month` → `{n}M`
1888    ///
1889    /// # Pagination
1890    ///
1891    /// - Pages with OKX's `after`/`before` cursors (`after` returns older bars, `before` newer ones), chosen per request mode
1892    /// - Pages backwards from end time (or now) to start time
1893    /// - Stops when: limit reached, time window covered, or API returns empty
1894    /// - Rate limit safety: ≥ 50ms between requests
1895    ///
1896    /// # Panics
1897    ///
1898    /// May panic if internal data structures are in an unexpected state.
1899    ///
1900    /// # References
1901    ///
1902    /// - <https://www.okx.com/docs-v5/en/#order-book-trading-market-data-get-candlesticks>
1903    /// - <https://www.okx.com/docs-v5/en/#order-book-trading-market-data-get-candlesticks-history>
1904    pub async fn request_bars(
1905        &self,
1906        bar_type: BarType,
1907        start: Option<DateTime<Utc>>,
1908        mut end: Option<DateTime<Utc>>,
1909        limit: Option<u32>,
1910    ) -> anyhow::Result<Vec<Bar>> {
1911        const HISTORY_SPLIT_DAYS: i64 = 100;
1912        const MAX_PAGES_SOFT: usize = 500;
1913
1914        let limit = if limit == Some(0) { None } else { limit };
1915
1916        anyhow::ensure!(
1917            bar_type.aggregation_source() == AggregationSource::External,
1918            "Only EXTERNAL aggregation is supported"
1919        );
1920
1921        if let (Some(s), Some(e)) = (start, end) {
1922            anyhow::ensure!(s < e, "Invalid time range: start={s:?} end={e:?}");
1923        }
1924
1925        let now = Utc::now();
1926
1927        if let Some(s) = start
1928            && s > now
1929        {
1930            return Ok(Vec::new());
1931        }
1932        if let Some(e) = end
1933            && e > now
1934        {
1935            end = Some(now);
1936        }
1937
1938        let spec = bar_type.spec();
1939        let step = spec.step.get();
1940        let bar_param = match spec.aggregation {
1941            BarAggregation::Second => format!("{step}s"),
1942            BarAggregation::Minute => format!("{step}m"),
1943            BarAggregation::Hour => format!("{step}H"),
1944            BarAggregation::Day => format!("{step}D"),
1945            BarAggregation::Week => format!("{step}W"),
1946            BarAggregation::Month => format!("{step}M"),
1947            a => anyhow::bail!("OKX does not support {a:?} aggregation"),
1948        };
1949
1950        let slot_ms: i64 = match spec.aggregation {
1951            BarAggregation::Second => (step as i64) * 1_000,
1952            BarAggregation::Minute => (step as i64) * 60_000,
1953            BarAggregation::Hour => (step as i64) * 3_600_000,
1954            BarAggregation::Day => (step as i64) * 86_400_000,
1955            BarAggregation::Week => (step as i64) * 7 * 86_400_000,
1956            BarAggregation::Month => (step as i64) * 30 * 86_400_000,
1957            _ => unreachable!("Unsupported aggregation should have been caught above"),
1958        };
1959        let slot_ns: i64 = slot_ms * 1_000_000;
1960
1961        #[derive(Clone, Copy, Debug, PartialEq, Eq)]
1962        enum Mode {
1963            Latest,
1964            Backward,
1965            Range,
1966        }
1967
1968        let mode = match (start, end) {
1969            (None, None) => Mode::Latest,
1970            (Some(_), None) => Mode::Backward, // Changed: when only start is provided, work backward from now
1971            (None, Some(_)) => Mode::Backward,
1972            (Some(_), Some(_)) => Mode::Range,
1973        };
1974
1975        let start_ns = start.and_then(|s| s.timestamp_nanos_opt());
1976        let end_ns = end.and_then(|e| e.timestamp_nanos_opt());
1977
1978        // Floor start and ceiling end to bar boundaries for cleaner API requests
1979        let start_ms = start.map(|s| {
1980            let ms = s.timestamp_millis();
1981            if slot_ms > 0 {
1982                (ms / slot_ms) * slot_ms // Floor to nearest bar boundary
1983            } else {
1984                ms
1985            }
1986        });
1987        let end_ms = end.map(|e| {
1988            let ms = e.timestamp_millis();
1989            if slot_ms > 0 {
1990                ((ms + slot_ms - 1) / slot_ms) * slot_ms // Ceiling to nearest bar boundary
1991            } else {
1992                ms
1993            }
1994        });
1995        let now_ms = now.timestamp_millis();
1996
1997        let symbol = bar_type.instrument_id().symbol;
1998        let inst = self.instrument_from_cache(symbol.inner())?;
1999
2000        let mut out: Vec<Bar> = Vec::new();
2001        let mut pages = 0usize;
2002
2003        // IMPORTANT: OKX API has COUNTER-INTUITIVE semantics (same for bars and trades):
2004        // - after=X returns records with timestamp < X (upper bound, despite the name!)
2005        // - before=X returns records with timestamp > X (lower bound, despite the name!)
2006        // For Range [start, end], use: before=start (lower bound), after=end (upper bound)
2007        let mut after_ms: Option<i64> = match mode {
2008            Mode::Range => end_ms.or(Some(now_ms)), // Upper bound: bars < end
2009            _ => None,
2010        };
2011        let mut before_ms: Option<i64> = match mode {
2012            Mode::Backward => end_ms.map(|v| v.saturating_sub(1)),
2013            Mode::Range => start_ms, // Lower bound: bars > start
2014            Mode::Latest => None,
2015        };
2016
2017        // For Range mode, we'll paginate backwards like Backward mode
2018        let mut forward_prepend_mode = matches!(mode, Mode::Range);
2019
2020        // Adjust before_ms to ensure we get data from the API
2021        // OKX API might not have bars for the very recent past
2022        // This handles both explicit end=now and the actor layer setting end=now when it's None
2023        if matches!(mode, Mode::Backward | Mode::Range)
2024            && let Some(b) = before_ms
2025        {
2026            // OKX endpoints have different data availability windows:
2027            // - Regular endpoint: has most recent data but limited depth
2028            // - History endpoint: has deep history but lags behind current time
2029            // Use a small buffer to avoid the "dead zone"
2030            let buffer_ms = slot_ms.max(60_000); // At least 1 minute or 1 bar
2031            if b >= now_ms.saturating_sub(buffer_ms) {
2032                before_ms = Some(now_ms.saturating_sub(buffer_ms));
2033            }
2034        }
2035
2036        let mut have_latest_first_page = false;
2037        let mut progressless_loops = 0u8;
2038
2039        loop {
2040            if let Some(lim) = limit
2041                && lim > 0
2042                && out.len() >= lim as usize
2043            {
2044                break;
2045            }
2046            if pages >= MAX_PAGES_SOFT {
2047                break;
2048            }
2049
2050            let pivot_ms = if let Some(a) = after_ms {
2051                a
2052            } else if let Some(b) = before_ms {
2053                b
2054            } else {
2055                now_ms
2056            };
2057            // Choose endpoint based on how old the data is:
2058            // - Use regular endpoint for recent data (< 1 hour old)
2059            // - Use history endpoint for older data (> 1 hour old)
2060            // This avoids the "gap" where history endpoint has no recent data
2061            // and regular endpoint has limited depth
2062            let age_ms = now_ms.saturating_sub(pivot_ms);
2063            let age_hours = age_ms / (60 * 60 * 1000);
2064            let using_history = age_hours > 1; // Use history if data is > 1 hour old
2065
2066            let page_ceiling = if using_history { 100 } else { 300 };
2067            let remaining = limit
2068                .filter(|&l| l > 0) // Treat limit=0 as no limit
2069                .map_or(page_ceiling, |l| (l as usize).saturating_sub(out.len()));
2070            let page_cap = remaining.min(page_ceiling);
2071
2072            let mut p = GetCandlesticksParamsBuilder::default();
2073            p.inst_id(symbol.as_str())
2074                .bar(&bar_param)
2075                .limit(page_cap as u32);
2076
2077            // Track whether this planned request uses BEFORE or AFTER.
2078            let mut req_used_before = false;
2079
2080            match mode {
2081                Mode::Latest => {
2082                    if have_latest_first_page && let Some(b) = before_ms {
2083                        p.before_ms(b);
2084                        req_used_before = true;
2085                    }
2086                }
2087                Mode::Backward => {
2088                    // Use 'after' to get older bars (OKX API: after=cursor means < cursor)
2089                    if let Some(b) = before_ms {
2090                        p.after_ms(b);
2091                    }
2092                }
2093                Mode::Range => {
2094                    // For Range mode, use both after and before to specify the full range
2095                    // This is much more efficient than pagination
2096                    if let Some(a) = after_ms {
2097                        p.after_ms(a);
2098                    }
2099                    if let Some(b) = before_ms {
2100                        p.before_ms(b);
2101                        req_used_before = true;
2102                    }
2103                }
2104            }
2105
2106            let params = p.build().map_err(anyhow::Error::new)?;
2107
2108            let mut raw = if using_history {
2109                self.inner
2110                    .get_history_candles(params.clone())
2111                    .await
2112                    .map_err(anyhow::Error::new)?
2113            } else {
2114                self.inner
2115                    .get_candles(params.clone())
2116                    .await
2117                    .map_err(anyhow::Error::new)?
2118            };
2119
2120            // --- Fallbacks on empty page ---
2121            if raw.is_empty() {
2122                // LATEST: retry same cursor via history, then step back a page-interval before giving up
2123                if matches!(mode, Mode::Latest)
2124                    && have_latest_first_page
2125                    && !using_history
2126                    && let Some(b) = before_ms
2127                {
2128                    let mut p2 = GetCandlesticksParamsBuilder::default();
2129                    p2.inst_id(symbol.as_str())
2130                        .bar(&bar_param)
2131                        .limit(page_cap as u32);
2132                    p2.before_ms(b);
2133                    let params2 = p2.build().map_err(anyhow::Error::new)?;
2134                    let raw2 = self
2135                        .inner
2136                        .get_history_candles(params2)
2137                        .await
2138                        .map_err(anyhow::Error::new)?;
2139                    if !raw2.is_empty() {
2140                        raw = raw2;
2141                    } else {
2142                        // Step back one page interval and retry loop
2143                        let jump = (page_cap as i64).saturating_mul(slot_ms.max(1));
2144                        before_ms = Some(b.saturating_sub(jump));
2145                        progressless_loops = progressless_loops.saturating_add(1);
2146                        if progressless_loops >= 3 {
2147                            break;
2148                        }
2149                        continue;
2150                    }
2151                }
2152
2153                // Range mode doesn't need special bootstrap - it uses the normal flow with before_ms set
2154
2155                // If still empty: for Range after first page, try a single backstep window using BEFORE
2156                if raw.is_empty() && matches!(mode, Mode::Range) && pages > 0 {
2157                    let backstep_ms = (page_cap as i64).saturating_mul(slot_ms.max(1));
2158                    let pivot_back = after_ms.unwrap_or(now_ms).saturating_sub(backstep_ms);
2159
2160                    let mut p2 = GetCandlesticksParamsBuilder::default();
2161                    p2.inst_id(symbol.as_str())
2162                        .bar(&bar_param)
2163                        .limit(page_cap as u32)
2164                        .before_ms(pivot_back);
2165                    let params2 = p2.build().map_err(anyhow::Error::new)?;
2166                    let raw2 = if (now_ms.saturating_sub(pivot_back)) / (24 * 60 * 60 * 1000)
2167                        > HISTORY_SPLIT_DAYS
2168                    {
2169                        self.inner.get_history_candles(params2).await
2170                    } else {
2171                        self.inner.get_candles(params2).await
2172                    }
2173                    .map_err(anyhow::Error::new)?;
2174                    if raw2.is_empty() {
2175                        break;
2176                    } else {
2177                        raw = raw2;
2178                        forward_prepend_mode = true;
2179                        req_used_before = true;
2180                    }
2181                }
2182
2183                // First LATEST page empty: jump back >100d to force history, then continue loop
2184                if raw.is_empty()
2185                    && matches!(mode, Mode::Latest)
2186                    && !have_latest_first_page
2187                    && !using_history
2188                {
2189                    let jump_days_ms = (HISTORY_SPLIT_DAYS + 1) * 86_400_000;
2190                    before_ms = Some(now_ms.saturating_sub(jump_days_ms));
2191                    have_latest_first_page = true;
2192                    continue;
2193                }
2194
2195                // Still empty for any other case? Just break.
2196                if raw.is_empty() {
2197                    break;
2198                }
2199            }
2200            // --- end fallbacks ---
2201
2202            pages += 1;
2203
2204            // Parse, oldest → newest
2205            let ts_init = self.generate_ts_init();
2206            let mut page: Vec<Bar> = Vec::with_capacity(raw.len());
2207            for r in &raw {
2208                page.push(parse_candlestick(
2209                    r,
2210                    bar_type,
2211                    inst.price_precision(),
2212                    inst.size_precision(),
2213                    ts_init,
2214                )?);
2215            }
2216            page.reverse();
2217
2218            let page_oldest_ms = page.first().map(|b| b.ts_event.as_i64() / 1_000_000);
2219            let page_newest_ms = page.last().map(|b| b.ts_event.as_i64() / 1_000_000);
2220
2221            // Range filter (inclusive)
2222            // For Range mode, if we have no bars yet and this is an early page,
2223            // be more tolerant with the start boundary to handle gaps in data
2224            let mut filtered: Vec<Bar> = if matches!(mode, Mode::Range)
2225                && out.is_empty()
2226                && pages < 2
2227            {
2228                // On first pages of Range mode with no data yet, include the most recent bar
2229                // even if it's slightly before our start time (within 2 bar periods)
2230                // BUT we want ALL bars in the page that are within our range
2231                let tolerance_ns = slot_ns * 2; // Allow up to 2 bar periods before start
2232
2233                // Debug: log the page range
2234                if !page.is_empty() {
2235                    tracing::debug!(
2236                        "Range mode bootstrap page: {} bars from {} to {}, filtering with start={:?} end={:?}",
2237                        page.len(),
2238                        page.first().unwrap().ts_event.as_i64() / 1_000_000,
2239                        page.last().unwrap().ts_event.as_i64() / 1_000_000,
2240                        start_ms,
2241                        end_ms
2242                    );
2243                }
2244
2245                let result: Vec<Bar> = page
2246                    .clone()
2247                    .into_iter()
2248                    .filter(|b| {
2249                        let ts = b.ts_event.as_i64();
2250                        // Accept bars from (start - tolerance) to end
2251                        let ok_after =
2252                            start_ns.is_none_or(|sns| ts >= sns.saturating_sub(tolerance_ns));
2253                        let ok_before = end_ns.is_none_or(|ens| ts <= ens);
2254                        ok_after && ok_before
2255                    })
2256                    .collect();
2257
2258                result
2259            } else {
2260                // Normal filtering
2261                page.clone()
2262                    .into_iter()
2263                    .filter(|b| {
2264                        let ts = b.ts_event.as_i64();
2265                        let ok_after = start_ns.is_none_or(|sns| ts >= sns);
2266                        let ok_before = end_ns.is_none_or(|ens| ts <= ens);
2267                        ok_after && ok_before
2268                    })
2269                    .collect()
2270            };
2271
2272            if !page.is_empty() && filtered.is_empty() {
2273                // For Range mode, if all bars are before our start time, there's no point continuing
2274                if matches!(mode, Mode::Range)
2275                    && !forward_prepend_mode
2276                    && let (Some(newest_ms), Some(start_ms)) = (page_newest_ms, start_ms)
2277                    && newest_ms < start_ms.saturating_sub(slot_ms * 2)
2278                {
2279                    // Bars are too old (more than 2 bar periods before start), stop
2280                    break;
2281                }
2282            }
2283
2284            // Track contribution for progress guard
2285            let contribution;
2286
2287            if out.is_empty() {
2288                contribution = filtered.len();
2289                out = filtered;
2290            } else {
2291                match mode {
2292                    Mode::Backward | Mode::Latest => {
2293                        if let Some(first) = out.first() {
2294                            filtered.retain(|b| b.ts_event < first.ts_event);
2295                        }
2296                        contribution = filtered.len();
2297                        if contribution != 0 {
2298                            let mut new_out = Vec::with_capacity(out.len() + filtered.len());
2299                            new_out.extend_from_slice(&filtered);
2300                            new_out.extend_from_slice(&out);
2301                            out = new_out;
2302                        }
2303                    }
2304                    Mode::Range => {
2305                        if forward_prepend_mode || req_used_before {
2306                            // We are backfilling older pages: prepend them.
2307                            if let Some(first) = out.first() {
2308                                filtered.retain(|b| b.ts_event < first.ts_event);
2309                            }
2310                            contribution = filtered.len();
2311                            if contribution != 0 {
2312                                let mut new_out = Vec::with_capacity(out.len() + filtered.len());
2313                                new_out.extend_from_slice(&filtered);
2314                                new_out.extend_from_slice(&out);
2315                                out = new_out;
2316                            }
2317                        } else {
2318                            // Normal forward: append newer pages.
2319                            if let Some(last) = out.last() {
2320                                filtered.retain(|b| b.ts_event > last.ts_event);
2321                            }
2322                            contribution = filtered.len();
2323                            out.extend(filtered);
2324                        }
2325                    }
2326                }
2327            }
2328
2329            // Duplicate-window mitigation for Latest/Backward/Range
2330            if contribution == 0
2331                && matches!(mode, Mode::Latest | Mode::Backward | Mode::Range)
2332                && let Some(b) = before_ms
2333            {
2334                let jump = (page_cap as i64).saturating_mul(slot_ms.max(1));
2335                let new_b = b.saturating_sub(jump);
2336                if new_b != b {
2337                    before_ms = Some(new_b);
2338                }
2339            }
2340
2341            if contribution == 0 {
2342                progressless_loops = progressless_loops.saturating_add(1);
2343                if progressless_loops >= 3 {
2344                    break;
2345                }
2346            } else {
2347                progressless_loops = 0;
2348
2349                // Advance cursors only when we made progress
2350                match mode {
2351                    Mode::Latest | Mode::Backward => {
2352                        if let Some(oldest) = page_oldest_ms {
2353                            before_ms = Some(oldest.saturating_sub(1));
2354                            have_latest_first_page = true;
2355                        } else {
2356                            break;
2357                        }
2358                    }
2359                    Mode::Range => {
2360                        if forward_prepend_mode || req_used_before {
2361                            if let Some(oldest) = page_oldest_ms {
2362                                // Move back by at least one bar period to avoid getting the same data
2363                                let jump_back = slot_ms.max(60_000); // At least 1 minute
2364                                before_ms = Some(oldest.saturating_sub(jump_back));
2365                                after_ms = None;
2366                            } else {
2367                                break;
2368                            }
2369                        } else if let Some(newest) = page_newest_ms {
2370                            after_ms = Some(newest.saturating_add(1));
2371                            before_ms = None;
2372                        } else {
2373                            break;
2374                        }
2375                    }
2376                }
2377            }
2378
2379            // Stop conditions
2380            if let Some(lim) = limit
2381                && lim > 0
2382                && out.len() >= lim as usize
2383            {
2384                break;
2385            }
2386            if let Some(ens) = end_ns
2387                && let Some(last) = out.last()
2388                && last.ts_event.as_i64() >= ens
2389            {
2390                break;
2391            }
2392            if let Some(sns) = start_ns
2393                && let Some(first) = out.first()
2394                && (matches!(mode, Mode::Backward) || forward_prepend_mode)
2395                && first.ts_event.as_i64() <= sns
2396            {
2397                // For Range mode, check if we have all bars up to the end time
2398                if matches!(mode, Mode::Range) {
2399                    // Don't stop if we haven't reached the end time yet
2400                    if let Some(ens) = end_ns
2401                        && let Some(last) = out.last()
2402                    {
2403                        let last_ts = last.ts_event.as_i64();
2404                        if last_ts < ens {
2405                            // We have bars before start but haven't reached end, need to continue forward
2406                            // Switch from backward to forward pagination
2407                            forward_prepend_mode = false;
2408                            after_ms = Some((last_ts / 1_000_000).saturating_add(1));
2409                            before_ms = None;
2410                            continue;
2411                        }
2412                    }
2413                }
2414                break;
2415            }
2416
2417            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
2418        }
2419
2420        // Final rescue for FORWARD/RANGE when nothing gathered
2421        if out.is_empty() && matches!(mode, Mode::Range) {
2422            let pivot = end_ms.unwrap_or(now_ms.saturating_sub(1));
2423            let hist = (now_ms.saturating_sub(pivot)) / (24 * 60 * 60 * 1000) > HISTORY_SPLIT_DAYS;
2424            let mut p = GetCandlesticksParamsBuilder::default();
2425            p.inst_id(symbol.as_str())
2426                .bar(&bar_param)
2427                .limit(300)
2428                .before_ms(pivot);
2429            let params = p.build().map_err(anyhow::Error::new)?;
2430            let raw = if hist {
2431                self.inner.get_history_candles(params).await
2432            } else {
2433                self.inner.get_candles(params).await
2434            }
2435            .map_err(anyhow::Error::new)?;
2436            if !raw.is_empty() {
2437                let ts_init = self.generate_ts_init();
2438                let mut page: Vec<Bar> = Vec::with_capacity(raw.len());
2439                for r in &raw {
2440                    page.push(parse_candlestick(
2441                        r,
2442                        bar_type,
2443                        inst.price_precision(),
2444                        inst.size_precision(),
2445                        ts_init,
2446                    )?);
2447                }
2448                page.reverse();
2449                out = page
2450                    .into_iter()
2451                    .filter(|b| {
2452                        let ts = b.ts_event.as_i64();
2453                        let ok_after = start_ns.is_none_or(|sns| ts >= sns);
2454                        let ok_before = end_ns.is_none_or(|ens| ts <= ens);
2455                        ok_after && ok_before
2456                    })
2457                    .collect();
2458            }
2459        }
2460
2461        // Trim against end bound if needed (keep ≤ end)
2462        if let Some(ens) = end_ns {
2463            while out.last().is_some_and(|b| b.ts_event.as_i64() > ens) {
2464                out.pop();
2465            }
2466        }
2467
2468        // Clamp first bar for Range when using forward pagination
2469        if matches!(mode, Mode::Range)
2470            && !forward_prepend_mode
2471            && let Some(sns) = start_ns
2472        {
2473            let lower = sns.saturating_sub(slot_ns);
2474            while out.first().is_some_and(|b| b.ts_event.as_i64() < lower) {
2475                out.remove(0);
2476            }
2477        }
2478
2479        if let Some(lim) = limit
2480            && lim > 0
2481            && out.len() > lim as usize
2482        {
2483            out.truncate(lim as usize);
2484        }
2485
2486        Ok(out)
2487    }
2488
2489    /// Requests historical order status reports for the given parameters.
2490    ///
2491    /// # Errors
2492    ///
2493    /// Returns an error if the request fails.
2494    ///
2495    /// # References
2496    ///
2497    /// - <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-order-history-last-7-days>.
2498    /// - <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-order-history-last-3-months>.
2499    #[allow(clippy::too_many_arguments)]
2500    pub async fn request_order_status_reports(
2501        &self,
2502        account_id: AccountId,
2503        instrument_type: Option<OKXInstrumentType>,
2504        instrument_id: Option<InstrumentId>,
2505        start: Option<DateTime<Utc>>,
2506        end: Option<DateTime<Utc>>,
2507        open_only: bool,
2508        limit: Option<u32>,
2509    ) -> anyhow::Result<Vec<OrderStatusReport>> {
2510        let mut history_params = GetOrderHistoryParamsBuilder::default();
2511
2512        let instrument_type = if let Some(instrument_type) = instrument_type {
2513            instrument_type
2514        } else {
2515            let instrument_id = instrument_id.ok_or_else(|| {
2516                anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
2517            })?;
2518            let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2519            okx_instrument_type(&instrument)?
2520        };
2521
2522        history_params.inst_type(instrument_type);
2523
2524        if let Some(instrument_id) = instrument_id.as_ref() {
2525            history_params.inst_id(instrument_id.symbol.inner().to_string());
2526        }
2527
2528        if let Some(limit) = limit {
2529            history_params.limit(limit);
2530        }
2531
2532        let history_params = history_params.build().map_err(|e| anyhow::anyhow!(e))?;
2533
2534        let mut pending_params = GetOrderListParamsBuilder::default();
2535        pending_params.inst_type(instrument_type);
2536
2537        if let Some(instrument_id) = instrument_id.as_ref() {
2538            pending_params.inst_id(instrument_id.symbol.inner().to_string());
2539        }
2540
2541        if let Some(limit) = limit {
2542            pending_params.limit(limit);
2543        }
2544
2545        let pending_params = pending_params.build().map_err(|e| anyhow::anyhow!(e))?;
2546
2547        let combined_resp = if open_only {
2548            // Only request pending/open orders
2549            self.inner
2550                .get_orders_pending(pending_params)
2551                .await
2552                .map_err(|e| anyhow::anyhow!(e))?
2553        } else {
2554            // Make both requests concurrently
2555            let (history_resp, pending_resp) = tokio::try_join!(
2556                self.inner.get_orders_history(history_params),
2557                self.inner.get_orders_pending(pending_params)
2558            )
2559            .map_err(|e| anyhow::anyhow!(e))?;
2560
2561            // Combine both responses
2562            let mut combined_resp = history_resp;
2563            combined_resp.extend(pending_resp);
2564            combined_resp
2565        };
2566
2567        // Prepare time range filter
2568        let start_ns = start.map(UnixNanos::from);
2569        let end_ns = end.map(UnixNanos::from);
2570
2571        let ts_init = self.generate_ts_init();
2572        let mut reports = Vec::with_capacity(combined_resp.len());
2573
2574        // Use a seen filter in case pending orders are within the histories "2hr reserve window"
2575        let mut seen: AHashSet<String> = AHashSet::new();
2576
2577        for order in combined_resp {
2578            let seen_key = if !order.cl_ord_id.is_empty() {
2579                order.cl_ord_id.as_str().to_string()
2580            } else if let Some(algo_cl_ord_id) = order
2581                .algo_cl_ord_id
2582                .as_ref()
2583                .filter(|value| !value.as_str().is_empty())
2584            {
2585                algo_cl_ord_id.as_str().to_string()
2586            } else if let Some(algo_id) = order
2587                .algo_id
2588                .as_ref()
2589                .filter(|value| !value.as_str().is_empty())
2590            {
2591                algo_id.as_str().to_string()
2592            } else {
2593                order.ord_id.as_str().to_string()
2594            };
2595
2596            if !seen.insert(seen_key) {
2597                continue; // Reserved pending already reported
2598            }
2599
2600            let Ok(inst) = self.instrument_from_cache(order.inst_id) else {
2601                tracing::debug!(
2602                    symbol = %order.inst_id,
2603                    "Skipping order report for instrument not in cache"
2604                );
2605                continue;
2606            };
2607
2608            let report = match parse_order_status_report(
2609                &order,
2610                account_id,
2611                inst.id(),
2612                inst.price_precision(),
2613                inst.size_precision(),
2614                ts_init,
2615            ) {
2616                Ok(report) => report,
2617                Err(e) => {
2618                    tracing::error!("Failed to parse order status report: {e}");
2619                    continue;
2620                }
2621            };
2622
2623            if let Some(start_ns) = start_ns
2624                && report.ts_last < start_ns
2625            {
2626                continue;
2627            }
2628            if let Some(end_ns) = end_ns
2629                && report.ts_last > end_ns
2630            {
2631                continue;
2632            }
2633
2634            reports.push(report);
2635        }
2636
2637        Ok(reports)
2638    }
2639
2640    /// Requests fill reports (transaction details) for the given parameters.
2641    ///
2642    /// # Errors
2643    ///
2644    /// Returns an error if the request fails.
2645    ///
2646    /// # References
2647    ///
2648    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-transaction-details-last-3-days>.
2649    pub async fn request_fill_reports(
2650        &self,
2651        account_id: AccountId,
2652        instrument_type: Option<OKXInstrumentType>,
2653        instrument_id: Option<InstrumentId>,
2654        start: Option<DateTime<Utc>>,
2655        end: Option<DateTime<Utc>>,
2656        limit: Option<u32>,
2657    ) -> anyhow::Result<Vec<FillReport>> {
2658        let mut params = GetTransactionDetailsParamsBuilder::default();
2659
2660        let instrument_type = if let Some(instrument_type) = instrument_type {
2661            instrument_type
2662        } else {
2663            let instrument_id = instrument_id.ok_or_else(|| {
2664                anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
2665            })?;
2666            let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2667            okx_instrument_type(&instrument)?
2668        };
2669
2670        params.inst_type(instrument_type);
2671
2672        if let Some(instrument_id) = instrument_id {
2673            let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2674            let instrument_type = okx_instrument_type(&instrument)?;
2675            params.inst_type(instrument_type);
2676            params.inst_id(instrument_id.symbol.inner().to_string());
2677        }
2678
2679        if let Some(limit) = limit {
2680            params.limit(limit);
2681        }
2682
2683        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2684
2685        let resp = self
2686            .inner
2687            .get_fills(params)
2688            .await
2689            .map_err(|e| anyhow::anyhow!(e))?;
2690
2691        // Prepare time range filter
2692        let start_ns = start.map(UnixNanos::from);
2693        let end_ns = end.map(UnixNanos::from);
2694
2695        let ts_init = self.generate_ts_init();
2696        let mut reports = Vec::with_capacity(resp.len());
2697
2698        for detail in resp {
2699            // Skip fills with zero or negative quantity (cancelled orders, etc)
2700            if detail.fill_sz.is_empty() {
2701                continue;
2702            }
2703            if let Ok(qty) = detail.fill_sz.parse::<f64>() {
2704                if qty <= 0.0 {
2705                    continue;
2706                }
2707            } else {
2708                // Skip unparsable quantities
2709                continue;
2710            }
2711
2712            let Ok(inst) = self.instrument_from_cache(detail.inst_id) else {
2713                tracing::debug!(
2714                    symbol = %detail.inst_id,
2715                    "Skipping fill report for instrument not in cache"
2716                );
2717                continue;
2718            };
2719
2720            let report = match parse_fill_report(
2721                detail,
2722                account_id,
2723                inst.id(),
2724                inst.price_precision(),
2725                inst.size_precision(),
2726                ts_init,
2727            ) {
2728                Ok(report) => report,
2729                Err(e) => {
2730                    tracing::error!("Failed to parse fill report: {e}");
2731                    continue;
2732                }
2733            };
2734
2735            if let Some(start_ns) = start_ns
2736                && report.ts_event < start_ns
2737            {
2738                continue;
2739            }
2740
2741            if let Some(end_ns) = end_ns
2742                && report.ts_event > end_ns
2743            {
2744                continue;
2745            }
2746
2747            reports.push(report);
2748        }
2749
2750        Ok(reports)
2751    }
2752
2753    /// Requests current position status reports for the given parameters.
2754    ///
2755    /// # Position Modes
2756    ///
2757    /// OKX supports two position modes, which affects how position data is returned:
2758    ///
2759    /// ## Net Mode (One-way)
2760    /// - `posSide` field will be `"net"`
2761    /// - `pos` field uses **signed quantities**:
2762    ///   - Positive value = Long position
2763    ///   - Negative value = Short position
2764    ///   - Zero = Flat/no position
2765    ///
2766    /// ## Long/Short Mode (Hedge/Dual-side)
2767    /// - `posSide` field will be `"long"` or `"short"`
2768    /// - `pos` field is **always positive** (use `posSide` to determine actual side)
2769    /// - Allows holding simultaneous long and short positions on the same instrument
2770    /// - Position IDs are suffixed with `-LONG` or `-SHORT` for uniqueness
2771    ///
2772    /// # Errors
2773    ///
2774    /// Returns an error if the request fails.
2775    ///
2776    /// # References
2777    ///
2778    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-positions>
2779    pub async fn request_position_status_reports(
2780        &self,
2781        account_id: AccountId,
2782        instrument_type: Option<OKXInstrumentType>,
2783        instrument_id: Option<InstrumentId>,
2784    ) -> anyhow::Result<Vec<PositionStatusReport>> {
2785        let mut params = GetPositionsParamsBuilder::default();
2786
2787        let instrument_type = if let Some(instrument_type) = instrument_type {
2788            instrument_type
2789        } else {
2790            let instrument_id = instrument_id.ok_or_else(|| {
2791                anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
2792            })?;
2793            let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2794            okx_instrument_type(&instrument)?
2795        };
2796
2797        params.inst_type(instrument_type);
2798
2799        instrument_id
2800            .as_ref()
2801            .map(|i| params.inst_id(i.symbol.inner()));
2802
2803        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2804
2805        let resp = self
2806            .inner
2807            .get_positions(params)
2808            .await
2809            .map_err(|e| anyhow::anyhow!(e))?;
2810
2811        let ts_init = self.generate_ts_init();
2812        let mut reports = Vec::with_capacity(resp.len());
2813
2814        for position in resp {
2815            let Ok(inst) = self.instrument_from_cache(position.inst_id) else {
2816                tracing::debug!(
2817                    symbol = %position.inst_id,
2818                    "Skipping position report for instrument not in cache"
2819                );
2820                continue;
2821            };
2822
2823            match parse_position_status_report(
2824                position,
2825                account_id,
2826                inst.id(),
2827                inst.size_precision(),
2828                ts_init,
2829            ) {
2830                Ok(report) => reports.push(report),
2831                Err(e) => {
2832                    tracing::error!("Failed to parse position status report: {e}");
2833                    continue;
2834                }
2835            };
2836        }
2837
2838        Ok(reports)
2839    }
2840
2841    /// Requests spot margin position status reports from account balance.
2842    ///
2843    /// Spot margin positions appear in `/api/v5/account/balance` as balance sheet items
2844    /// with non-zero `liab` (liability) or `spotInUseAmt` fields, rather than in the
2845    /// positions endpoint. This method fetches the balance and converts any margin
2846    /// positions into position status reports.
2847    ///
2848    /// # Errors
2849    ///
2850    /// Returns an error if the request fails or parsing fails.
2851    ///
2852    /// # References
2853    ///
2854    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-balance>
2855    pub async fn request_spot_margin_position_reports(
2856        &self,
2857        account_id: AccountId,
2858    ) -> anyhow::Result<Vec<PositionStatusReport>> {
2859        let accounts = self
2860            .inner
2861            .get_balance()
2862            .await
2863            .map_err(|e| anyhow::anyhow!(e))?;
2864
2865        let ts_init = self.generate_ts_init();
2866        let mut reports = Vec::new();
2867
2868        for account in accounts {
2869            for balance in account.details {
2870                let ccy_str = balance.ccy.as_str();
2871
2872                // Try to find instrument by constructing potential spot pair symbols
2873                let potential_symbols = [
2874                    format!("{ccy_str}-USDT"),
2875                    format!("{ccy_str}-USD"),
2876                    format!("{ccy_str}-USDC"),
2877                ];
2878
2879                let instrument_result = potential_symbols.iter().find_map(|symbol| {
2880                    self.instrument_from_cache(Ustr::from(symbol))
2881                        .ok()
2882                        .map(|inst| (inst.id(), inst.size_precision()))
2883                });
2884
2885                let (instrument_id, size_precision) = match instrument_result {
2886                    Some((id, prec)) => (id, prec),
2887                    None => {
2888                        tracing::debug!(
2889                            "Skipping balance for {} - no matching instrument in cache",
2890                            ccy_str
2891                        );
2892                        continue;
2893                    }
2894                };
2895
2896                match parse_spot_margin_position_from_balance(
2897                    &balance,
2898                    account_id,
2899                    instrument_id,
2900                    size_precision,
2901                    ts_init,
2902                ) {
2903                    Ok(Some(report)) => reports.push(report),
2904                    Ok(None) => {} // No margin position for this currency
2905                    Err(e) => {
2906                        tracing::error!(
2907                            "Failed to parse spot margin position from balance for {}: {e}",
2908                            ccy_str
2909                        );
2910                        continue;
2911                    }
2912                };
2913            }
2914        }
2915
2916        Ok(reports)
2917    }
2918
2919    /// Places an algo order via HTTP.
2920    ///
2921    /// # Errors
2922    ///
2923    /// Returns an error if the request fails.
2924    ///
2925    /// # References
2926    ///
2927    /// <https://www.okx.com/docs-v5/en/#order-book-trading-algo-trading-post-place-algo-order>
2928    pub async fn place_algo_order(
2929        &self,
2930        request: OKXPlaceAlgoOrderRequest,
2931    ) -> Result<OKXPlaceAlgoOrderResponse, OKXHttpError> {
2932        let body =
2933            serde_json::to_vec(&request).map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
2934
2935        let resp: Vec<OKXPlaceAlgoOrderResponse> = self
2936            .inner
2937            .send_request::<_, ()>(
2938                Method::POST,
2939                "/api/v5/trade/order-algo",
2940                None,
2941                Some(body),
2942                true,
2943            )
2944            .await?;
2945
2946        resp.into_iter()
2947            .next()
2948            .ok_or_else(|| OKXHttpError::ValidationError("Empty response".to_string()))
2949    }
2950
2951    /// Cancels an algo order via HTTP.
2952    ///
2953    /// # Errors
2954    ///
2955    /// Returns an error if the request fails.
2956    ///
2957    /// # References
2958    ///
2959    /// <https://www.okx.com/docs-v5/en/#order-book-trading-algo-trading-post-cancel-algo-order>
2960    pub async fn cancel_algo_order(
2961        &self,
2962        request: OKXCancelAlgoOrderRequest,
2963    ) -> Result<OKXCancelAlgoOrderResponse, OKXHttpError> {
2964        // OKX expects an array for cancel-algos endpoint
2965        // Serialize once to bytes to keep signing and sending identical
2966        let body =
2967            serde_json::to_vec(&[request]).map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
2968
2969        let resp: Vec<OKXCancelAlgoOrderResponse> = self
2970            .inner
2971            .send_request::<_, ()>(
2972                Method::POST,
2973                "/api/v5/trade/cancel-algos",
2974                None,
2975                Some(body),
2976                true,
2977            )
2978            .await?;
2979
2980        resp.into_iter()
2981            .next()
2982            .ok_or_else(|| OKXHttpError::ValidationError("Empty response".to_string()))
2983    }
2984
2985    /// Places an algo order using domain types.
2986    ///
2987    /// This is a convenience method that accepts Nautilus domain types
2988    /// and builds the appropriate OKX request structure internally.
2989    ///
2990    /// # Errors
2991    ///
2992    /// Returns an error if the request fails.
2993    #[allow(clippy::too_many_arguments)]
2994    pub async fn place_algo_order_with_domain_types(
2995        &self,
2996        instrument_id: InstrumentId,
2997        td_mode: OKXTradeMode,
2998        client_order_id: ClientOrderId,
2999        order_side: OrderSide,
3000        order_type: OrderType,
3001        quantity: Quantity,
3002        trigger_price: Price,
3003        trigger_type: Option<TriggerType>,
3004        limit_price: Option<Price>,
3005        reduce_only: Option<bool>,
3006    ) -> Result<OKXPlaceAlgoOrderResponse, OKXHttpError> {
3007        if !matches!(order_side, OrderSide::Buy | OrderSide::Sell) {
3008            return Err(OKXHttpError::ValidationError(
3009                "Invalid order side".to_string(),
3010            ));
3011        }
3012        let okx_side: OKXSide = order_side.into();
3013
3014        // Map trigger type to OKX format
3015        let trigger_px_type_enum = trigger_type.map_or(OKXTriggerType::Last, Into::into);
3016
3017        // Determine order price based on order type
3018        let order_px = if matches!(order_type, OrderType::StopLimit | OrderType::LimitIfTouched) {
3019            limit_price.map(|p| p.to_string())
3020        } else {
3021            // Market orders use -1 to indicate market execution
3022            Some("-1".to_string())
3023        };
3024
3025        let request = OKXPlaceAlgoOrderRequest {
3026            inst_id: instrument_id.symbol.as_str().to_string(),
3027            td_mode,
3028            side: okx_side,
3029            ord_type: OKXAlgoOrderType::Trigger, // All conditional orders use 'trigger' type
3030            sz: quantity.to_string(),
3031            algo_cl_ord_id: Some(client_order_id.as_str().to_string()),
3032            trigger_px: Some(trigger_price.to_string()),
3033            order_px,
3034            trigger_px_type: Some(trigger_px_type_enum),
3035            tgt_ccy: None,  // Let OKX determine based on instrument
3036            pos_side: None, // Use default position side
3037            close_position: None,
3038            tag: Some(OKX_NAUTILUS_BROKER_ID.to_string()),
3039            reduce_only,
3040        };
3041
3042        self.place_algo_order(request).await
3043    }
3044
3045    /// Cancels an algo order using domain types.
3046    ///
3047    /// This is a convenience method that accepts Nautilus domain types
3048    /// and builds the appropriate OKX request structure internally.
3049    ///
3050    /// # Errors
3051    ///
3052    /// Returns an error if the request fails.
3053    pub async fn cancel_algo_order_with_domain_types(
3054        &self,
3055        instrument_id: InstrumentId,
3056        algo_id: String,
3057    ) -> Result<OKXCancelAlgoOrderResponse, OKXHttpError> {
3058        let request = OKXCancelAlgoOrderRequest {
3059            inst_id: instrument_id.symbol.to_string(),
3060            algo_id: Some(algo_id),
3061            algo_cl_ord_id: None,
3062        };
3063
3064        self.cancel_algo_order(request).await
3065    }
3066
3067    /// Requests algo order status reports.
3068    ///
3069    /// # Errors
3070    ///
3071    /// Returns an error if the request fails.
3072    #[allow(clippy::too_many_arguments)]
3073    pub async fn request_algo_order_status_reports(
3074        &self,
3075        account_id: AccountId,
3076        instrument_type: Option<OKXInstrumentType>,
3077        instrument_id: Option<InstrumentId>,
3078        algo_id: Option<String>,
3079        algo_client_order_id: Option<ClientOrderId>,
3080        state: Option<OKXOrderStatus>,
3081        limit: Option<u32>,
3082    ) -> anyhow::Result<Vec<OrderStatusReport>> {
3083        let mut instruments_cache: AHashMap<Ustr, InstrumentAny> = AHashMap::new();
3084
3085        let inst_type = if let Some(inst_type) = instrument_type {
3086            inst_type
3087        } else if let Some(inst_id) = instrument_id {
3088            let instrument = self.instrument_from_cache(inst_id.symbol.inner())?;
3089            let inst_type = okx_instrument_type(&instrument)?;
3090            instruments_cache.insert(inst_id.symbol.inner(), instrument);
3091            inst_type
3092        } else {
3093            anyhow::bail!("instrument_type or instrument_id required for algo order query")
3094        };
3095
3096        let mut params_builder = GetAlgoOrdersParamsBuilder::default();
3097        params_builder.inst_type(inst_type);
3098        if let Some(inst_id) = instrument_id {
3099            params_builder.inst_id(inst_id.symbol.inner().to_string());
3100        }
3101        if let Some(algo_id) = algo_id.as_ref() {
3102            params_builder.algo_id(algo_id.clone());
3103        }
3104        if let Some(client_order_id) = algo_client_order_id.as_ref() {
3105            params_builder.algo_cl_ord_id(client_order_id.as_str().to_string());
3106        }
3107        if let Some(state) = state {
3108            params_builder.state(state);
3109        }
3110        if let Some(limit) = limit {
3111            params_builder.limit(limit);
3112        }
3113
3114        let params = params_builder
3115            .build()
3116            .map_err(|e| anyhow::anyhow!(format!("Failed to build algo order params: {e}")))?;
3117
3118        let ts_init = self.generate_ts_init();
3119        let mut reports = Vec::new();
3120        let mut seen: AHashSet<(String, String)> = AHashSet::new();
3121
3122        let pending = match self.inner.get_order_algo_pending(params.clone()).await {
3123            Ok(result) => result,
3124            Err(OKXHttpError::UnexpectedStatus { status, .. })
3125                if status == StatusCode::NOT_FOUND =>
3126            {
3127                Vec::new()
3128            }
3129            Err(e) => return Err(e.into()),
3130        };
3131        self.collect_algo_reports(
3132            account_id,
3133            &pending,
3134            &mut instruments_cache,
3135            ts_init,
3136            &mut seen,
3137            &mut reports,
3138        )
3139        .await?;
3140
3141        let history = match self.inner.get_order_algo_history(params).await {
3142            Ok(result) => result,
3143            Err(OKXHttpError::UnexpectedStatus { status, .. })
3144                if status == StatusCode::NOT_FOUND =>
3145            {
3146                Vec::new()
3147            }
3148            Err(e) => return Err(e.into()),
3149        };
3150        self.collect_algo_reports(
3151            account_id,
3152            &history,
3153            &mut instruments_cache,
3154            ts_init,
3155            &mut seen,
3156            &mut reports,
3157        )
3158        .await?;
3159
3160        Ok(reports)
3161    }
3162
3163    /// Requests an algo order status report by client order identifier.
3164    ///
3165    /// # Errors
3166    ///
3167    /// Returns an error if the request fails.
3168    pub async fn request_algo_order_status_report(
3169        &self,
3170        account_id: AccountId,
3171        instrument_id: InstrumentId,
3172        algo_client_order_id: ClientOrderId,
3173    ) -> anyhow::Result<Option<OrderStatusReport>> {
3174        let reports = self
3175            .request_algo_order_status_reports(
3176                account_id,
3177                None,
3178                Some(instrument_id),
3179                None,
3180                Some(algo_client_order_id),
3181                None,
3182                Some(50_u32),
3183            )
3184            .await?;
3185
3186        Ok(reports.into_iter().next())
3187    }
3188
3189    /// Exposes raw HTTP client for testing purposes
3190    pub fn raw_client(&self) -> &Arc<OKXRawHttpClient> {
3191        &self.inner
3192    }
3193
3194    async fn collect_algo_reports(
3195        &self,
3196        account_id: AccountId,
3197        orders: &[OKXOrderAlgo],
3198        instruments_cache: &mut AHashMap<Ustr, InstrumentAny>,
3199        ts_init: UnixNanos,
3200        seen: &mut AHashSet<(String, String)>,
3201        reports: &mut Vec<OrderStatusReport>,
3202    ) -> anyhow::Result<()> {
3203        for order in orders {
3204            let key = (order.algo_id.clone(), order.algo_cl_ord_id.clone());
3205            if !seen.insert(key) {
3206                continue;
3207            }
3208
3209            let instrument = if let Some(instrument) = instruments_cache.get(&order.inst_id) {
3210                instrument.clone()
3211            } else {
3212                let Ok(instrument) = self.instrument_from_cache(order.inst_id) else {
3213                    tracing::debug!(
3214                        symbol = %order.inst_id,
3215                        "Skipping algo order report for instrument not in cache"
3216                    );
3217                    continue;
3218                };
3219                instruments_cache.insert(order.inst_id, instrument.clone());
3220                instrument
3221            };
3222
3223            match parse_http_algo_order(order, account_id, &instrument, ts_init) {
3224                Ok(report) => reports.push(report),
3225                Err(e) => {
3226                    tracing::error!("Failed to parse algo order report: {e}");
3227                }
3228            }
3229        }
3230
3231        Ok(())
3232    }
3233}
3234
3235fn parse_http_algo_order(
3236    order: &OKXOrderAlgo,
3237    account_id: AccountId,
3238    instrument: &InstrumentAny,
3239    ts_init: UnixNanos,
3240) -> anyhow::Result<OrderStatusReport> {
3241    let ord_px = if order.ord_px.is_empty() {
3242        "-1".to_string()
3243    } else {
3244        order.ord_px.clone()
3245    };
3246
3247    let reduce_only = if order.reduce_only.is_empty() {
3248        "false".to_string()
3249    } else {
3250        order.reduce_only.clone()
3251    };
3252
3253    let msg = OKXAlgoOrderMsg {
3254        algo_id: order.algo_id.clone(),
3255        algo_cl_ord_id: order.algo_cl_ord_id.clone(),
3256        cl_ord_id: order.cl_ord_id.clone(),
3257        ord_id: order.ord_id.clone(),
3258        inst_id: order.inst_id,
3259        inst_type: order.inst_type,
3260        ord_type: order.ord_type,
3261        state: order.state,
3262        side: order.side,
3263        pos_side: order.pos_side,
3264        sz: order.sz.clone(),
3265        trigger_px: order.trigger_px.clone(),
3266        trigger_px_type: order.trigger_px_type.unwrap_or(OKXTriggerType::None),
3267        ord_px,
3268        td_mode: order.td_mode,
3269        lever: order.lever.clone(),
3270        reduce_only,
3271        actual_px: order.actual_px.clone(),
3272        actual_sz: order.actual_sz.clone(),
3273        notional_usd: order.notional_usd.clone(),
3274        c_time: order.c_time,
3275        u_time: order.u_time,
3276        trigger_time: order.trigger_time.clone(),
3277        tag: order.tag.clone(),
3278    };
3279
3280    parse_algo_order_status_report(&msg, instrument, account_id, ts_init)
3281}