Skip to main content

nautilus_okx/http/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides an ergonomic wrapper around the **OKX v5 REST API** –
17//! <https://www.okx.com/docs-v5/en/>.
18//!
19//! The core type exported by this module is [`OKXHttpClient`].  It offers an
20//! interface to all exchange endpoints currently required by NautilusTrader.
21//!
22//! Key responsibilities handled internally:
23//! • Request signing and header composition for private routes (HMAC-SHA256).
24//! • Rate-limiting based on the public OKX specification.
25//! • Deserialization of JSON payloads into domain models.
26//! • Conversion of raw exchange errors into the rich [`OKXHttpError`] enum.
27//!
28//! # Official Documentation
29//!
30//! | Endpoint                             | Reference                                              |
31//! |--------------------------------------|--------------------------------------------------------|
32//! | Market data                          | <https://www.okx.com/docs-v5/en/#rest-api-market-data> |
33//! | Account & positions                  | <https://www.okx.com/docs-v5/en/#rest-api-account>     |
34//! | Funding & asset balances             | <https://www.okx.com/docs-v5/en/#rest-api-funding>     |
35
36use std::{
37    collections::HashMap,
38    fmt::Debug,
39    num::NonZeroU32,
40    str::FromStr,
41    sync::{
42        Arc, LazyLock,
43        atomic::{AtomicBool, Ordering},
44    },
45};
46
47use ahash::{AHashMap, AHashSet};
48use chrono::{DateTime, Utc};
49use dashmap::DashMap;
50use nautilus_core::{
51    UnixNanos, consts::NAUTILUS_USER_AGENT, env::get_or_env_var, time::get_atomic_clock_realtime,
52};
53use nautilus_model::{
54    data::{Bar, BarType, IndexPriceUpdate, MarkPriceUpdate, TradeTick},
55    enums::{AggregationSource, BarAggregation, OrderSide, OrderType, TriggerType},
56    events::AccountState,
57    identifiers::{AccountId, ClientOrderId, InstrumentId},
58    instruments::{Instrument, InstrumentAny},
59    reports::{FillReport, OrderStatusReport, PositionStatusReport},
60    types::{Price, Quantity},
61};
62use nautilus_network::{
63    http::{HttpClient, Method, StatusCode, USER_AGENT},
64    ratelimiter::quota::Quota,
65    retry::{RetryConfig, RetryManager},
66};
67use rust_decimal::Decimal;
68use serde::{Deserialize, Serialize, de::DeserializeOwned};
69use tokio_util::sync::CancellationToken;
70use ustr::Ustr;
71
72use super::{
73    error::OKXHttpError,
74    models::{
75        OKXAccount, OKXCancelAlgoOrderRequest, OKXCancelAlgoOrderResponse, OKXFeeRate,
76        OKXIndexTicker, OKXMarkPrice, OKXOrderAlgo, OKXOrderHistory, OKXPlaceAlgoOrderRequest,
77        OKXPlaceAlgoOrderResponse, OKXPosition, OKXPositionHistory, OKXPositionTier, OKXServerTime,
78        OKXTransactionDetail,
79    },
80    query::{
81        GetAlgoOrdersParams, GetAlgoOrdersParamsBuilder, GetCandlesticksParams,
82        GetCandlesticksParamsBuilder, GetIndexTickerParams, GetIndexTickerParamsBuilder,
83        GetInstrumentsParams, GetInstrumentsParamsBuilder, GetMarkPriceParams,
84        GetMarkPriceParamsBuilder, GetOrderHistoryParams, GetOrderHistoryParamsBuilder,
85        GetOrderListParams, GetOrderListParamsBuilder, GetPositionTiersParams,
86        GetPositionsHistoryParams, GetPositionsParams, GetPositionsParamsBuilder,
87        GetTradeFeeParams, GetTradesParams, GetTradesParamsBuilder, GetTransactionDetailsParams,
88        GetTransactionDetailsParamsBuilder, SetPositionModeParams, SetPositionModeParamsBuilder,
89    },
90};
91use crate::{
92    common::{
93        consts::{OKX_HTTP_URL, OKX_NAUTILUS_BROKER_ID, should_retry_error_code},
94        credential::Credential,
95        enums::{
96            OKXAlgoOrderType, OKXContractType, OKXInstrumentStatus, OKXInstrumentType,
97            OKXOrderStatus, OKXPositionMode, OKXSide, OKXTradeMode, OKXTriggerType,
98        },
99        models::OKXInstrument,
100        parse::{
101            okx_instrument_type, okx_instrument_type_from_symbol, parse_account_state,
102            parse_candlestick, parse_fill_report, parse_index_price_update, parse_instrument_any,
103            parse_mark_price_update, parse_order_status_report, parse_position_status_report,
104            parse_spot_margin_position_from_balance, parse_trade_tick,
105        },
106    },
107    http::{
108        models::{OKXCandlestick, OKXTrade},
109        query::GetOrderParams,
110    },
111    websocket::{messages::OKXAlgoOrderMsg, parse::parse_algo_order_status_report},
112};
113
/// Value of the `code` field in an OKX response envelope that signals success.
///
/// Any other value is surfaced to callers as an `OKXHttpError::OkxError`.
const OKX_SUCCESS_CODE: &str = "0";
115
116fn resolve_okx_error_message(response_body: &[u8], top_level_msg: &str) -> String {
117    let message = top_level_msg.trim();
118    if !message.is_empty() {
119        return message.to_string();
120    }
121
122    if let Ok(payload) = serde_json::from_slice::<serde_json::Value>(response_body)
123        && let Some(first_item) = payload
124            .get("data")
125            .and_then(serde_json::Value::as_array)
126            .and_then(|items| items.first())
127    {
128        if let Some(s_msg) = first_item.get("sMsg").and_then(serde_json::Value::as_str) {
129            let s_msg = s_msg.trim();
130            if !s_msg.is_empty() {
131                return s_msg.to_string();
132            }
133        }
134
135        if let Some(s_code) = first_item.get("sCode").and_then(serde_json::Value::as_str) {
136            let s_code = s_code.trim();
137            if !s_code.is_empty() {
138                return s_code.to_string();
139            }
140        }
141    }
142
143    String::new()
144}
145
/// Default OKX REST API quota: 250 requests per second, which is the same
/// sustained rate as 500 requests per 2 seconds.
///
/// Endpoint-specific limits noted for reference:
/// - Sub-account order limit: 1000 requests per 2 seconds.
/// - Account balance: 10 requests per 2 seconds.
/// - Account instruments: 20 requests per 2 seconds.
///
/// This is used as a general limit that should accommodate most use cases while
/// respecting OKX's documented limits. It is registered under the global
/// rate-limit key and is also handed to `HttpClient::new` alongside the keyed quotas.
pub static OKX_REST_QUOTA: LazyLock<Quota> =
    LazyLock::new(|| Quota::per_second(NonZeroU32::new(250).unwrap()));
156
/// Rate-limit key attached to every request; its quota is [`OKX_REST_QUOTA`].
const OKX_GLOBAL_RATE_KEY: &str = "okx:global";
158
/// Represents the envelope of an OKX HTTP response.
///
/// The envelope is generic over `T`, the element type of the `data` array
/// returned by a given endpoint.
#[derive(Debug, Serialize, Deserialize)]
pub struct OKXResponse<T> {
    /// The OKX response code, which is `"0"` for success.
    pub code: String,
    /// A message string which can be informational or describe an error cause.
    /// May be empty even for failures, in which case details can live inside `data`.
    pub msg: String,
    /// The typed data returned by the OKX endpoint.
    pub data: Vec<T>,
}
169
/// Provides a raw HTTP client for interacting with the [OKX](https://okx.com) REST API.
///
/// This client wraps the underlying [`HttpClient`] to handle functionality
/// specific to OKX, such as request signing (for authenticated endpoints),
/// forming request URLs, and deserializing responses into OKX specific data models.
pub struct OKXRawHttpClient {
    // Prefix prepended to every endpoint path when building the request URL.
    base_url: String,
    // Underlying HTTP client, constructed with the default headers and rate-limit quotas.
    client: HttpClient,
    // API credentials; `None` means only unauthenticated endpoints can be called.
    credential: Option<Credential>,
    // Drives retries of each request.
    retry_manager: RetryManager<OKXHttpError>,
    // Token handed to the retry manager; cancelled by `cancel_all_requests`.
    cancellation_token: CancellationToken,
    // Whether the client was created for demo trading (adds the
    // `x-simulated-trading` default header at construction time).
    is_demo: bool,
}
183
184impl Default for OKXRawHttpClient {
185    fn default() -> Self {
186        Self::new(None, Some(60), None, None, None, false, None)
187            .expect("Failed to create default OKXRawHttpClient")
188    }
189}
190
191impl Debug for OKXRawHttpClient {
192    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
193        let credential = self.credential.as_ref().map(|_| "<redacted>");
194        f.debug_struct(stringify!(OKXRawHttpClient))
195            .field("base_url", &self.base_url)
196            .field("credential", &credential)
197            .finish_non_exhaustive()
198    }
199}
200
201impl OKXRawHttpClient {
202    fn rate_limiter_quotas() -> Vec<(String, Quota)> {
203        vec![
204            (OKX_GLOBAL_RATE_KEY.to_string(), *OKX_REST_QUOTA),
205            (
206                "okx:/api/v5/account/balance".to_string(),
207                Quota::per_second(NonZeroU32::new(5).unwrap()),
208            ),
209            (
210                "okx:/api/v5/public/instruments".to_string(),
211                Quota::per_second(NonZeroU32::new(10).unwrap()),
212            ),
213            (
214                "okx:/api/v5/market/candles".to_string(),
215                Quota::per_second(NonZeroU32::new(50).unwrap()),
216            ),
217            (
218                "okx:/api/v5/market/history-candles".to_string(),
219                Quota::per_second(NonZeroU32::new(20).unwrap()),
220            ),
221            (
222                "okx:/api/v5/market/history-trades".to_string(),
223                Quota::per_second(NonZeroU32::new(30).unwrap()),
224            ),
225            (
226                "okx:/api/v5/trade/order".to_string(),
227                Quota::per_second(NonZeroU32::new(30).unwrap()), // 60 requests / 2 seconds (per instrument)
228            ),
229            (
230                "okx:/api/v5/trade/orders-pending".to_string(),
231                Quota::per_second(NonZeroU32::new(20).unwrap()),
232            ),
233            (
234                "okx:/api/v5/trade/orders-history".to_string(),
235                Quota::per_second(NonZeroU32::new(20).unwrap()),
236            ),
237            (
238                "okx:/api/v5/trade/fills".to_string(),
239                Quota::per_second(NonZeroU32::new(30).unwrap()),
240            ),
241            (
242                "okx:/api/v5/trade/order-algo".to_string(),
243                Quota::per_second(NonZeroU32::new(10).unwrap()),
244            ),
245            (
246                "okx:/api/v5/trade/cancel-algos".to_string(),
247                Quota::per_second(NonZeroU32::new(10).unwrap()),
248            ),
249        ]
250    }
251
252    fn rate_limit_keys(endpoint: &str) -> Vec<Ustr> {
253        let normalized = endpoint.split('?').next().unwrap_or(endpoint);
254        let route = format!("okx:{normalized}");
255
256        vec![Ustr::from(OKX_GLOBAL_RATE_KEY), Ustr::from(route.as_str())]
257    }
258
259    /// Cancel all pending HTTP requests.
260    pub fn cancel_all_requests(&self) {
261        self.cancellation_token.cancel();
262    }
263
264    /// Get the cancellation token for this client.
265    pub fn cancellation_token(&self) -> &CancellationToken {
266        &self.cancellation_token
267    }
268
269    /// Creates a new [`OKXHttpClient`] using the default OKX HTTP URL,
270    /// optionally overridden with a custom base URL.
271    ///
272    /// This version of the client has **no credentials**, so it can only
273    /// call publicly accessible endpoints.
274    ///
275    /// # Errors
276    ///
277    /// Returns an error if the retry manager cannot be created.
278    pub fn new(
279        base_url: Option<String>,
280        timeout_secs: Option<u64>,
281        max_retries: Option<u32>,
282        retry_delay_ms: Option<u64>,
283        retry_delay_max_ms: Option<u64>,
284        is_demo: bool,
285        proxy_url: Option<String>,
286    ) -> Result<Self, OKXHttpError> {
287        let retry_config = RetryConfig {
288            max_retries: max_retries.unwrap_or(3),
289            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
290            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
291            backoff_factor: 2.0,
292            jitter_ms: 1000,
293            operation_timeout_ms: Some(60_000),
294            immediate_first: false,
295            max_elapsed_ms: Some(180_000),
296        };
297
298        let retry_manager = RetryManager::new(retry_config);
299
300        Ok(Self {
301            base_url: base_url.unwrap_or(OKX_HTTP_URL.to_string()),
302            client: HttpClient::new(
303                Self::default_headers(is_demo),
304                vec![],
305                Self::rate_limiter_quotas(),
306                Some(*OKX_REST_QUOTA),
307                timeout_secs,
308                proxy_url,
309            )
310            .map_err(|e| {
311                OKXHttpError::ValidationError(format!("Failed to create HTTP client: {e}"))
312            })?,
313            credential: None,
314            retry_manager,
315            cancellation_token: CancellationToken::new(),
316            is_demo,
317        })
318    }
319
320    /// Creates a new [`OKXHttpClient`] configured with credentials
321    /// for authenticated requests, optionally using a custom base URL.
322    ///
323    /// # Errors
324    ///
325    /// Returns an error if the retry manager cannot be created.
326    #[allow(clippy::too_many_arguments)]
327    pub fn with_credentials(
328        api_key: String,
329        api_secret: String,
330        api_passphrase: String,
331        base_url: String,
332        timeout_secs: Option<u64>,
333        max_retries: Option<u32>,
334        retry_delay_ms: Option<u64>,
335        retry_delay_max_ms: Option<u64>,
336        is_demo: bool,
337        proxy_url: Option<String>,
338    ) -> Result<Self, OKXHttpError> {
339        let retry_config = RetryConfig {
340            max_retries: max_retries.unwrap_or(3),
341            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
342            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
343            backoff_factor: 2.0,
344            jitter_ms: 1000,
345            operation_timeout_ms: Some(60_000),
346            immediate_first: false,
347            max_elapsed_ms: Some(180_000),
348        };
349
350        let retry_manager = RetryManager::new(retry_config);
351
352        Ok(Self {
353            base_url,
354            client: HttpClient::new(
355                Self::default_headers(is_demo),
356                vec![],
357                Self::rate_limiter_quotas(),
358                Some(*OKX_REST_QUOTA),
359                timeout_secs,
360                proxy_url,
361            )
362            .map_err(|e| {
363                OKXHttpError::ValidationError(format!("Failed to create HTTP client: {e}"))
364            })?,
365            credential: Some(Credential::new(api_key, api_secret, api_passphrase)),
366            retry_manager,
367            cancellation_token: CancellationToken::new(),
368            is_demo,
369        })
370    }
371
372    /// Builds the default headers to include with each request (e.g., `User-Agent`).
373    fn default_headers(is_demo: bool) -> HashMap<String, String> {
374        let mut headers =
375            HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())]);
376
377        if is_demo {
378            headers.insert("x-simulated-trading".to_string(), "1".to_string());
379        }
380
381        headers
382    }
383
384    /// Signs an OKX request with timestamp, API key, passphrase, and signature.
385    ///
386    /// # Errors
387    ///
388    /// Returns [`OKXHttpError::MissingCredentials`] if no credentials are set
389    /// but the request requires authentication.
390    fn sign_request(
391        &self,
392        method: &Method,
393        path: &str,
394        body: Option<&[u8]>,
395    ) -> Result<HashMap<String, String>, OKXHttpError> {
396        let credential = match self.credential.as_ref() {
397            Some(c) => c,
398            None => return Err(OKXHttpError::MissingCredentials),
399        };
400
401        let api_key = credential.api_key.to_string();
402        let api_passphrase = credential.api_passphrase.clone();
403
404        // OKX requires milliseconds in the timestamp (ISO 8601 with milliseconds)
405        let now = Utc::now();
406        let millis = now.timestamp_subsec_millis();
407        let timestamp = now.format("%Y-%m-%dT%H:%M:%S").to_string() + &format!(".{millis:03}Z");
408        let signature = credential.sign_bytes(&timestamp, method.as_str(), path, body);
409
410        let mut headers = HashMap::new();
411        headers.insert("OK-ACCESS-KEY".to_string(), api_key);
412        headers.insert("OK-ACCESS-PASSPHRASE".to_string(), api_passphrase);
413        headers.insert("OK-ACCESS-TIMESTAMP".to_string(), timestamp);
414        headers.insert("OK-ACCESS-SIGN".to_string(), signature);
415
416        Ok(headers)
417    }
418
    /// Sends an HTTP request to OKX and parses the response into `Vec<T>`.
    ///
    /// Internally, this method handles:
    /// - Building the URL from `base_url` + `path`.
    /// - Optionally signing the request (over `path` plus the URL-encoded `params`).
    /// - Attaching the global and route-specific rate-limit keys.
    /// - Deserializing JSON responses into typed models, or returning a [`OKXHttpError`].
    /// - Retrying via the retry manager on errors classified as transient.
    ///
    /// # Parameters
    ///
    /// - `method`: HTTP method to use.
    /// - `path`: Endpoint path, appended to the client's base URL.
    /// - `params`: Optional query parameters, serialized with `serde_urlencoded`.
    /// - `body`: Optional raw request body; when present `Content-Type: application/json` is set.
    /// - `authenticate`: Whether to add the signed OKX authentication headers.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The HTTP request fails.
    /// - Authentication is required but credentials are missing.
    /// - The query parameters cannot be serialized.
    /// - The response cannot be deserialized into the expected type.
    /// - The OKX API returns an error response (non-success `code` or HTTP status).
    /// - The request is canceled through the client's cancellation token.
    async fn send_request<T: DeserializeOwned, P: Serialize>(
        &self,
        method: Method,
        path: &str,
        params: Option<&P>,
        body: Option<Vec<u8>>,
        authenticate: bool,
    ) -> Result<Vec<T>, OKXHttpError> {
        let url = format!("{}{path}", self.base_url);

        // Pre-compute rate limit keys once outside the retry closure
        let rate_keys: Vec<String> = Self::rate_limit_keys(path)
            .into_iter()
            .map(|k| k.to_string())
            .collect();

        // One attempt of the request. The closure is invoked by the retry manager,
        // so every owned value the `async move` block consumes is cloned per call.
        let operation = || {
            let url = url.clone();
            let method = method.clone();
            let body = body.clone();
            let rate_keys = rate_keys.clone();

            async move {
                // Serialize params to query string for signing (if needed)
                let query_string = if let Some(p) = params {
                    serde_urlencoded::to_string(p).map_err(|e| {
                        OKXHttpError::JsonError(format!("Failed to serialize params: {e}"))
                    })?
                } else {
                    String::new()
                };

                // Build full path with query string for signing
                let full_path = if query_string.is_empty() {
                    path.to_string()
                } else {
                    format!("{path}?{query_string}")
                };

                // Signing happens inside the attempt, so each retry gets a fresh timestamp.
                let mut headers = if authenticate {
                    self.sign_request(&method, &full_path, body.as_deref())?
                } else {
                    HashMap::new()
                };

                // Always set Content-Type header when body is present
                if body.is_some() {
                    headers.insert("Content-Type".to_string(), "application/json".to_string());
                }

                // NOTE(review): `params` is passed separately here; the signature above is
                // only valid if the client encodes them identically to `serde_urlencoded` — confirm.
                let resp = self
                    .client
                    .request_with_params(
                        method.clone(),
                        url,
                        params,
                        Some(headers),
                        body,
                        None,
                        Some(rate_keys),
                    )
                    .await?;

                log::trace!("Response: {resp:?}");

                if resp.status.is_success() {
                    let okx_response: OKXResponse<T> =
                        serde_json::from_slice(&resp.body).map_err(|e| {
                            log::error!("Failed to deserialize OKXResponse: {e}");
                            OKXHttpError::JsonError(e.to_string())
                        })?;

                    // A successful HTTP status can still carry an application-level failure.
                    if okx_response.code != OKX_SUCCESS_CODE {
                        return Err(OKXHttpError::OkxError {
                            error_code: okx_response.code,
                            message: resolve_okx_error_message(&resp.body, &okx_response.msg),
                        });
                    }

                    Ok(okx_response.data)
                } else {
                    let error_body = String::from_utf8_lossy(&resp.body);
                    // 404 responses are logged at debug level; every other status at error level.
                    if resp.status.as_u16() == StatusCode::NOT_FOUND.as_u16() {
                        log::debug!("HTTP 404 with body: {error_body}");
                    } else {
                        log::error!(
                            "HTTP error {} with body: {error_body}",
                            resp.status.as_str()
                        );
                    }

                    // Prefer the structured OKX error when the body is a valid envelope.
                    if let Ok(parsed_error) = serde_json::from_slice::<OKXResponse<T>>(&resp.body) {
                        return Err(OKXHttpError::OkxError {
                            error_code: parsed_error.code,
                            message: resolve_okx_error_message(&resp.body, &parsed_error.msg),
                        });
                    }

                    // Otherwise fall back to the raw status and body.
                    Err(OKXHttpError::UnexpectedStatus {
                        status: StatusCode::from_u16(resp.status.as_u16()).unwrap(),
                        body: error_body.to_string(),
                    })
                }
            }
        };

        // Retry strategy based on OKX error responses and HTTP status codes:
        //
        // 1. Network errors: always retry (transient connection issues)
        // 2. HTTP 5xx/429: server errors and rate limiting should be retried
        // 3. OKX specific retryable error codes (defined in common::consts)
        //
        // Note: OKX returns many permanent errors which should NOT be retried
        // (e.g., "Invalid instrument", "Insufficient balance", "Invalid API Key")
        let should_retry = |error: &OKXHttpError| -> bool {
            match error {
                OKXHttpError::HttpClientError(_) => true,
                OKXHttpError::UnexpectedStatus { status, .. } => {
                    status.as_u16() >= 500 || status.as_u16() == 429
                }
                OKXHttpError::OkxError { error_code, .. } => should_retry_error_code(error_code),
                _ => false,
            }
        };

        // Converts messages produced by the retry manager itself into errors.
        // NOTE(review): presumably the manager reports cancellation with the exact
        // message "canceled" — confirm against `RetryManager`.
        let create_error = |msg: String| -> OKXHttpError {
            if msg == "canceled" {
                OKXHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
            } else {
                OKXHttpError::ValidationError(msg)
            }
        };

        self.retry_manager
            .execute_with_retry_with_cancel(
                path,
                operation,
                should_retry,
                create_error,
                &self.cancellation_token,
            )
            .await
    }
577
578    /// Sets the position mode for an account.
579    ///
580    /// # Errors
581    ///
582    /// Returns an error if JSON serialization of `params` fails, if the HTTP
583    /// request fails, or if the response body cannot be deserialized.
584    ///
585    /// # References
586    ///
587    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-set-position-mode>
588    pub async fn set_position_mode(
589        &self,
590        params: SetPositionModeParams,
591    ) -> Result<Vec<serde_json::Value>, OKXHttpError> {
592        let path = "/api/v5/account/set-position-mode";
593        let body = serde_json::to_vec(&params)?;
594        self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
595            .await
596    }
597
598    /// Requests position tiers information, maximum leverage depends on your borrowings and margin ratio.
599    ///
600    /// # Errors
601    ///
602    /// Returns an error if the HTTP request fails, authentication is rejected
603    /// or the response cannot be deserialized.
604    ///
605    /// # References
606    ///
607    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-position-tiers>
608    pub async fn get_position_tiers(
609        &self,
610        params: GetPositionTiersParams,
611    ) -> Result<Vec<OKXPositionTier>, OKXHttpError> {
612        self.send_request(
613            Method::GET,
614            "/api/v5/public/position-tiers",
615            Some(&params),
616            None,
617            false,
618        )
619        .await
620    }
621
622    /// Requests a list of instruments with open contracts.
623    ///
624    /// # Errors
625    ///
626    /// Returns an error if JSON serialization of `params` fails, if the HTTP
627    /// request fails, or if the response body cannot be deserialized.
628    ///
629    /// # References
630    ///
631    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-instruments>
632    pub async fn get_instruments(
633        &self,
634        params: GetInstrumentsParams,
635    ) -> Result<Vec<OKXInstrument>, OKXHttpError> {
636        self.send_request(
637            Method::GET,
638            "/api/v5/public/instruments",
639            Some(&params),
640            None,
641            false,
642        )
643        .await
644    }
645
646    /// Requests the current server time from OKX.
647    ///
648    /// Retrieves the OKX system time in Unix timestamp (milliseconds). This is useful for
649    /// synchronizing local clocks with the exchange server and logging time drift.
650    ///
651    /// # Errors
652    ///
653    /// Returns an error if the HTTP request fails or if the response body
654    /// cannot be parsed into [`OKXServerTime`].
655    ///
656    /// # References
657    ///
658    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-system-time>
659    pub async fn get_server_time(&self) -> Result<u64, OKXHttpError> {
660        let response: Vec<OKXServerTime> = self
661            .send_request::<_, ()>(Method::GET, "/api/v5/public/time", None, None, false)
662            .await?;
663        response
664            .first()
665            .map(|t| t.ts)
666            .ok_or_else(|| OKXHttpError::JsonError("Empty server time response".to_string()))
667    }
668
669    /// Requests a mark price.
670    ///
671    /// We set the mark price based on the SPOT index and at a reasonable basis to prevent individual
672    /// users from manipulating the market and causing the contract price to fluctuate.
673    ///
674    /// # Errors
675    ///
676    /// Returns an error if the HTTP request fails or if the response body
677    /// cannot be parsed into [`OKXMarkPrice`].
678    ///
679    /// # References
680    ///
681    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-mark-price>
682    pub async fn get_mark_price(
683        &self,
684        params: GetMarkPriceParams,
685    ) -> Result<Vec<OKXMarkPrice>, OKXHttpError> {
686        self.send_request(
687            Method::GET,
688            "/api/v5/public/mark-price",
689            Some(&params),
690            None,
691            false,
692        )
693        .await
694    }
695
696    /// Requests the latest index price.
697    ///
698    /// # Errors
699    ///
700    /// Returns an error if the operation fails.
701    ///
702    /// # References
703    ///
704    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-index-tickers>
705    pub async fn get_index_tickers(
706        &self,
707        params: GetIndexTickerParams,
708    ) -> Result<Vec<OKXIndexTicker>, OKXHttpError> {
709        self.send_request(
710            Method::GET,
711            "/api/v5/market/index-tickers",
712            Some(&params),
713            None,
714            false,
715        )
716        .await
717    }
718
719    /// Requests trades history.
720    ///
721    /// # Errors
722    ///
723    /// Returns an error if the operation fails.
724    ///
725    /// # References
726    ///
727    /// <https://www.okx.com/docs-v5/en/#order-book-trading-market-data-get-trades-history>
728    pub async fn get_history_trades(
729        &self,
730        params: GetTradesParams,
731    ) -> Result<Vec<OKXTrade>, OKXHttpError> {
732        self.send_request(
733            Method::GET,
734            "/api/v5/market/history-trades",
735            Some(&params),
736            None,
737            false,
738        )
739        .await
740    }
741
742    /// Requests recent candlestick data.
743    ///
744    /// # Errors
745    ///
746    /// Returns an error if the operation fails.
747    ///
748    /// # References
749    ///
750    /// <https://www.okx.com/docs-v5/en/#order-book-trading-market-data-get-candlesticks>
751    pub async fn get_candles(
752        &self,
753        params: GetCandlesticksParams,
754    ) -> Result<Vec<OKXCandlestick>, OKXHttpError> {
755        self.send_request(
756            Method::GET,
757            "/api/v5/market/candles",
758            Some(&params),
759            None,
760            false,
761        )
762        .await
763    }
764
765    /// Requests historical candlestick data.
766    ///
767    /// # Errors
768    ///
769    /// Returns an error if the operation fails.
770    ///
771    /// # References
772    ///
773    /// <https://www.okx.com/docs-v5/en/#order-book-trading-market-data-get-candlesticks-history>
774    pub async fn get_history_candles(
775        &self,
776        params: GetCandlesticksParams,
777    ) -> Result<Vec<OKXCandlestick>, OKXHttpError> {
778        self.send_request(
779            Method::GET,
780            "/api/v5/market/history-candles",
781            Some(&params),
782            None,
783            false,
784        )
785        .await
786    }
787
788    /// Requests a list of assets (with non-zero balance), remaining balance, and available amount
789    /// in the trading account.
790    ///
791    /// # Errors
792    ///
793    /// Returns an error if the operation fails.
794    ///
795    /// # References
796    ///
797    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-balance>
798    pub async fn get_balance(&self) -> Result<Vec<OKXAccount>, OKXHttpError> {
799        let path = "/api/v5/account/balance";
800        self.send_request::<_, ()>(Method::GET, path, None, None, true)
801            .await
802    }
803
804    /// Requests fee rates for the account.
805    ///
806    /// Returns fee rates for the specified instrument type and the user's VIP level.
807    ///
808    /// # Errors
809    ///
810    /// Returns an error if the operation fails.
811    ///
812    /// # References
813    ///
814    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-fee-rates>
815    pub async fn get_trade_fee(
816        &self,
817        params: GetTradeFeeParams,
818    ) -> Result<Vec<OKXFeeRate>, OKXHttpError> {
819        self.send_request(
820            Method::GET,
821            "/api/v5/account/trade-fee",
822            Some(&params),
823            None,
824            true,
825        )
826        .await
827    }
828
829    /// Retrieves a single order’s details.
830    ///
831    /// # Errors
832    ///
833    /// Returns an error if the operation fails.
834    ///
835    /// # References
836    ///
837    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-order>
838    pub async fn get_order(
839        &self,
840        params: GetOrderParams,
841    ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
842        self.send_request(
843            Method::GET,
844            "/api/v5/trade/order",
845            Some(&params),
846            None,
847            true,
848        )
849        .await
850    }
851
852    /// Requests order list (pending orders).
853    ///
854    /// # Errors
855    ///
856    /// Returns an error if the operation fails.
857    ///
858    /// # References
859    ///
860    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-order-list>
861    pub async fn get_orders_pending(
862        &self,
863        params: GetOrderListParams,
864    ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
865        self.send_request(
866            Method::GET,
867            "/api/v5/trade/orders-pending",
868            Some(&params),
869            None,
870            true,
871        )
872        .await
873    }
874
875    /// Requests historical order records.
876    ///
877    /// # Errors
878    ///
879    /// Returns an error if the operation fails.
880    ///
881    /// # References
882    ///
883    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-orders-history>
884    pub async fn get_orders_history(
885        &self,
886        params: GetOrderHistoryParams,
887    ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
888        self.send_request(
889            Method::GET,
890            "/api/v5/trade/orders-history",
891            Some(&params),
892            None,
893            true,
894        )
895        .await
896    }
897
898    /// Requests pending algo orders.
899    ///
900    /// # Errors
901    ///
902    /// Returns an error if the operation fails.
903    pub async fn get_order_algo_pending(
904        &self,
905        params: GetAlgoOrdersParams,
906    ) -> Result<Vec<OKXOrderAlgo>, OKXHttpError> {
907        self.send_request(
908            Method::GET,
909            "/api/v5/trade/order-algo-pending",
910            Some(&params),
911            None,
912            true,
913        )
914        .await
915    }
916
917    /// Requests historical algo orders.
918    ///
919    /// # Errors
920    ///
921    /// Returns an error if the operation fails.
922    pub async fn get_order_algo_history(
923        &self,
924        params: GetAlgoOrdersParams,
925    ) -> Result<Vec<OKXOrderAlgo>, OKXHttpError> {
926        self.send_request(
927            Method::GET,
928            "/api/v5/trade/order-algo-history",
929            Some(&params),
930            None,
931            true,
932        )
933        .await
934    }
935
936    /// Requests transaction details (fills) for the given parameters.
937    ///
938    /// # Errors
939    ///
940    /// Returns an error if the operation fails.
941    ///
942    /// # References
943    ///
944    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-transaction-details-last-3-days>
945    pub async fn get_fills(
946        &self,
947        params: GetTransactionDetailsParams,
948    ) -> Result<Vec<OKXTransactionDetail>, OKXHttpError> {
949        self.send_request(
950            Method::GET,
951            "/api/v5/trade/fills",
952            Some(&params),
953            None,
954            true,
955        )
956        .await
957    }
958
959    /// Requests information on your positions. When the account is in net mode, net positions will
960    /// be displayed, and when the account is in long/short mode, long or short positions will be
961    /// displayed. Returns in reverse chronological order using ctime.
962    ///
963    /// # Errors
964    ///
965    /// Returns an error if the operation fails.
966    ///
967    /// # References
968    ///
969    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-positions>
970    pub async fn get_positions(
971        &self,
972        params: GetPositionsParams,
973    ) -> Result<Vec<OKXPosition>, OKXHttpError> {
974        self.send_request(
975            Method::GET,
976            "/api/v5/account/positions",
977            Some(&params),
978            None,
979            true,
980        )
981        .await
982    }
983
984    /// Requests closed or historical position data.
985    ///
986    /// # Errors
987    ///
988    /// Returns an error if the operation fails.
989    ///
990    /// # References
991    ///
992    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-positions-history>
993    pub async fn get_positions_history(
994        &self,
995        params: GetPositionsHistoryParams,
996    ) -> Result<Vec<OKXPositionHistory>, OKXHttpError> {
997        self.send_request(
998            Method::GET,
999            "/api/v5/account/positions-history",
1000            Some(&params),
1001            None,
1002            true,
1003        )
1004        .await
1005    }
1006}
1007
1008/// Provides a higher-level HTTP client for the [OKX](https://okx.com) REST API.
1009///
1010/// This client wraps the underlying `OKXHttpInnerClient` to handle conversions
1011/// into the Nautilus domain model.
1012#[derive(Debug)]
1013#[cfg_attr(
1014    feature = "python",
1015    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.okx")
1016)]
1017pub struct OKXHttpClient {
1018    pub(crate) inner: Arc<OKXRawHttpClient>,
1019    pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
1020    cache_initialized: AtomicBool,
1021}
1022
1023impl Clone for OKXHttpClient {
1024    fn clone(&self) -> Self {
1025        let cache_initialized = AtomicBool::new(false);
1026
1027        let is_initialized = self.cache_initialized.load(Ordering::Acquire);
1028        if is_initialized {
1029            cache_initialized.store(true, Ordering::Release);
1030        }
1031
1032        Self {
1033            inner: self.inner.clone(),
1034            instruments_cache: self.instruments_cache.clone(),
1035            cache_initialized,
1036        }
1037    }
1038}
1039
1040impl Default for OKXHttpClient {
1041    fn default() -> Self {
1042        Self::new(None, Some(60), None, None, None, false, None)
1043            .expect("Failed to create default OKXHttpClient")
1044    }
1045}
1046
1047impl OKXHttpClient {
1048    /// Creates a new [`OKXHttpClient`] using the default OKX HTTP URL,
1049    /// optionally overridden with a custom base url.
1050    ///
1051    /// This version of the client has **no credentials**, so it can only
1052    /// call publicly accessible endpoints.
1053    ///
1054    /// # Errors
1055    ///
1056    /// Returns an error if the retry manager cannot be created.
1057    pub fn new(
1058        base_url: Option<String>,
1059        timeout_secs: Option<u64>,
1060        max_retries: Option<u32>,
1061        retry_delay_ms: Option<u64>,
1062        retry_delay_max_ms: Option<u64>,
1063        is_demo: bool,
1064        proxy_url: Option<String>,
1065    ) -> anyhow::Result<Self> {
1066        Ok(Self {
1067            inner: Arc::new(OKXRawHttpClient::new(
1068                base_url,
1069                timeout_secs,
1070                max_retries,
1071                retry_delay_ms,
1072                retry_delay_max_ms,
1073                is_demo,
1074                proxy_url,
1075            )?),
1076            instruments_cache: Arc::new(DashMap::new()),
1077            cache_initialized: AtomicBool::new(false),
1078        })
1079    }
1080
1081    /// Generates a timestamp for initialization.
1082    fn generate_ts_init(&self) -> UnixNanos {
1083        get_atomic_clock_realtime().get_time_ns()
1084    }
1085
1086    /// Creates a new authenticated [`OKXHttpClient`] using environment variables and
1087    /// the default OKX HTTP base url.
1088    ///
1089    /// # Errors
1090    ///
1091    /// Returns an error if the operation fails.
1092    pub fn from_env() -> anyhow::Result<Self> {
1093        Self::with_credentials(None, None, None, None, None, None, None, None, false, None)
1094    }
1095
1096    /// Creates a new [`OKXHttpClient`] configured with credentials
1097    /// for authenticated requests, optionally using a custom base url.
1098    ///
1099    /// # Errors
1100    ///
1101    /// Returns an error if the operation fails.
1102    #[allow(clippy::too_many_arguments)]
1103    pub fn with_credentials(
1104        api_key: Option<String>,
1105        api_secret: Option<String>,
1106        api_passphrase: Option<String>,
1107        base_url: Option<String>,
1108        timeout_secs: Option<u64>,
1109        max_retries: Option<u32>,
1110        retry_delay_ms: Option<u64>,
1111        retry_delay_max_ms: Option<u64>,
1112        is_demo: bool,
1113        proxy_url: Option<String>,
1114    ) -> anyhow::Result<Self> {
1115        let api_key = get_or_env_var(api_key, "OKX_API_KEY")?;
1116        let api_secret = get_or_env_var(api_secret, "OKX_API_SECRET")?;
1117        let api_passphrase = get_or_env_var(api_passphrase, "OKX_API_PASSPHRASE")?;
1118        let base_url = base_url.unwrap_or(OKX_HTTP_URL.to_string());
1119
1120        Ok(Self {
1121            inner: Arc::new(OKXRawHttpClient::with_credentials(
1122                api_key,
1123                api_secret,
1124                api_passphrase,
1125                base_url,
1126                timeout_secs,
1127                max_retries,
1128                retry_delay_ms,
1129                retry_delay_max_ms,
1130                is_demo,
1131                proxy_url,
1132            )?),
1133            instruments_cache: Arc::new(DashMap::new()),
1134            cache_initialized: AtomicBool::new(false),
1135        })
1136    }
1137
1138    /// Retrieves an instrument from the cache.
1139    ///
1140    /// # Errors
1141    ///
1142    /// Returns an error if the instrument is not found in the cache.
1143    fn instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
1144        self.instruments_cache
1145            .get(&symbol)
1146            .map(|entry| entry.value().clone())
1147            .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not in cache"))
1148    }
1149
1150    /// Cancel all pending HTTP requests.
1151    pub fn cancel_all_requests(&self) {
1152        self.inner.cancel_all_requests();
1153    }
1154
1155    /// Get the cancellation token for this client.
1156    pub fn cancellation_token(&self) -> &CancellationToken {
1157        self.inner.cancellation_token()
1158    }
1159
1160    /// Returns the base url being used by the client.
1161    pub fn base_url(&self) -> &str {
1162        self.inner.base_url.as_str()
1163    }
1164
1165    /// Returns the public API key being used by the client.
1166    pub fn api_key(&self) -> Option<&str> {
1167        self.inner.credential.as_ref().map(|c| c.api_key.as_str())
1168    }
1169
1170    /// Returns a masked version of the API key for logging purposes.
1171    #[must_use]
1172    pub fn api_key_masked(&self) -> Option<String> {
1173        self.inner.credential.as_ref().map(|c| c.api_key_masked())
1174    }
1175
1176    /// Returns whether the client is configured for demo trading.
1177    #[must_use]
1178    pub fn is_demo(&self) -> bool {
1179        self.inner.is_demo
1180    }
1181
1182    /// Requests the current server time from OKX.
1183    ///
1184    /// Returns the OKX system time as a Unix timestamp in milliseconds.
1185    ///
1186    /// # Errors
1187    ///
1188    /// Returns an error if the HTTP request fails or if the response cannot be parsed.
1189    pub async fn get_server_time(&self) -> Result<u64, OKXHttpError> {
1190        self.inner.get_server_time().await
1191    }
1192
1193    /// Checks if the client is initialized.
1194    ///
1195    /// The client is considered initialized if any instruments have been cached from the venue.
1196    #[must_use]
1197    pub fn is_initialized(&self) -> bool {
1198        self.cache_initialized.load(Ordering::Acquire)
1199    }
1200
1201    /// Returns a snapshot of all instrument symbols currently held in the
1202    /// internal cache.
1203    #[must_use]
1204    pub fn get_cached_symbols(&self) -> Vec<String> {
1205        self.instruments_cache
1206            .iter()
1207            .map(|entry| entry.key().to_string())
1208            .collect()
1209    }
1210
1211    /// Caches multiple instruments.
1212    ///
1213    /// Any existing instruments with the same symbols will be replaced.
1214    pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
1215        for inst in instruments {
1216            self.instruments_cache
1217                .insert(inst.raw_symbol().inner(), inst);
1218        }
1219        self.cache_initialized.store(true, Ordering::Release);
1220    }
1221
1222    /// Caches a single instrument.
1223    ///
1224    /// Any existing instrument with the same symbol will be replaced.
1225    pub fn cache_instrument(&self, instrument: InstrumentAny) {
1226        self.instruments_cache
1227            .insert(instrument.raw_symbol().inner(), instrument);
1228        self.cache_initialized.store(true, Ordering::Release);
1229    }
1230
1231    /// Gets an instrument from the cache by symbol.
1232    pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1233        self.instruments_cache
1234            .get(symbol)
1235            .map(|entry| entry.value().clone())
1236    }
1237
1238    /// Requests the account state for the `account_id` from OKX.
1239    ///
1240    /// # Errors
1241    ///
1242    /// Returns an error if the HTTP request fails or no account state is returned.
1243    pub async fn request_account_state(
1244        &self,
1245        account_id: AccountId,
1246    ) -> anyhow::Result<AccountState> {
1247        let resp = self
1248            .inner
1249            .get_balance()
1250            .await
1251            .map_err(|e| anyhow::anyhow!(e))?;
1252
1253        let ts_init = self.generate_ts_init();
1254        let raw = resp
1255            .first()
1256            .ok_or_else(|| anyhow::anyhow!("No account state returned from OKX"))?;
1257        let account_state = parse_account_state(raw, account_id, ts_init)?;
1258
1259        Ok(account_state)
1260    }
1261
1262    /// Sets the position mode for the account.
1263    ///
1264    /// Defaults to NetMode if no position mode is provided.
1265    ///
1266    /// # Errors
1267    ///
1268    /// Returns an error if the HTTP request fails or the position mode cannot be set.
1269    ///
1270    /// # Note
1271    ///
1272    /// This endpoint only works for accounts with derivatives trading enabled.
1273    /// If the account only has spot trading, this will return an error.
1274    pub async fn set_position_mode(&self, position_mode: OKXPositionMode) -> anyhow::Result<()> {
1275        let mut params = SetPositionModeParamsBuilder::default();
1276        params.pos_mode(position_mode);
1277        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1278
1279        match self.inner.set_position_mode(params).await {
1280            Ok(_) => Ok(()),
1281            Err(e) => {
1282                if let OKXHttpError::OkxError {
1283                    error_code,
1284                    message,
1285                } = &e
1286                    && error_code == "50115"
1287                {
1288                    log::warn!(
1289                        "Account does not support position mode setting (derivatives trading not enabled): {message}"
1290                    );
1291                    return Ok(()); // Gracefully handle this case
1292                }
1293                anyhow::bail!(e)
1294            }
1295        }
1296    }
1297
1298    /// Requests all instruments for the `instrument_type` from OKX.
1299    ///
1300    /// # Errors
1301    ///
1302    /// Returns an error if the HTTP request fails or instrument parsing fails.
1303    ///
1304    /// # Returns
1305    ///
1306    /// A tuple containing:
1307    /// - `Vec<InstrumentAny>`: The parsed instruments
1308    /// - `Vec<(Ustr, u64)>`: Mappings of inst_id to inst_id_code for WebSocket order operations
1309    pub async fn request_instruments(
1310        &self,
1311        instrument_type: OKXInstrumentType,
1312        instrument_family: Option<String>,
1313    ) -> anyhow::Result<(Vec<InstrumentAny>, Vec<(Ustr, u64)>)> {
1314        let mut params = GetInstrumentsParamsBuilder::default();
1315        params.inst_type(instrument_type);
1316
1317        if let Some(family) = instrument_family.clone() {
1318            params.inst_family(family);
1319        }
1320
1321        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1322
1323        let resp = self
1324            .inner
1325            .get_instruments(params)
1326            .await
1327            .map_err(|e| anyhow::anyhow!(e))?;
1328
1329        let fee_rate_opt = {
1330            let fee_params = GetTradeFeeParams {
1331                inst_type: instrument_type,
1332                uly: None,
1333                inst_family: instrument_family,
1334            };
1335
1336            match self.inner.get_trade_fee(fee_params).await {
1337                Ok(rates) => rates.into_iter().next(),
1338                Err(OKXHttpError::MissingCredentials) => {
1339                    log::debug!("Missing credentials for fee rates, using None");
1340                    None
1341                }
1342                Err(e) => {
1343                    log::warn!("Failed to fetch fee rates for {instrument_type}: {e}");
1344                    None
1345                }
1346            }
1347        };
1348
1349        let ts_init = self.generate_ts_init();
1350
1351        let mut instruments: Vec<InstrumentAny> = Vec::new();
1352        let mut inst_id_codes: Vec<(Ustr, u64)> = Vec::new();
1353
1354        for inst in &resp {
1355            // Collect inst_id_code mappings for WebSocket order operations
1356            if let Some(code) = inst.inst_id_code {
1357                inst_id_codes.push((inst.inst_id, code));
1358            }
1359            // Skip pre-open instruments which have incomplete/empty field values
1360            // Keep suspended instruments as they have valid metadata and may return to live
1361            if inst.state == OKXInstrumentStatus::Preopen {
1362                continue;
1363            }
1364
1365            // Determine which fee fields to use based on contract type
1366            // OKX fee rate convention: positive = rebate, negative = commission
1367            // Nautilus convention: negative = rebate, positive = commission
1368            // Negate to convert between conventions
1369            let (maker_fee, taker_fee) = if let Some(ref fee_rate) = fee_rate_opt {
1370                let is_usdt_margined = inst.ct_type == OKXContractType::Linear;
1371                let (maker_str, taker_str) = if is_usdt_margined {
1372                    (&fee_rate.maker_u, &fee_rate.taker_u)
1373                } else {
1374                    (&fee_rate.maker, &fee_rate.taker)
1375                };
1376
1377                let maker = if maker_str.is_empty() {
1378                    None
1379                } else {
1380                    Decimal::from_str(maker_str).ok().map(|v| -v)
1381                };
1382                let taker = if taker_str.is_empty() {
1383                    None
1384                } else {
1385                    Decimal::from_str(taker_str).ok().map(|v| -v)
1386                };
1387
1388                (maker, taker)
1389            } else {
1390                (None, None)
1391            };
1392
1393            match parse_instrument_any(inst, None, None, maker_fee, taker_fee, ts_init) {
1394                Ok(Some(instrument_any)) => {
1395                    instruments.push(instrument_any);
1396                }
1397                Ok(None) => {
1398                    // Unsupported instrument type, skip silently
1399                }
1400                Err(e) => {
1401                    log::warn!("Failed to parse instrument {}: {e}", inst.inst_id);
1402                }
1403            }
1404        }
1405
1406        Ok((instruments, inst_id_codes))
1407    }
1408
1409    /// Requests a single instrument by `instrument_id` from OKX.
1410    ///
1411    /// Fetches the instrument from the API, caches it, and returns it.
1412    ///
1413    /// # Errors
1414    ///
1415    /// This function will return an error if:
1416    /// - The API request fails.
1417    /// - The instrument is not found.
1418    /// - Failed to parse instrument data.
1419    pub async fn request_instrument(
1420        &self,
1421        instrument_id: InstrumentId,
1422    ) -> anyhow::Result<InstrumentAny> {
1423        let symbol = instrument_id.symbol.as_str();
1424        let instrument_type = okx_instrument_type_from_symbol(symbol);
1425
1426        let mut params = GetInstrumentsParamsBuilder::default();
1427        params.inst_type(instrument_type);
1428        params.inst_id(symbol);
1429
1430        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1431
1432        let resp = self
1433            .inner
1434            .get_instruments(params)
1435            .await
1436            .map_err(|e| anyhow::anyhow!(e))?;
1437
1438        let raw_inst = resp
1439            .first()
1440            .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not found"))?;
1441
1442        // Skip pre-open instruments which have incomplete/empty field values
1443        if raw_inst.state == OKXInstrumentStatus::Preopen {
1444            anyhow::bail!("Instrument {symbol} is in pre-open state");
1445        }
1446
1447        let fee_rate_opt = {
1448            let fee_params = GetTradeFeeParams {
1449                inst_type: instrument_type,
1450                uly: None,
1451                inst_family: None,
1452            };
1453
1454            match self.inner.get_trade_fee(fee_params).await {
1455                Ok(rates) => rates.into_iter().next(),
1456                Err(OKXHttpError::MissingCredentials) => {
1457                    log::debug!("Missing credentials for fee rates, using None");
1458                    None
1459                }
1460                Err(e) => {
1461                    log::warn!("Failed to fetch fee rates for {symbol}: {e}");
1462                    None
1463                }
1464            }
1465        };
1466
1467        // OKX fee rate convention: positive = rebate, negative = commission
1468        // Nautilus convention: negative = rebate, positive = commission
1469        // Negate to convert between conventions
1470        let (maker_fee, taker_fee) = if let Some(ref fee_rate) = fee_rate_opt {
1471            let is_usdt_margined = raw_inst.ct_type == OKXContractType::Linear;
1472            let (maker_str, taker_str) = if is_usdt_margined {
1473                (&fee_rate.maker_u, &fee_rate.taker_u)
1474            } else {
1475                (&fee_rate.maker, &fee_rate.taker)
1476            };
1477
1478            let maker = if maker_str.is_empty() {
1479                None
1480            } else {
1481                Decimal::from_str(maker_str).ok().map(|v| -v)
1482            };
1483            let taker = if taker_str.is_empty() {
1484                None
1485            } else {
1486                Decimal::from_str(taker_str).ok().map(|v| -v)
1487            };
1488
1489            (maker, taker)
1490        } else {
1491            (None, None)
1492        };
1493
1494        let ts_init = self.generate_ts_init();
1495        let instrument = parse_instrument_any(raw_inst, None, None, maker_fee, taker_fee, ts_init)?
1496            .ok_or_else(|| anyhow::anyhow!("Unsupported instrument type for {symbol}"))?;
1497
1498        self.cache_instrument(instrument.clone());
1499
1500        Ok(instrument)
1501    }
1502
1503    /// Requests the latest mark price for the `instrument_type` from OKX.
1504    ///
1505    /// # Errors
1506    ///
1507    /// Returns an error if the HTTP request fails or no mark price is returned.
1508    pub async fn request_mark_price(
1509        &self,
1510        instrument_id: InstrumentId,
1511    ) -> anyhow::Result<MarkPriceUpdate> {
1512        let mut params = GetMarkPriceParamsBuilder::default();
1513        params.inst_id(instrument_id.symbol.inner());
1514        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1515
1516        let resp = self
1517            .inner
1518            .get_mark_price(params)
1519            .await
1520            .map_err(|e| anyhow::anyhow!(e))?;
1521
1522        let raw = resp
1523            .first()
1524            .ok_or_else(|| anyhow::anyhow!("No mark price returned from OKX"))?;
1525        let inst = self.instrument_from_cache(instrument_id.symbol.inner())?;
1526        let ts_init = self.generate_ts_init();
1527
1528        let mark_price =
1529            parse_mark_price_update(raw, instrument_id, inst.price_precision(), ts_init)
1530                .map_err(|e| anyhow::anyhow!(e))?;
1531        Ok(mark_price)
1532    }
1533
1534    /// Requests the latest index price for the `instrument_id` from OKX.
1535    ///
1536    /// # Errors
1537    ///
1538    /// Returns an error if the HTTP request fails or no index price is returned.
1539    pub async fn request_index_price(
1540        &self,
1541        instrument_id: InstrumentId,
1542    ) -> anyhow::Result<IndexPriceUpdate> {
1543        let mut params = GetIndexTickerParamsBuilder::default();
1544        params.inst_id(instrument_id.symbol.inner());
1545        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1546
1547        let resp = self
1548            .inner
1549            .get_index_tickers(params)
1550            .await
1551            .map_err(|e| anyhow::anyhow!(e))?;
1552
1553        let raw = resp
1554            .first()
1555            .ok_or_else(|| anyhow::anyhow!("No index price returned from OKX"))?;
1556        let inst = self.instrument_from_cache(instrument_id.symbol.inner())?;
1557        let ts_init = self.generate_ts_init();
1558
1559        let index_price =
1560            parse_index_price_update(raw, instrument_id, inst.price_precision(), ts_init)
1561                .map_err(|e| anyhow::anyhow!(e))?;
1562        Ok(index_price)
1563    }
1564
1565    /// Requests trades for the `instrument_id` and `start` -> `end` time range.
1566    ///
1567    /// # Errors
1568    ///
1569    /// Returns an error if the HTTP request fails or trade parsing fails.
1570    ///
1571    /// # Panics
1572    ///
1573    /// Panics if the API returns an empty response when debug logging is enabled.
1574    pub async fn request_trades(
1575        &self,
1576        instrument_id: InstrumentId,
1577        start: Option<DateTime<Utc>>,
1578        end: Option<DateTime<Utc>>,
1579        limit: Option<u32>,
1580    ) -> anyhow::Result<Vec<TradeTick>> {
1581        const OKX_TRADES_MAX_LIMIT: u32 = 100;
1582        const MAX_PAGES: usize = 500;
1583        const MAX_CONSECUTIVE_EMPTY: usize = 3;
1584
1585        #[derive(Clone, Copy, Debug, PartialEq, Eq)]
1586        enum Mode {
1587            Latest,
1588            Backward,
1589            Range,
1590        }
1591
1592        let limit = if limit == Some(0) { None } else { limit };
1593
1594        if let (Some(s), Some(e)) = (start, end) {
1595            anyhow::ensure!(s < e, "Invalid time range: start={s:?} end={e:?}");
1596        }
1597
1598        let now = Utc::now();
1599
1600        if let Some(s) = start
1601            && s > now
1602        {
1603            return Ok(Vec::new());
1604        }
1605
1606        let end = if let Some(e) = end
1607            && e > now
1608        {
1609            Some(now)
1610        } else {
1611            end
1612        };
1613
1614        let mode = match (start, end) {
1615            (None, None) => Mode::Latest,
1616            (Some(_), None) => Mode::Backward,
1617            (None, Some(_)) => Mode::Backward,
1618            (Some(_), Some(_)) => Mode::Range,
1619        };
1620
1621        let start_ms = start.map(|s| s.timestamp_millis());
1622        let end_ms = end.map(|e| e.timestamp_millis());
1623
1624        let ts_init = self.generate_ts_init();
1625        let inst = self.instrument_from_cache(instrument_id.symbol.inner())?;
1626
1627        // Historical pagination walks backwards using trade IDs, OKX does not honour timestamps for
1628        // standalone `before` requests (type=2)
1629        if matches!(mode, Mode::Backward | Mode::Range) {
1630            let mut before_trade_id: Option<String> = None;
1631            let mut pages = 0usize;
1632            let mut page_results: Vec<Vec<TradeTick>> = Vec::new();
1633            let mut seen_trades: std::collections::HashSet<(String, i64)> =
1634                std::collections::HashSet::new();
1635            let mut unique_count = 0usize;
1636            let mut consecutive_empty_pages = 0usize;
1637
1638            // Only apply default limit when there's no start boundary
1639            // (start provides a natural stopping point, end alone allows infinite backward pagination)
1640            let effective_limit = if start.is_some() {
1641                limit.unwrap_or(u32::MAX)
1642            } else {
1643                limit.unwrap_or(OKX_TRADES_MAX_LIMIT)
1644            };
1645
1646            log::debug!(
1647                "Starting trades pagination: mode={mode:?}, start={start:?}, end={end:?}, limit={limit:?}, effective_limit={effective_limit}"
1648            );
1649
1650            loop {
1651                if pages >= MAX_PAGES {
1652                    log::warn!("Hit MAX_PAGES limit of {MAX_PAGES}");
1653                    break;
1654                }
1655
1656                if effective_limit < u32::MAX && unique_count >= effective_limit as usize {
1657                    log::debug!("Reached effective limit: unique_count={unique_count}");
1658                    break;
1659                }
1660
1661                let remaining = (effective_limit as usize).saturating_sub(unique_count);
1662                let page_cap = remaining.min(OKX_TRADES_MAX_LIMIT as usize) as u32;
1663
1664                log::debug!(
1665                    "Requesting page {}: before_id={:?}, page_cap={}, unique_count={}",
1666                    pages + 1,
1667                    before_trade_id,
1668                    page_cap,
1669                    unique_count
1670                );
1671
1672                let mut params_builder = GetTradesParamsBuilder::default();
1673                params_builder
1674                    .inst_id(instrument_id.symbol.inner())
1675                    .limit(page_cap)
1676                    .pagination_type(1);
1677
1678                // Use 'after' to get older trades (OKX API: after=cursor means < cursor)
1679                if let Some(ref before_id) = before_trade_id {
1680                    params_builder.after(before_id.clone());
1681                }
1682
1683                let params = params_builder.build().map_err(anyhow::Error::new)?;
1684                let raw = self
1685                    .inner
1686                    .get_history_trades(params)
1687                    .await
1688                    .map_err(anyhow::Error::new)?;
1689
1690                log::debug!("Received {} raw trades from API", raw.len());
1691
1692                if !raw.is_empty() {
1693                    let first_id = &raw.first().unwrap().trade_id;
1694                    let last_id = &raw.last().unwrap().trade_id;
1695                    log::debug!(
1696                        "Raw response trade ID range: first={first_id} (newest), last={last_id} (oldest)"
1697                    );
1698                }
1699
1700                if raw.is_empty() {
1701                    log::debug!("API returned empty page, stopping pagination");
1702                    break;
1703                }
1704
1705                pages += 1;
1706
1707                let mut page_trades: Vec<TradeTick> = Vec::with_capacity(raw.len());
1708                let mut hit_start_boundary = false;
1709                let mut filtered_out = 0usize;
1710                let mut duplicates = 0usize;
1711
1712                for r in &raw {
1713                    match parse_trade_tick(
1714                        r,
1715                        instrument_id,
1716                        inst.price_precision(),
1717                        inst.size_precision(),
1718                        ts_init,
1719                    ) {
1720                        Ok(trade) => {
1721                            let ts_ms = trade.ts_event.as_i64() / 1_000_000;
1722
1723                            if let Some(e_ms) = end_ms
1724                                && ts_ms > e_ms
1725                            {
1726                                filtered_out += 1;
1727                                continue;
1728                            }
1729
1730                            if let Some(s_ms) = start_ms
1731                                && ts_ms < s_ms
1732                            {
1733                                hit_start_boundary = true;
1734                                filtered_out += 1;
1735                                break;
1736                            }
1737
1738                            let trade_key = (trade.trade_id.to_string(), trade.ts_event.as_i64());
1739                            if seen_trades.insert(trade_key) {
1740                                unique_count += 1;
1741                                page_trades.push(trade);
1742                            } else {
1743                                duplicates += 1;
1744                            }
1745                        }
1746                        Err(e) => log::error!("{e}"),
1747                    }
1748                }
1749
1750                log::debug!(
1751                    "Page {} processed: {} trades kept, {} filtered out, {} duplicates, hit_start_boundary={}",
1752                    pages,
1753                    page_trades.len(),
1754                    filtered_out,
1755                    duplicates,
1756                    hit_start_boundary
1757                );
1758
1759                // Extract oldest unique trade ID for next page cursor
1760                let oldest_trade_id = if page_trades.is_empty() {
1761                    // Only apply consecutive empty guard if we've already collected some trades
1762                    // This allows historical backfills to paginate through empty prelude
1763                    if unique_count > 0 {
1764                        consecutive_empty_pages += 1;
1765                        if consecutive_empty_pages >= MAX_CONSECUTIVE_EMPTY {
1766                            log::debug!(
1767                                "Stopping: {consecutive_empty_pages} consecutive pages with no trades in range after collecting {unique_count} trades"
1768                            );
1769                            break;
1770                        }
1771                    }
1772                    // No unique trades on page, use raw response for cursor
1773                    raw.last().map(|t| {
1774                        let id = t.trade_id.to_string();
1775                        log::debug!(
1776                            "Setting cursor from raw response (no unique trades): oldest_id={id}"
1777                        );
1778                        id
1779                    })
1780                } else {
1781                    // Use oldest deduplicated trade ID before reversing
1782                    let oldest_id = page_trades.last().map(|t| {
1783                        let id = t.trade_id.to_string();
1784                        log::debug!(
1785                            "Setting cursor from deduplicated trades: oldest_id={}, ts_event={}",
1786                            id,
1787                            t.ts_event.as_i64()
1788                        );
1789                        id
1790                    });
1791                    page_trades.reverse();
1792                    page_results.push(page_trades);
1793                    consecutive_empty_pages = 0;
1794                    oldest_id
1795                };
1796
1797                if let Some(ref old_id) = before_trade_id
1798                    && oldest_trade_id.as_ref() == Some(old_id)
1799                {
1800                    break;
1801                }
1802
1803                if oldest_trade_id.is_none() {
1804                    break;
1805                }
1806
1807                before_trade_id = oldest_trade_id;
1808
1809                if hit_start_boundary {
1810                    break;
1811                }
1812
1813                tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1814            }
1815
1816            log::debug!(
1817                "Pagination complete: {pages} pages, {unique_count} unique trades collected"
1818            );
1819
1820            let mut out: Vec<TradeTick> = Vec::new();
1821            for page in page_results.into_iter().rev() {
1822                out.extend(page);
1823            }
1824
1825            // Deduplicate by (trade_id, ts_event) composite key
1826            let mut dedup_keys = std::collections::HashSet::new();
1827            let pre_dedup_len = out.len();
1828            out.retain(|trade| {
1829                dedup_keys.insert((trade.trade_id.to_string(), trade.ts_event.as_i64()))
1830            });
1831
1832            if out.len() < pre_dedup_len {
1833                log::debug!(
1834                    "Removed {} duplicate trades during final dedup",
1835                    pre_dedup_len - out.len()
1836                );
1837            }
1838
1839            if let Some(lim) = limit
1840                && lim > 0
1841                && out.len() > lim as usize
1842            {
1843                let excess = out.len() - lim as usize;
1844                log::debug!("Trimming {excess} oldest trades to respect limit={lim}");
1845                out.drain(0..excess);
1846            }
1847
1848            log::debug!("Returning {} trades", out.len());
1849            return Ok(out);
1850        }
1851
1852        let req_limit = limit
1853            .unwrap_or(OKX_TRADES_MAX_LIMIT)
1854            .min(OKX_TRADES_MAX_LIMIT);
1855        let params = GetTradesParamsBuilder::default()
1856            .inst_id(instrument_id.symbol.inner())
1857            .limit(req_limit)
1858            .build()
1859            .map_err(anyhow::Error::new)?;
1860
1861        let raw = self
1862            .inner
1863            .get_history_trades(params)
1864            .await
1865            .map_err(anyhow::Error::new)?;
1866
1867        let mut trades: Vec<TradeTick> = Vec::with_capacity(raw.len());
1868        for r in &raw {
1869            match parse_trade_tick(
1870                r,
1871                instrument_id,
1872                inst.price_precision(),
1873                inst.size_precision(),
1874                ts_init,
1875            ) {
1876                Ok(trade) => trades.push(trade),
1877                Err(e) => log::error!("{e}"),
1878            }
1879        }
1880
1881        // OKX returns newest-first, reverse to oldest-first
1882        trades.reverse();
1883
1884        if let Some(lim) = limit
1885            && lim > 0
1886            && trades.len() > lim as usize
1887        {
1888            trades.drain(0..trades.len() - lim as usize);
1889        }
1890
1891        Ok(trades)
1892    }
1893
1894    /// Requests historical bars for the given bar type and time range.
1895    ///
1896    /// The aggregation source must be `EXTERNAL`. Time range validation ensures start < end.
1897    /// Returns bars sorted oldest to newest.
1898    ///
1899    /// # Errors
1900    ///
1901    /// Returns an error if the request fails.
1902    ///
1903    /// # Endpoint Selection
1904    ///
1905    /// The OKX API has different endpoints with different limits:
1906    /// - Regular endpoint (`/api/v5/market/candles`): ≤ 300 rows/call, ≤ 40 req/2s
1907    ///   - Used when: the page cursor is less than 2 hours old (or no cursor is set yet)
1908    /// - History endpoint (`/api/v5/market/history-candles`): ≤ 100 rows/call, ≤ 20 req/2s
1909    ///   - Used when: the page cursor is 2 or more hours old
1910    ///
1911    /// Age is `now - cursor` in whole hours, re-evaluated for every page; empty-page fallback requests use a 100-day split instead.
1912    ///
1913    /// # Supported Aggregations
1914    ///
1915    /// Maps to OKX bar query parameter:
1916    /// - `Second` → `{n}s`
1917    /// - `Minute` → `{n}m`
1918    /// - `Hour` → `{n}H`
1919    /// - `Day` → `{n}D`
1920    /// - `Week` → `{n}W`
1921    /// - `Month` → `{n}M`
1922    ///
1923    /// # Pagination
1924    ///
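1925    /// - Backward mode pages with the `after` cursor, Latest mode with `before`; Range mode sends both bounds
1926    /// - Pages backwards from end time (or now) to start time
1927    /// - Stops when: limit reached, time window covered, API returns empty, 500 pages fetched, or 3 consecutive iterations add no bars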
1928    /// - Rate limit safety: ≥ 50ms between requests
1929    ///
1930    /// # Panics
1931    ///
1932    /// May panic if internal data structures are in an unexpected state.
1933    ///
1934    /// # References
1935    ///
1936    /// - <https://tr.okx.com/docs-v5/en/#order-book-trading-market-data-get-candlesticks>
1937    /// - <https://tr.okx.com/docs-v5/en/#order-book-trading-market-data-get-candlesticks-history>
1938    pub async fn request_bars(
1939        &self,
1940        bar_type: BarType,
1941        start: Option<DateTime<Utc>>,
1942        mut end: Option<DateTime<Utc>>,
1943        limit: Option<u32>,
1944    ) -> anyhow::Result<Vec<Bar>> {
1945        const HISTORY_SPLIT_DAYS: i64 = 100;
1946        const MAX_PAGES_SOFT: usize = 500;
1947
1948        #[derive(Clone, Copy, Debug, PartialEq, Eq)]
1949        enum Mode {
1950            Latest,
1951            Backward,
1952            Range,
1953        }
1954
1955        let limit = if limit == Some(0) { None } else { limit };
1956
1957        anyhow::ensure!(
1958            bar_type.aggregation_source() == AggregationSource::External,
1959            "Only EXTERNAL aggregation is supported"
1960        );
1961
1962        if let (Some(s), Some(e)) = (start, end) {
1963            anyhow::ensure!(s < e, "Invalid time range: start={s:?} end={e:?}");
1964        }
1965
1966        let now = Utc::now();
1967
1968        if let Some(s) = start
1969            && s > now
1970        {
1971            return Ok(Vec::new());
1972        }
1973        if let Some(e) = end
1974            && e > now
1975        {
1976            end = Some(now);
1977        }
1978
1979        let spec = bar_type.spec();
1980        let step = spec.step.get();
1981        let bar_param = match spec.aggregation {
1982            BarAggregation::Second => format!("{step}s"),
1983            BarAggregation::Minute => format!("{step}m"),
1984            BarAggregation::Hour => format!("{step}H"),
1985            BarAggregation::Day => format!("{step}D"),
1986            BarAggregation::Week => format!("{step}W"),
1987            BarAggregation::Month => format!("{step}M"),
1988            a => anyhow::bail!("OKX does not support {a:?} aggregation"),
1989        };
1990
1991        let slot_ms: i64 = match spec.aggregation {
1992            BarAggregation::Second => (step as i64) * 1_000,
1993            BarAggregation::Minute => (step as i64) * 60_000,
1994            BarAggregation::Hour => (step as i64) * 3_600_000,
1995            BarAggregation::Day => (step as i64) * 86_400_000,
1996            BarAggregation::Week => (step as i64) * 7 * 86_400_000,
1997            BarAggregation::Month => (step as i64) * 30 * 86_400_000,
1998            _ => unreachable!("Unsupported aggregation should have been caught above"),
1999        };
2000        let slot_ns: i64 = slot_ms * 1_000_000;
2001
2002        let mode = match (start, end) {
2003            (None, None) => Mode::Latest,
2004            (Some(_), None) => Mode::Backward, // Changed: when only start is provided, work backward from now
2005            (None, Some(_)) => Mode::Backward,
2006            (Some(_), Some(_)) => Mode::Range,
2007        };
2008
2009        let start_ns = start.and_then(|s| s.timestamp_nanos_opt());
2010        let end_ns = end.and_then(|e| e.timestamp_nanos_opt());
2011
2012        // Floor start and ceiling end to bar boundaries for cleaner API requests
2013        let start_ms = start.map(|s| {
2014            let ms = s.timestamp_millis();
2015            if slot_ms > 0 {
2016                (ms / slot_ms) * slot_ms // Floor to nearest bar boundary
2017            } else {
2018                ms
2019            }
2020        });
2021        let end_ms = end.map(|e| {
2022            let ms = e.timestamp_millis();
2023            if slot_ms > 0 {
2024                ((ms + slot_ms - 1) / slot_ms) * slot_ms // Ceiling to nearest bar boundary
2025            } else {
2026                ms
2027            }
2028        });
2029        let now_ms = now.timestamp_millis();
2030
2031        let symbol = bar_type.instrument_id().symbol;
2032        let inst = self.instrument_from_cache(symbol.inner())?;
2033
2034        let mut out: Vec<Bar> = Vec::new();
2035        let mut pages = 0usize;
2036
2037        // IMPORTANT: OKX API has COUNTER-INTUITIVE semantics (same for bars and trades):
2038        // - after=X returns records with timestamp < X (upper bound, despite the name!)
2039        // - before=X returns records with timestamp > X (lower bound, despite the name!)
2040        // For Range [start, end], use: before=start (lower bound), after=end (upper bound)
2041        let mut after_ms: Option<i64> = match mode {
2042            Mode::Range => end_ms.or(Some(now_ms)), // Upper bound: bars < end
2043            _ => None,
2044        };
2045        let mut before_ms: Option<i64> = match mode {
2046            Mode::Backward => end_ms.map(|v| v.saturating_sub(1)),
2047            Mode::Range => start_ms, // Lower bound: bars > start
2048            Mode::Latest => None,
2049        };
2050
2051        // For Range mode, we'll paginate backwards like Backward mode
2052        let mut forward_prepend_mode = matches!(mode, Mode::Range);
2053
2054        // Adjust before_ms to ensure we get data from the API
2055        // OKX API might not have bars for the very recent past
2056        // This handles both explicit end=now and the actor layer setting end=now when it's None
2057        if matches!(mode, Mode::Backward | Mode::Range)
2058            && let Some(b) = before_ms
2059        {
2060            // OKX endpoints have different data availability windows:
2061            // - Regular endpoint: has most recent data but limited depth
2062            // - History endpoint: has deep history but lags behind current time
2063            // Use a small buffer to avoid the "dead zone"
2064            let buffer_ms = slot_ms.max(60_000); // At least 1 minute or 1 bar
2065            if b >= now_ms.saturating_sub(buffer_ms) {
2066                before_ms = Some(now_ms.saturating_sub(buffer_ms));
2067            }
2068        }
2069
2070        let mut have_latest_first_page = false;
2071        let mut progressless_loops = 0u8;
2072
2073        loop {
2074            if let Some(lim) = limit
2075                && lim > 0
2076                && out.len() >= lim as usize
2077            {
2078                break;
2079            }
2080            if pages >= MAX_PAGES_SOFT {
2081                break;
2082            }
2083
2084            let pivot_ms = if let Some(a) = after_ms {
2085                a
2086            } else if let Some(b) = before_ms {
2087                b
2088            } else {
2089                now_ms
2090            };
2091            // Choose endpoint based on how old the data is:
2092            // - Use regular endpoint for recent data (< 1 hour old)
2093            // - Use history endpoint for older data (> 1 hour old)
2094            // This avoids the "gap" where history endpoint has no recent data
2095            // and regular endpoint has limited depth
2096            let age_ms = now_ms.saturating_sub(pivot_ms);
2097            let age_hours = age_ms / (60 * 60 * 1000);
2098            let using_history = age_hours > 1; // Use history if data is > 1 hour old
2099
2100            let page_ceiling = if using_history { 100 } else { 300 };
2101            let remaining = limit
2102                .filter(|&l| l > 0) // Treat limit=0 as no limit
2103                .map_or(page_ceiling, |l| (l as usize).saturating_sub(out.len()));
2104            let page_cap = remaining.min(page_ceiling);
2105
2106            let mut p = GetCandlesticksParamsBuilder::default();
2107            p.inst_id(symbol.as_str())
2108                .bar(&bar_param)
2109                .limit(page_cap as u32);
2110
2111            // Track whether this planned request uses BEFORE or AFTER.
2112            let mut req_used_before = false;
2113
2114            match mode {
2115                Mode::Latest => {
2116                    if have_latest_first_page && let Some(b) = before_ms {
2117                        p.before_ms(b);
2118                        req_used_before = true;
2119                    }
2120                }
2121                Mode::Backward => {
2122                    // Use 'after' to get older bars (OKX API: after=cursor means < cursor)
2123                    if let Some(b) = before_ms {
2124                        p.after_ms(b);
2125                    }
2126                }
2127                Mode::Range => {
2128                    // For Range mode, use both after and before to specify the full range
2129                    // This is much more efficient than pagination
2130                    if let Some(a) = after_ms {
2131                        p.after_ms(a);
2132                    }
2133                    if let Some(b) = before_ms {
2134                        p.before_ms(b);
2135                        req_used_before = true;
2136                    }
2137                }
2138            }
2139
2140            let params = p.build().map_err(anyhow::Error::new)?;
2141
2142            let mut raw = if using_history {
2143                self.inner
2144                    .get_history_candles(params.clone())
2145                    .await
2146                    .map_err(anyhow::Error::new)?
2147            } else {
2148                self.inner
2149                    .get_candles(params.clone())
2150                    .await
2151                    .map_err(anyhow::Error::new)?
2152            };
2153
2154            // --- Fallbacks on empty page ---
2155            if raw.is_empty() {
2156                // LATEST: retry same cursor via history, then step back a page-interval before giving up
2157                if matches!(mode, Mode::Latest)
2158                    && have_latest_first_page
2159                    && !using_history
2160                    && let Some(b) = before_ms
2161                {
2162                    let mut p2 = GetCandlesticksParamsBuilder::default();
2163                    p2.inst_id(symbol.as_str())
2164                        .bar(&bar_param)
2165                        .limit(page_cap as u32);
2166                    p2.before_ms(b);
2167                    let params2 = p2.build().map_err(anyhow::Error::new)?;
2168                    let raw2 = self
2169                        .inner
2170                        .get_history_candles(params2)
2171                        .await
2172                        .map_err(anyhow::Error::new)?;
2173                    if raw2.is_empty() {
2174                        // Step back one page interval and retry loop
2175                        let jump = (page_cap as i64).saturating_mul(slot_ms.max(1));
2176                        before_ms = Some(b.saturating_sub(jump));
2177                        progressless_loops = progressless_loops.saturating_add(1);
2178                        if progressless_loops >= 3 {
2179                            break;
2180                        }
2181                        continue;
2182                    } else {
2183                        raw = raw2;
2184                    }
2185                }
2186
2187                // Range mode doesn't need special bootstrap - it uses the normal flow with before_ms set
2188
2189                // If still empty: for Range after first page, try a single backstep window using BEFORE
2190                if raw.is_empty() && matches!(mode, Mode::Range) && pages > 0 {
2191                    let backstep_ms = (page_cap as i64).saturating_mul(slot_ms.max(1));
2192                    let pivot_back = after_ms.unwrap_or(now_ms).saturating_sub(backstep_ms);
2193
2194                    let mut p2 = GetCandlesticksParamsBuilder::default();
2195                    p2.inst_id(symbol.as_str())
2196                        .bar(&bar_param)
2197                        .limit(page_cap as u32)
2198                        .before_ms(pivot_back);
2199                    let params2 = p2.build().map_err(anyhow::Error::new)?;
2200                    let raw2 = if (now_ms.saturating_sub(pivot_back)) / (24 * 60 * 60 * 1000)
2201                        > HISTORY_SPLIT_DAYS
2202                    {
2203                        self.inner.get_history_candles(params2).await
2204                    } else {
2205                        self.inner.get_candles(params2).await
2206                    }
2207                    .map_err(anyhow::Error::new)?;
2208                    if raw2.is_empty() {
2209                        break;
2210                    } else {
2211                        raw = raw2;
2212                        forward_prepend_mode = true;
2213                        req_used_before = true;
2214                    }
2215                }
2216
2217                // First LATEST page empty: jump back >100d to force history, then continue loop
2218                if raw.is_empty()
2219                    && matches!(mode, Mode::Latest)
2220                    && !have_latest_first_page
2221                    && !using_history
2222                {
2223                    let jump_days_ms = (HISTORY_SPLIT_DAYS + 1) * 86_400_000;
2224                    before_ms = Some(now_ms.saturating_sub(jump_days_ms));
2225                    have_latest_first_page = true;
2226                    continue;
2227                }
2228
2229                // Still empty for any other case? Just break.
2230                if raw.is_empty() {
2231                    break;
2232                }
2233            }
2234            // --- end fallbacks ---
2235
2236            pages += 1;
2237
2238            // Parse, oldest → newest
2239            let ts_init = self.generate_ts_init();
2240            let mut page: Vec<Bar> = Vec::with_capacity(raw.len());
2241            for r in &raw {
2242                page.push(parse_candlestick(
2243                    r,
2244                    bar_type,
2245                    inst.price_precision(),
2246                    inst.size_precision(),
2247                    ts_init,
2248                )?);
2249            }
2250            page.reverse();
2251
2252            let page_oldest_ms = page.first().map(|b| b.ts_event.as_i64() / 1_000_000);
2253            let page_newest_ms = page.last().map(|b| b.ts_event.as_i64() / 1_000_000);
2254
2255            // Range filter (inclusive)
2256            // For Range mode, if we have no bars yet and this is an early page,
2257            // be more tolerant with the start boundary to handle gaps in data
2258            let mut filtered: Vec<Bar> = if matches!(mode, Mode::Range)
2259                && out.is_empty()
2260                && pages < 2
2261            {
2262                // On first pages of Range mode with no data yet, include the most recent bar
2263                // even if it's slightly before our start time (within 2 bar periods)
2264                // BUT we want ALL bars in the page that are within our range
2265                let tolerance_ns = slot_ns * 2; // Allow up to 2 bar periods before start
2266
2267                // Debug: log the page range
2268                if !page.is_empty() {
2269                    log::debug!(
2270                        "Range mode bootstrap page: {} bars from {} to {}, filtering with start={:?} end={:?}",
2271                        page.len(),
2272                        page.first().unwrap().ts_event.as_i64() / 1_000_000,
2273                        page.last().unwrap().ts_event.as_i64() / 1_000_000,
2274                        start_ms,
2275                        end_ms
2276                    );
2277                }
2278
2279                let result: Vec<Bar> = page
2280                    .clone()
2281                    .into_iter()
2282                    .filter(|b| {
2283                        let ts = b.ts_event.as_i64();
2284                        // Accept bars from (start - tolerance) to end
2285                        let ok_after =
2286                            start_ns.is_none_or(|sns| ts >= sns.saturating_sub(tolerance_ns));
2287                        let ok_before = end_ns.is_none_or(|ens| ts <= ens);
2288                        ok_after && ok_before
2289                    })
2290                    .collect();
2291
2292                result
2293            } else {
2294                // Normal filtering
2295                page.clone()
2296                    .into_iter()
2297                    .filter(|b| {
2298                        let ts = b.ts_event.as_i64();
2299                        let ok_after = start_ns.is_none_or(|sns| ts >= sns);
2300                        let ok_before = end_ns.is_none_or(|ens| ts <= ens);
2301                        ok_after && ok_before
2302                    })
2303                    .collect()
2304            };
2305
2306            if !page.is_empty() && filtered.is_empty() {
2307                // For Range mode, if all bars are before our start time, there's no point continuing
2308                if matches!(mode, Mode::Range)
2309                    && !forward_prepend_mode
2310                    && let (Some(newest_ms), Some(start_ms)) = (page_newest_ms, start_ms)
2311                    && newest_ms < start_ms.saturating_sub(slot_ms * 2)
2312                {
2313                    // Bars are too old (more than 2 bar periods before start), stop
2314                    break;
2315                }
2316            }
2317
2318            // Track contribution for progress guard
2319            let contribution;
2320
2321            if out.is_empty() {
2322                contribution = filtered.len();
2323                out = filtered;
2324            } else {
2325                match mode {
2326                    Mode::Backward | Mode::Latest => {
2327                        if let Some(first) = out.first() {
2328                            filtered.retain(|b| b.ts_event < first.ts_event);
2329                        }
2330                        contribution = filtered.len();
2331                        if contribution != 0 {
2332                            let mut new_out = Vec::with_capacity(out.len() + filtered.len());
2333                            new_out.extend_from_slice(&filtered);
2334                            new_out.extend_from_slice(&out);
2335                            out = new_out;
2336                        }
2337                    }
2338                    Mode::Range => {
2339                        if forward_prepend_mode || req_used_before {
2340                            // We are backfilling older pages: prepend them.
2341                            if let Some(first) = out.first() {
2342                                filtered.retain(|b| b.ts_event < first.ts_event);
2343                            }
2344                            contribution = filtered.len();
2345                            if contribution != 0 {
2346                                let mut new_out = Vec::with_capacity(out.len() + filtered.len());
2347                                new_out.extend_from_slice(&filtered);
2348                                new_out.extend_from_slice(&out);
2349                                out = new_out;
2350                            }
2351                        } else {
2352                            // Normal forward: append newer pages.
2353                            if let Some(last) = out.last() {
2354                                filtered.retain(|b| b.ts_event > last.ts_event);
2355                            }
2356                            contribution = filtered.len();
2357                            out.extend(filtered);
2358                        }
2359                    }
2360                }
2361            }
2362
2363            // Duplicate-window mitigation for Latest/Backward/Range
2364            if contribution == 0
2365                && matches!(mode, Mode::Latest | Mode::Backward | Mode::Range)
2366                && let Some(b) = before_ms
2367            {
2368                let jump = (page_cap as i64).saturating_mul(slot_ms.max(1));
2369                let new_b = b.saturating_sub(jump);
2370                if new_b != b {
2371                    before_ms = Some(new_b);
2372                }
2373            }
2374
2375            if contribution == 0 {
2376                progressless_loops = progressless_loops.saturating_add(1);
2377                if progressless_loops >= 3 {
2378                    break;
2379                }
2380            } else {
2381                progressless_loops = 0;
2382
2383                // Advance cursors only when we made progress
2384                match mode {
2385                    Mode::Latest | Mode::Backward => {
2386                        if let Some(oldest) = page_oldest_ms {
2387                            before_ms = Some(oldest.saturating_sub(1));
2388                            have_latest_first_page = true;
2389                        } else {
2390                            break;
2391                        }
2392                    }
2393                    Mode::Range => {
2394                        if forward_prepend_mode || req_used_before {
2395                            if let Some(oldest) = page_oldest_ms {
2396                                // Move back by at least one bar period to avoid getting the same data
2397                                let jump_back = slot_ms.max(60_000); // At least 1 minute
2398                                before_ms = Some(oldest.saturating_sub(jump_back));
2399                                after_ms = None;
2400                            } else {
2401                                break;
2402                            }
2403                        } else if let Some(newest) = page_newest_ms {
2404                            after_ms = Some(newest.saturating_add(1));
2405                            before_ms = None;
2406                        } else {
2407                            break;
2408                        }
2409                    }
2410                }
2411            }
2412
2413            // Stop conditions
2414            if let Some(lim) = limit
2415                && lim > 0
2416                && out.len() >= lim as usize
2417            {
2418                break;
2419            }
2420            if let Some(ens) = end_ns
2421                && let Some(last) = out.last()
2422                && last.ts_event.as_i64() >= ens
2423            {
2424                break;
2425            }
2426            if let Some(sns) = start_ns
2427                && let Some(first) = out.first()
2428                && (matches!(mode, Mode::Backward) || forward_prepend_mode)
2429                && first.ts_event.as_i64() <= sns
2430            {
2431                // For Range mode, check if we have all bars up to the end time
2432                if matches!(mode, Mode::Range) {
2433                    // Don't stop if we haven't reached the end time yet
2434                    if let Some(ens) = end_ns
2435                        && let Some(last) = out.last()
2436                    {
2437                        let last_ts = last.ts_event.as_i64();
2438                        if last_ts < ens {
2439                            // We have bars before start but haven't reached end, need to continue forward
2440                            // Switch from backward to forward pagination
2441                            forward_prepend_mode = false;
2442                            after_ms = Some((last_ts / 1_000_000).saturating_add(1));
2443                            before_ms = None;
2444                            continue;
2445                        }
2446                    }
2447                }
2448                break;
2449            }
2450
2451            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
2452        }
2453
2454        // Final rescue for FORWARD/RANGE when nothing gathered
2455        if out.is_empty() && matches!(mode, Mode::Range) {
2456            let pivot = end_ms.unwrap_or(now_ms.saturating_sub(1));
2457            let hist = (now_ms.saturating_sub(pivot)) / (24 * 60 * 60 * 1000) > HISTORY_SPLIT_DAYS;
2458            let mut p = GetCandlesticksParamsBuilder::default();
2459            p.inst_id(symbol.as_str())
2460                .bar(&bar_param)
2461                .limit(300)
2462                .before_ms(pivot);
2463            let params = p.build().map_err(anyhow::Error::new)?;
2464            let raw = if hist {
2465                self.inner.get_history_candles(params).await
2466            } else {
2467                self.inner.get_candles(params).await
2468            }
2469            .map_err(anyhow::Error::new)?;
2470            if !raw.is_empty() {
2471                let ts_init = self.generate_ts_init();
2472                let mut page: Vec<Bar> = Vec::with_capacity(raw.len());
2473                for r in &raw {
2474                    page.push(parse_candlestick(
2475                        r,
2476                        bar_type,
2477                        inst.price_precision(),
2478                        inst.size_precision(),
2479                        ts_init,
2480                    )?);
2481                }
2482                page.reverse();
2483                out = page
2484                    .into_iter()
2485                    .filter(|b| {
2486                        let ts = b.ts_event.as_i64();
2487                        let ok_after = start_ns.is_none_or(|sns| ts >= sns);
2488                        let ok_before = end_ns.is_none_or(|ens| ts <= ens);
2489                        ok_after && ok_before
2490                    })
2491                    .collect();
2492            }
2493        }
2494
2495        // Trim against end bound if needed (keep ≤ end)
2496        if let Some(ens) = end_ns {
2497            while out.last().is_some_and(|b| b.ts_event.as_i64() > ens) {
2498                out.pop();
2499            }
2500        }
2501
2502        // Clamp first bar for Range when using forward pagination
2503        if matches!(mode, Mode::Range)
2504            && !forward_prepend_mode
2505            && let Some(sns) = start_ns
2506        {
2507            let lower = sns.saturating_sub(slot_ns);
2508            while out.first().is_some_and(|b| b.ts_event.as_i64() < lower) {
2509                out.remove(0);
2510            }
2511        }
2512
2513        if let Some(lim) = limit
2514            && lim > 0
2515            && out.len() > lim as usize
2516        {
2517            out.truncate(lim as usize);
2518        }
2519
2520        Ok(out)
2521    }
2522
2523    /// Requests historical order status reports for the given parameters.
2524    ///
2525    /// # Errors
2526    ///
2527    /// Returns an error if the request fails.
2528    ///
2529    /// # References
2530    ///
2531    /// - <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-order-history-last-7-days>.
2532    /// - <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-order-history-last-3-months>.
2533    #[allow(clippy::too_many_arguments)]
2534    pub async fn request_order_status_reports(
2535        &self,
2536        account_id: AccountId,
2537        instrument_type: Option<OKXInstrumentType>,
2538        instrument_id: Option<InstrumentId>,
2539        start: Option<DateTime<Utc>>,
2540        end: Option<DateTime<Utc>>,
2541        open_only: bool,
2542        limit: Option<u32>,
2543    ) -> anyhow::Result<Vec<OrderStatusReport>> {
2544        let mut history_params = GetOrderHistoryParamsBuilder::default();
2545
2546        let instrument_type = if let Some(instrument_type) = instrument_type {
2547            instrument_type
2548        } else {
2549            let instrument_id = instrument_id.ok_or_else(|| {
2550                anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
2551            })?;
2552            let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2553            okx_instrument_type(&instrument)?
2554        };
2555
2556        history_params.inst_type(instrument_type);
2557
2558        if let Some(instrument_id) = instrument_id.as_ref() {
2559            history_params.inst_id(instrument_id.symbol.inner().to_string());
2560        }
2561
2562        if let Some(limit) = limit {
2563            history_params.limit(limit);
2564        }
2565
2566        let history_params = history_params.build().map_err(|e| anyhow::anyhow!(e))?;
2567
2568        let mut pending_params = GetOrderListParamsBuilder::default();
2569        pending_params.inst_type(instrument_type);
2570
2571        if let Some(instrument_id) = instrument_id.as_ref() {
2572            pending_params.inst_id(instrument_id.symbol.inner().to_string());
2573        }
2574
2575        if let Some(limit) = limit {
2576            pending_params.limit(limit);
2577        }
2578
2579        let pending_params = pending_params.build().map_err(|e| anyhow::anyhow!(e))?;
2580
2581        let combined_resp = if open_only {
2582            // Only request pending/open orders
2583            self.inner
2584                .get_orders_pending(pending_params)
2585                .await
2586                .map_err(|e| anyhow::anyhow!(e))?
2587        } else {
2588            // Make both requests concurrently
2589            let (history_resp, pending_resp) = tokio::try_join!(
2590                self.inner.get_orders_history(history_params),
2591                self.inner.get_orders_pending(pending_params)
2592            )
2593            .map_err(|e| anyhow::anyhow!(e))?;
2594
2595            // Combine both responses
2596            let mut combined_resp = history_resp;
2597            combined_resp.extend(pending_resp);
2598            combined_resp
2599        };
2600
2601        // Prepare time range filter
2602        let start_ns = start.map(UnixNanos::from);
2603        let end_ns = end.map(UnixNanos::from);
2604
2605        let ts_init = self.generate_ts_init();
2606        let mut reports = Vec::with_capacity(combined_resp.len());
2607
2608        // Use a seen filter in case pending orders are within the histories "2hr reserve window"
2609        let mut seen: AHashSet<String> = AHashSet::new();
2610
2611        for order in combined_resp {
2612            let seen_key = if !order.cl_ord_id.is_empty() {
2613                order.cl_ord_id.as_str().to_string()
2614            } else if let Some(algo_cl_ord_id) = order
2615                .algo_cl_ord_id
2616                .as_ref()
2617                .filter(|value| !value.as_str().is_empty())
2618            {
2619                algo_cl_ord_id.as_str().to_string()
2620            } else if let Some(algo_id) = order
2621                .algo_id
2622                .as_ref()
2623                .filter(|value| !value.as_str().is_empty())
2624            {
2625                algo_id.as_str().to_string()
2626            } else {
2627                order.ord_id.as_str().to_string()
2628            };
2629
2630            if !seen.insert(seen_key) {
2631                continue; // Reserved pending already reported
2632            }
2633
2634            let Ok(inst) = self.instrument_from_cache(order.inst_id) else {
2635                log::debug!(
2636                    "Skipping order report for instrument not in cache: symbol={}",
2637                    order.inst_id,
2638                );
2639                continue;
2640            };
2641
2642            let report = match parse_order_status_report(
2643                &order,
2644                account_id,
2645                inst.id(),
2646                inst.price_precision(),
2647                inst.size_precision(),
2648                ts_init,
2649            ) {
2650                Ok(report) => report,
2651                Err(e) => {
2652                    log::error!("Failed to parse order status report: {e}");
2653                    continue;
2654                }
2655            };
2656
2657            if let Some(start_ns) = start_ns
2658                && report.ts_last < start_ns
2659            {
2660                continue;
2661            }
2662            if let Some(end_ns) = end_ns
2663                && report.ts_last > end_ns
2664            {
2665                continue;
2666            }
2667
2668            reports.push(report);
2669        }
2670
2671        Ok(reports)
2672    }
2673
2674    /// Requests fill reports (transaction details) for the given parameters.
2675    ///
2676    /// # Errors
2677    ///
2678    /// Returns an error if the request fails.
2679    ///
2680    /// # References
2681    ///
2682    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-transaction-details-last-3-days>.
2683    pub async fn request_fill_reports(
2684        &self,
2685        account_id: AccountId,
2686        instrument_type: Option<OKXInstrumentType>,
2687        instrument_id: Option<InstrumentId>,
2688        start: Option<DateTime<Utc>>,
2689        end: Option<DateTime<Utc>>,
2690        limit: Option<u32>,
2691    ) -> anyhow::Result<Vec<FillReport>> {
2692        let mut params = GetTransactionDetailsParamsBuilder::default();
2693
2694        let instrument_type = if let Some(instrument_type) = instrument_type {
2695            instrument_type
2696        } else {
2697            let instrument_id = instrument_id.ok_or_else(|| {
2698                anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
2699            })?;
2700            let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2701            okx_instrument_type(&instrument)?
2702        };
2703
2704        params.inst_type(instrument_type);
2705
2706        if let Some(instrument_id) = instrument_id {
2707            let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2708            let instrument_type = okx_instrument_type(&instrument)?;
2709            params.inst_type(instrument_type);
2710            params.inst_id(instrument_id.symbol.inner().to_string());
2711        }
2712
2713        if let Some(limit) = limit {
2714            params.limit(limit);
2715        }
2716
2717        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2718
2719        let resp = self
2720            .inner
2721            .get_fills(params)
2722            .await
2723            .map_err(|e| anyhow::anyhow!(e))?;
2724
2725        // Prepare time range filter
2726        let start_ns = start.map(UnixNanos::from);
2727        let end_ns = end.map(UnixNanos::from);
2728
2729        let ts_init = self.generate_ts_init();
2730        let mut reports = Vec::with_capacity(resp.len());
2731
2732        for detail in resp {
2733            // Skip fills with zero or negative quantity (cancelled orders, etc)
2734            if detail.fill_sz.is_empty() {
2735                continue;
2736            }
2737            if let Ok(qty) = detail.fill_sz.parse::<f64>() {
2738                if qty <= 0.0 {
2739                    continue;
2740                }
2741            } else {
2742                // Skip unparsable quantities
2743                continue;
2744            }
2745
2746            let Ok(inst) = self.instrument_from_cache(detail.inst_id) else {
2747                log::debug!(
2748                    "Skipping fill report for instrument not in cache: symbol={}",
2749                    detail.inst_id,
2750                );
2751                continue;
2752            };
2753
2754            let report = match parse_fill_report(
2755                detail,
2756                account_id,
2757                inst.id(),
2758                inst.price_precision(),
2759                inst.size_precision(),
2760                ts_init,
2761            ) {
2762                Ok(report) => report,
2763                Err(e) => {
2764                    log::error!("Failed to parse fill report: {e}");
2765                    continue;
2766                }
2767            };
2768
2769            if let Some(start_ns) = start_ns
2770                && report.ts_event < start_ns
2771            {
2772                continue;
2773            }
2774
2775            if let Some(end_ns) = end_ns
2776                && report.ts_event > end_ns
2777            {
2778                continue;
2779            }
2780
2781            reports.push(report);
2782        }
2783
2784        Ok(reports)
2785    }
2786
2787    /// Requests current position status reports for the given parameters.
2788    ///
2789    /// # Position Modes
2790    ///
2791    /// OKX supports two position modes, which affects how position data is returned:
2792    ///
2793    /// ## Net Mode (One-way)
2794    /// - `posSide` field will be `"net"`
2795    /// - `pos` field uses **signed quantities**:
2796    ///   - Positive value = Long position
2797    ///   - Negative value = Short position
2798    ///   - Zero = Flat/no position
2799    ///
2800    /// ## Long/Short Mode (Hedge/Dual-side)
2801    /// - `posSide` field will be `"long"` or `"short"`
2802    /// - `pos` field is **always positive** (use `posSide` to determine actual side)
2803    /// - Allows holding simultaneous long and short positions on the same instrument
2804    /// - Position IDs are suffixed with `-LONG` or `-SHORT` for uniqueness
2805    ///
2806    /// # Errors
2807    ///
2808    /// Returns an error if the request fails.
2809    ///
2810    /// # References
2811    ///
2812    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-positions>
2813    pub async fn request_position_status_reports(
2814        &self,
2815        account_id: AccountId,
2816        instrument_type: Option<OKXInstrumentType>,
2817        instrument_id: Option<InstrumentId>,
2818    ) -> anyhow::Result<Vec<PositionStatusReport>> {
2819        let mut params = GetPositionsParamsBuilder::default();
2820
2821        let instrument_type = if let Some(instrument_type) = instrument_type {
2822            instrument_type
2823        } else {
2824            let instrument_id = instrument_id.ok_or_else(|| {
2825                anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
2826            })?;
2827            let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2828            okx_instrument_type(&instrument)?
2829        };
2830
2831        params.inst_type(instrument_type);
2832
2833        instrument_id
2834            .as_ref()
2835            .map(|i| params.inst_id(i.symbol.inner()));
2836
2837        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2838
2839        let resp = self
2840            .inner
2841            .get_positions(params)
2842            .await
2843            .map_err(|e| anyhow::anyhow!(e))?;
2844
2845        let ts_init = self.generate_ts_init();
2846        let mut reports = Vec::with_capacity(resp.len());
2847
2848        for position in resp {
2849            let Ok(inst) = self.instrument_from_cache(position.inst_id) else {
2850                log::debug!(
2851                    "Skipping position report for instrument not in cache: symbol={}",
2852                    position.inst_id,
2853                );
2854                continue;
2855            };
2856
2857            match parse_position_status_report(
2858                position,
2859                account_id,
2860                inst.id(),
2861                inst.size_precision(),
2862                ts_init,
2863            ) {
2864                Ok(report) => reports.push(report),
2865                Err(e) => {
2866                    log::error!("Failed to parse position status report: {e}");
2867                    continue;
2868                }
2869            };
2870        }
2871
2872        Ok(reports)
2873    }
2874
2875    /// Requests spot margin position status reports from account balance.
2876    ///
2877    /// Spot margin positions appear in `/api/v5/account/balance` as balance sheet items
2878    /// with non-zero `liab` (liability) or `spotInUseAmt` fields, rather than in the
2879    /// positions endpoint. This method fetches the balance and converts any margin
2880    /// positions into position status reports.
2881    ///
2882    /// # Errors
2883    ///
2884    /// Returns an error if the request fails or parsing fails.
2885    ///
2886    /// # References
2887    ///
2888    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-balance>
2889    pub async fn request_spot_margin_position_reports(
2890        &self,
2891        account_id: AccountId,
2892    ) -> anyhow::Result<Vec<PositionStatusReport>> {
2893        let accounts = self
2894            .inner
2895            .get_balance()
2896            .await
2897            .map_err(|e| anyhow::anyhow!(e))?;
2898
2899        let ts_init = self.generate_ts_init();
2900        let mut reports = Vec::new();
2901
2902        for account in accounts {
2903            for balance in account.details {
2904                let ccy_str = balance.ccy.as_str();
2905
2906                // Try to find instrument by constructing potential spot pair symbols
2907                let potential_symbols = [
2908                    format!("{ccy_str}-USDT"),
2909                    format!("{ccy_str}-USD"),
2910                    format!("{ccy_str}-USDC"),
2911                ];
2912
2913                let instrument_result = potential_symbols.iter().find_map(|symbol| {
2914                    self.instrument_from_cache(Ustr::from(symbol))
2915                        .ok()
2916                        .map(|inst| (inst.id(), inst.size_precision()))
2917                });
2918
2919                let (instrument_id, size_precision) = match instrument_result {
2920                    Some((id, prec)) => (id, prec),
2921                    None => {
2922                        log::debug!(
2923                            "Skipping balance for {ccy_str} - no matching instrument in cache"
2924                        );
2925                        continue;
2926                    }
2927                };
2928
2929                match parse_spot_margin_position_from_balance(
2930                    &balance,
2931                    account_id,
2932                    instrument_id,
2933                    size_precision,
2934                    ts_init,
2935                ) {
2936                    Ok(Some(report)) => reports.push(report),
2937                    Ok(None) => {} // No margin position for this currency
2938                    Err(e) => {
2939                        log::error!(
2940                            "Failed to parse spot margin position from balance for {ccy_str}: {e}"
2941                        );
2942                        continue;
2943                    }
2944                };
2945            }
2946        }
2947
2948        Ok(reports)
2949    }
2950
2951    /// Places an algo order via HTTP.
2952    ///
2953    /// # Errors
2954    ///
2955    /// Returns an error if the request fails.
2956    ///
2957    /// # References
2958    ///
2959    /// <https://www.okx.com/docs-v5/en/#order-book-trading-algo-trading-post-place-algo-order>
2960    pub async fn place_algo_order(
2961        &self,
2962        request: OKXPlaceAlgoOrderRequest,
2963    ) -> Result<OKXPlaceAlgoOrderResponse, OKXHttpError> {
2964        let body =
2965            serde_json::to_vec(&request).map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
2966
2967        let resp: Vec<OKXPlaceAlgoOrderResponse> = self
2968            .inner
2969            .send_request::<_, ()>(
2970                Method::POST,
2971                "/api/v5/trade/order-algo",
2972                None,
2973                Some(body),
2974                true,
2975            )
2976            .await?;
2977
2978        resp.into_iter()
2979            .next()
2980            .ok_or_else(|| OKXHttpError::ValidationError("Empty response".to_string()))
2981    }
2982
2983    /// Cancels an algo order via HTTP.
2984    ///
2985    /// # Errors
2986    ///
2987    /// Returns an error if the request fails.
2988    ///
2989    /// # References
2990    ///
2991    /// <https://www.okx.com/docs-v5/en/#order-book-trading-algo-trading-post-cancel-algo-order>
2992    pub async fn cancel_algo_order(
2993        &self,
2994        request: OKXCancelAlgoOrderRequest,
2995    ) -> Result<OKXCancelAlgoOrderResponse, OKXHttpError> {
2996        // OKX expects an array for cancel-algos endpoint
2997        // Serialize once to bytes to keep signing and sending identical
2998        let body =
2999            serde_json::to_vec(&[request]).map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
3000
3001        let resp: Vec<OKXCancelAlgoOrderResponse> = self
3002            .inner
3003            .send_request::<_, ()>(
3004                Method::POST,
3005                "/api/v5/trade/cancel-algos",
3006                None,
3007                Some(body),
3008                true,
3009            )
3010            .await?;
3011
3012        resp.into_iter()
3013            .next()
3014            .ok_or_else(|| OKXHttpError::ValidationError("Empty response".to_string()))
3015    }
3016
3017    /// Cancels multiple algo orders via HTTP in a single request.
3018    ///
3019    /// # Errors
3020    ///
3021    /// Returns an error if the request fails.
3022    ///
3023    /// # References
3024    ///
3025    /// <https://www.okx.com/docs-v5/en/#order-book-trading-algo-trading-post-cancel-algo-order>
3026    pub async fn cancel_algo_orders(
3027        &self,
3028        requests: Vec<OKXCancelAlgoOrderRequest>,
3029    ) -> Result<Vec<OKXCancelAlgoOrderResponse>, OKXHttpError> {
3030        if requests.is_empty() {
3031            return Ok(Vec::new());
3032        }
3033
3034        let body =
3035            serde_json::to_vec(&requests).map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
3036
3037        self.inner
3038            .send_request::<_, ()>(
3039                Method::POST,
3040                "/api/v5/trade/cancel-algos",
3041                None,
3042                Some(body),
3043                true,
3044            )
3045            .await
3046    }
3047
3048    /// Places an algo order using domain types.
3049    ///
3050    /// This is a convenience method that accepts Nautilus domain types
3051    /// and builds the appropriate OKX request structure internally.
3052    ///
3053    /// # Errors
3054    ///
3055    /// Returns an error if the request fails.
3056    #[allow(clippy::too_many_arguments)]
3057    pub async fn place_algo_order_with_domain_types(
3058        &self,
3059        instrument_id: InstrumentId,
3060        td_mode: OKXTradeMode,
3061        client_order_id: ClientOrderId,
3062        order_side: OrderSide,
3063        order_type: OrderType,
3064        quantity: Quantity,
3065        trigger_price: Price,
3066        trigger_type: Option<TriggerType>,
3067        limit_price: Option<Price>,
3068        reduce_only: Option<bool>,
3069    ) -> Result<OKXPlaceAlgoOrderResponse, OKXHttpError> {
3070        if !matches!(order_side, OrderSide::Buy | OrderSide::Sell) {
3071            return Err(OKXHttpError::ValidationError(
3072                "Invalid order side".to_string(),
3073            ));
3074        }
3075        let okx_side: OKXSide = order_side.into();
3076
3077        // Map trigger type to OKX format
3078        let trigger_px_type_enum = trigger_type.map_or(OKXTriggerType::Last, Into::into);
3079
3080        // Determine order price based on order type
3081        let order_px = if matches!(order_type, OrderType::StopLimit | OrderType::LimitIfTouched) {
3082            limit_price.map(|p| p.to_string())
3083        } else {
3084            // Market orders use -1 to indicate market execution
3085            Some("-1".to_string())
3086        };
3087
3088        let request = OKXPlaceAlgoOrderRequest {
3089            inst_id: instrument_id.symbol.as_str().to_string(),
3090            inst_id_code: None,
3091            td_mode,
3092            side: okx_side,
3093            ord_type: OKXAlgoOrderType::Trigger, // All conditional orders use 'trigger' type
3094            sz: quantity.to_string(),
3095            algo_cl_ord_id: Some(client_order_id.as_str().to_string()),
3096            trigger_px: Some(trigger_price.to_string()),
3097            order_px,
3098            trigger_px_type: Some(trigger_px_type_enum),
3099            tgt_ccy: None,  // Let OKX determine based on instrument
3100            pos_side: None, // Use default position side
3101            close_position: None,
3102            tag: Some(OKX_NAUTILUS_BROKER_ID.to_string()),
3103            reduce_only,
3104        };
3105
3106        self.place_algo_order(request).await
3107    }
3108
3109    /// Cancels an algo order using domain types.
3110    ///
3111    /// This is a convenience method that accepts Nautilus domain types
3112    /// and builds the appropriate OKX request structure internally.
3113    ///
3114    /// # Errors
3115    ///
3116    /// Returns an error if the request fails.
3117    pub async fn cancel_algo_order_with_domain_types(
3118        &self,
3119        instrument_id: InstrumentId,
3120        algo_id: String,
3121    ) -> Result<OKXCancelAlgoOrderResponse, OKXHttpError> {
3122        let request = OKXCancelAlgoOrderRequest {
3123            inst_id: instrument_id.symbol.to_string(),
3124            inst_id_code: None,
3125            algo_id: Some(algo_id),
3126            algo_cl_ord_id: None,
3127        };
3128
3129        self.cancel_algo_order(request).await
3130    }
3131
3132    /// Requests algo order status reports.
3133    ///
3134    /// # Errors
3135    ///
3136    /// Returns an error if the request fails.
3137    #[allow(clippy::too_many_arguments)]
3138    pub async fn request_algo_order_status_reports(
3139        &self,
3140        account_id: AccountId,
3141        instrument_type: Option<OKXInstrumentType>,
3142        instrument_id: Option<InstrumentId>,
3143        algo_id: Option<String>,
3144        algo_client_order_id: Option<ClientOrderId>,
3145        state: Option<OKXOrderStatus>,
3146        limit: Option<u32>,
3147    ) -> anyhow::Result<Vec<OrderStatusReport>> {
3148        let mut instruments_cache: AHashMap<Ustr, InstrumentAny> = AHashMap::new();
3149
3150        let inst_type = if let Some(inst_type) = instrument_type {
3151            inst_type
3152        } else if let Some(inst_id) = instrument_id {
3153            let instrument = self.instrument_from_cache(inst_id.symbol.inner())?;
3154            let inst_type = okx_instrument_type(&instrument)?;
3155            instruments_cache.insert(inst_id.symbol.inner(), instrument);
3156            inst_type
3157        } else {
3158            anyhow::bail!("instrument_type or instrument_id required for algo order query")
3159        };
3160
3161        let mut params_builder = GetAlgoOrdersParamsBuilder::default();
3162        params_builder.inst_type(inst_type);
3163        if let Some(inst_id) = instrument_id {
3164            params_builder.inst_id(inst_id.symbol.inner().to_string());
3165        }
3166        if let Some(algo_id) = algo_id.as_ref() {
3167            params_builder.algo_id(algo_id.clone());
3168        }
3169        if let Some(client_order_id) = algo_client_order_id.as_ref() {
3170            params_builder.algo_cl_ord_id(client_order_id.as_str().to_string());
3171        }
3172        if let Some(state) = state {
3173            params_builder.state(state);
3174        }
3175        if let Some(limit) = limit {
3176            params_builder.limit(limit);
3177        }
3178
3179        let params = params_builder
3180            .build()
3181            .map_err(|e| anyhow::anyhow!(format!("Failed to build algo order params: {e}")))?;
3182
3183        let ts_init = self.generate_ts_init();
3184        let mut reports = Vec::new();
3185        let mut seen: AHashSet<(String, String)> = AHashSet::new();
3186
3187        let pending = match self.inner.get_order_algo_pending(params.clone()).await {
3188            Ok(result) => result,
3189            Err(OKXHttpError::UnexpectedStatus { status, .. })
3190                if status == StatusCode::NOT_FOUND =>
3191            {
3192                Vec::new()
3193            }
3194            Err(e) => return Err(e.into()),
3195        };
3196        self.collect_algo_reports(
3197            account_id,
3198            &pending,
3199            &mut instruments_cache,
3200            ts_init,
3201            &mut seen,
3202            &mut reports,
3203        )
3204        .await?;
3205
3206        let history = match self.inner.get_order_algo_history(params).await {
3207            Ok(result) => result,
3208            Err(OKXHttpError::UnexpectedStatus { status, .. })
3209                if status == StatusCode::NOT_FOUND =>
3210            {
3211                Vec::new()
3212            }
3213            Err(e) => return Err(e.into()),
3214        };
3215        self.collect_algo_reports(
3216            account_id,
3217            &history,
3218            &mut instruments_cache,
3219            ts_init,
3220            &mut seen,
3221            &mut reports,
3222        )
3223        .await?;
3224
3225        Ok(reports)
3226    }
3227
3228    /// Requests an algo order status report by client order identifier.
3229    ///
3230    /// # Errors
3231    ///
3232    /// Returns an error if the request fails.
3233    pub async fn request_algo_order_status_report(
3234        &self,
3235        account_id: AccountId,
3236        instrument_id: InstrumentId,
3237        algo_client_order_id: ClientOrderId,
3238    ) -> anyhow::Result<Option<OrderStatusReport>> {
3239        let reports = self
3240            .request_algo_order_status_reports(
3241                account_id,
3242                None,
3243                Some(instrument_id),
3244                None,
3245                Some(algo_client_order_id),
3246                None,
3247                Some(50_u32),
3248            )
3249            .await?;
3250
3251        Ok(reports.into_iter().next())
3252    }
3253
3254    /// Exposes raw HTTP client for testing purposes
3255    pub fn raw_client(&self) -> &Arc<OKXRawHttpClient> {
3256        &self.inner
3257    }
3258
3259    async fn collect_algo_reports(
3260        &self,
3261        account_id: AccountId,
3262        orders: &[OKXOrderAlgo],
3263        instruments_cache: &mut AHashMap<Ustr, InstrumentAny>,
3264        ts_init: UnixNanos,
3265        seen: &mut AHashSet<(String, String)>,
3266        reports: &mut Vec<OrderStatusReport>,
3267    ) -> anyhow::Result<()> {
3268        for order in orders {
3269            let key = (order.algo_id.clone(), order.algo_cl_ord_id.clone());
3270            if !seen.insert(key) {
3271                continue;
3272            }
3273
3274            let instrument = if let Some(instrument) = instruments_cache.get(&order.inst_id) {
3275                instrument.clone()
3276            } else {
3277                let Ok(instrument) = self.instrument_from_cache(order.inst_id) else {
3278                    log::debug!(
3279                        "Skipping algo order report for instrument not in cache: symbol={}",
3280                        order.inst_id,
3281                    );
3282                    continue;
3283                };
3284                instruments_cache.insert(order.inst_id, instrument.clone());
3285                instrument
3286            };
3287
3288            match parse_http_algo_order(order, account_id, &instrument, ts_init) {
3289                Ok(report) => reports.push(report),
3290                Err(e) => {
3291                    log::error!("Failed to parse algo order report: {e}");
3292                }
3293            }
3294        }
3295
3296        Ok(())
3297    }
3298}
3299
3300fn parse_http_algo_order(
3301    order: &OKXOrderAlgo,
3302    account_id: AccountId,
3303    instrument: &InstrumentAny,
3304    ts_init: UnixNanos,
3305) -> anyhow::Result<OrderStatusReport> {
3306    let ord_px = if order.ord_px.is_empty() {
3307        "-1".to_string()
3308    } else {
3309        order.ord_px.clone()
3310    };
3311
3312    let reduce_only = if order.reduce_only.is_empty() {
3313        "false".to_string()
3314    } else {
3315        order.reduce_only.clone()
3316    };
3317
3318    let msg = OKXAlgoOrderMsg {
3319        algo_id: order.algo_id.clone(),
3320        algo_cl_ord_id: order.algo_cl_ord_id.clone(),
3321        cl_ord_id: order.cl_ord_id.clone(),
3322        ord_id: order.ord_id.clone(),
3323        inst_id: order.inst_id,
3324        inst_type: order.inst_type,
3325        ord_type: order.ord_type,
3326        state: order.state,
3327        side: order.side,
3328        pos_side: order.pos_side,
3329        sz: order.sz.clone(),
3330        trigger_px: order.trigger_px.clone(),
3331        trigger_px_type: order.trigger_px_type.unwrap_or(OKXTriggerType::None),
3332        ord_px,
3333        td_mode: order.td_mode,
3334        lever: order.lever.clone(),
3335        reduce_only,
3336        actual_px: order.actual_px.clone(),
3337        actual_sz: order.actual_sz.clone(),
3338        notional_usd: order.notional_usd.clone(),
3339        c_time: order.c_time,
3340        u_time: order.u_time,
3341        trigger_time: order.trigger_time.clone(),
3342        tag: order.tag.clone(),
3343    };
3344
3345    parse_algo_order_status_report(&msg, instrument, account_id, ts_init)
3346}