nautilus_okx/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides the WebSocket client integration for the [OKX](https://okx.com) WebSocket API.
17//!
18//! The [`OKXWebSocketClient`] ties together several recurring patterns:
19//! - Heartbeats use text `ping`/`pong`, responding to both text and control-frame pings.
20//! - Authentication re-runs on reconnect before resubscribing and skips private channels when
21//!   credentials are unavailable.
22//! - Subscriptions cache instrument type/family/ID groupings so reconnects rebuild the same set of
23//!   channels while respecting the authentication guard described above.
24
25use std::{
26    collections::VecDeque,
27    fmt::Debug,
28    num::NonZeroU32,
29    sync::{
30        Arc, LazyLock,
31        atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering},
32    },
33    time::{Duration, SystemTime},
34};
35
36use ahash::{AHashMap, AHashSet};
37use dashmap::DashMap;
38use futures_util::Stream;
39use nautilus_common::runtime::get_runtime;
40use nautilus_core::{
41    UUID4,
42    consts::NAUTILUS_USER_AGENT,
43    env::{get_env_var, get_or_env_var},
44    nanos::UnixNanos,
45    time::get_atomic_clock_realtime,
46};
47use nautilus_model::{
48    data::BarType,
49    enums::{OrderSide, OrderStatus, OrderType, PositionSide, TimeInForce, TriggerType},
50    events::{AccountState, OrderCancelRejected, OrderModifyRejected, OrderRejected},
51    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
52    instruments::{Instrument, InstrumentAny},
53    reports::OrderStatusReport,
54    types::{Money, Price, Quantity},
55};
56use nautilus_network::{
57    RECONNECTED,
58    ratelimiter::quota::Quota,
59    retry::{RetryManager, create_websocket_retry_manager},
60    websocket::{
61        PingHandler, TEXT_PING, TEXT_PONG, WebSocketClient, WebSocketConfig,
62        channel_message_handler,
63    },
64};
65use reqwest::header::USER_AGENT;
66use serde_json::Value;
67use tokio::sync::mpsc::UnboundedReceiver;
68use tokio_tungstenite::tungstenite::{Error, Message};
69use tokio_util::sync::CancellationToken;
70use ustr::Ustr;
71
72use super::{
73    auth::{AUTHENTICATION_TIMEOUT_SECS, AuthTracker},
74    enums::{OKXSubscriptionEvent, OKXWsChannel, OKXWsOperation},
75    error::OKXWsError,
76    messages::{
77        ExecutionReport, NautilusWsMessage, OKXAuthentication, OKXAuthenticationArg,
78        OKXSubscription, OKXSubscriptionArg, OKXWebSocketArg, OKXWebSocketError, OKXWebSocketEvent,
79        OKXWsRequest, WsAmendOrderParams, WsAmendOrderParamsBuilder, WsCancelAlgoOrderParams,
80        WsCancelAlgoOrderParamsBuilder, WsCancelOrderParams, WsCancelOrderParamsBuilder,
81        WsMassCancelParams, WsPostAlgoOrderParams, WsPostAlgoOrderParamsBuilder, WsPostOrderParams,
82        WsPostOrderParamsBuilder,
83    },
84    parse::{parse_book_msg_vec, parse_ws_message_data},
85    subscription::{SubscriptionState, topic_from_subscription_arg, topic_from_websocket_arg},
86};
87use crate::{
88    common::{
89        consts::{
90            OKX_NAUTILUS_BROKER_ID, OKX_POST_ONLY_CANCEL_REASON, OKX_POST_ONLY_CANCEL_SOURCE,
91            OKX_POST_ONLY_ERROR_CODE, OKX_SUPPORTED_ORDER_TYPES, OKX_SUPPORTED_TIME_IN_FORCE,
92            OKX_WS_PUBLIC_URL, should_retry_error_code,
93        },
94        credential::Credential,
95        enums::{
96            OKXInstrumentType, OKXOrderStatus, OKXOrderType, OKXPositionSide, OKXSide,
97            OKXTargetCurrency, OKXTradeMode, OKXTriggerType, OKXVipLevel,
98            conditional_order_to_algo_type, is_conditional_order,
99        },
100        parse::{
101            bar_spec_as_okx_channel, okx_instrument_type, parse_account_state,
102            parse_client_order_id, parse_millisecond_timestamp, parse_price, parse_quantity,
103        },
104    },
105    http::models::OKXAccount,
106    websocket::{
107        messages::{OKXAlgoOrderMsg, OKXOrderMsg},
108        parse::{parse_algo_order_msg, parse_order_msg},
109    },
110};
111
/// Order parameters kept alongside a pending place request (see `PlaceRequestData`).
enum PendingOrderParams {
    /// A regular order, carrying its [`WsPostOrderParams`].
    Regular(WsPostOrderParams),
    /// An algo order; the payload is the unit type, so no parameters are retained.
    Algo(()),
}
116
/// Data tracked per pending place request: the order parameters plus the client order,
/// trader, strategy and instrument identifiers.
type PlaceRequestData = (
    PendingOrderParams,
    ClientOrderId,
    TraderId,
    StrategyId,
    InstrumentId,
);
/// Data tracked per pending cancel request: client order, trader, strategy and instrument
/// identifiers, plus the venue order ID when one is available.
type CancelRequestData = (
    ClientOrderId,
    TraderId,
    StrategyId,
    InstrumentId,
    Option<VenueOrderId>,
);
/// Data tracked per pending amend request; same shape as `CancelRequestData`.
type AmendRequestData = (
    ClientOrderId,
    TraderId,
    StrategyId,
    InstrumentId,
    Option<VenueOrderId>,
);
/// Data tracked per pending mass-cancel request: only the instrument identifier.
type MassCancelRequestData = InstrumentId;
139
/// Default OKX WebSocket connection rate limit: 3 requests per second.
///
/// This applies to establishing WebSocket connections, not to subscribe/unsubscribe operations.
/// The `unwrap` cannot fail because the literal is non-zero.
pub static OKX_WS_CONNECTION_QUOTA: LazyLock<Quota> =
    LazyLock::new(|| Quota::per_second(NonZeroU32::new(3).unwrap()));

/// OKX WebSocket subscription rate limit: 480 requests per hour per connection.
///
/// This applies to subscribe/unsubscribe/login operations.
/// 480 per hour averages to 8 per minute, but the quota is expressed per hour so the
/// limiter matches the venue's stated window.
pub static OKX_WS_SUBSCRIPTION_QUOTA: LazyLock<Quota> =
    LazyLock::new(|| Quota::per_hour(NonZeroU32::new(480).unwrap()));

/// Rate limit for order-related WebSocket operations: 250 requests per second.
///
/// Based on OKX documentation for sub-account order limits (1000 per 2 seconds, i.e. 500 per
/// second); half of that rate is used for conservative rate limiting.
pub static OKX_WS_ORDER_QUOTA: LazyLock<Quota> =
    LazyLock::new(|| Quota::per_second(NonZeroU32::new(250).unwrap()));
159
160/// Determines if an OKX WebSocket error should trigger a retry.
161fn should_retry_okx_error(error: &OKXWsError) -> bool {
162    match error {
163        OKXWsError::OkxError { error_code, .. } => should_retry_error_code(error_code),
164        OKXWsError::TungsteniteError(_) => true, // Network errors are retryable
165        OKXWsError::ClientError(msg) => {
166            // Retry on timeout and connection errors (case-insensitive)
167            let msg_lower = msg.to_lowercase();
168            msg_lower.contains("timeout")
169                || msg_lower.contains("timed out")
170                || msg_lower.contains("connection")
171                || msg_lower.contains("network")
172        }
173        OKXWsError::AuthenticationError(_)
174        | OKXWsError::JsonError(_)
175        | OKXWsError::ParsingError(_) => {
176            // Don't retry authentication or parsing errors automatically
177            false
178        }
179    }
180}
181
/// Creates a timeout error for OKX operations.
///
/// The message is wrapped as-is in [`OKXWsError::ClientError`]. Whether the resulting error
/// is later treated as retryable by `should_retry_okx_error` therefore depends on the
/// message text (for example, containing "timeout" or "timed out").
fn create_okx_timeout_error(msg: String) -> OKXWsError {
    OKXWsError::ClientError(msg)
}
186
/// Returns `true` if `channel` is one of the channels treated as private here
/// (`Account`, `Orders`, `Fills`, `OrdersAlgo`); every other channel returns `false`.
///
/// Used to skip resubscribing private channels when authentication did not succeed.
fn channel_requires_auth(channel: &OKXWsChannel) -> bool {
    matches!(
        channel,
        OKXWsChannel::Account
            | OKXWsChannel::Orders
            | OKXWsChannel::Fills
            | OKXWsChannel::OrdersAlgo
    )
}
196
/// Provides a WebSocket client for connecting to [OKX](https://okx.com).
///
/// The type is `Clone`; fields wrapped in `Arc` are shared between clones rather than copied.
#[derive(Clone)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
)]
pub struct OKXWebSocketClient {
    // WebSocket endpoint URL.
    url: String,
    // Account identifier; `new` defaults it to "OKX-master".
    account_id: AccountId,
    // VIP level stored as its `u8` value so it can be updated through `&self`.
    vip_level: Arc<AtomicU8>,
    // API credentials; `None` when the client was constructed without keys.
    credential: Option<Credential>,
    // Heartbeat setting forwarded to the WebSocket config (presumably seconds — TODO confirm).
    heartbeat: Option<u64>,
    // Underlying network client; `None` until `connect` installs it.
    inner: Arc<tokio::sync::RwLock<Option<WebSocketClient>>>,
    // Tracks the outcome of authentication (login) attempts.
    auth_tracker: AuthTracker,
    // Receiving end of the parsed-message channel created in `connect`.
    rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
    // Stop flag; when set, resubscription after a reconnect is skipped.
    signal: Arc<AtomicBool>,
    // Presumably the handle of the background message-handling task — TODO confirm.
    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
    // Instrument types subscribed per channel; replayed on reconnect.
    subscriptions_inst_type: Arc<DashMap<OKXWsChannel, AHashSet<OKXInstrumentType>>>,
    // Instrument families subscribed per channel; replayed on reconnect.
    subscriptions_inst_family: Arc<DashMap<OKXWsChannel, AHashSet<Ustr>>>,
    // Instrument IDs subscribed per channel.
    subscriptions_inst_id: Arc<DashMap<OKXWsChannel, AHashSet<Ustr>>>,
    subscriptions_bare: Arc<DashMap<OKXWsChannel, bool>>, // For channels without inst params (e.g., Account)
    // Confirmed and pending subscription topics.
    subscriptions_state: SubscriptionState,
    // Counter for request IDs; starts at 1.
    request_id_counter: Arc<AtomicU64>,
    // Pending requests by string key (presumably the request ID — TODO confirm).
    pending_place_requests: Arc<DashMap<String, PlaceRequestData>>,
    pending_cancel_requests: Arc<DashMap<String, CancelRequestData>>,
    pending_amend_requests: Arc<DashMap<String, AmendRequestData>>,
    pending_mass_cancel_requests: Arc<DashMap<String, MassCancelRequestData>>,
    // Identifiers associated with each tracked client order ID.
    active_client_orders: Arc<DashMap<ClientOrderId, (TraderId, StrategyId, InstrumentId)>>,
    emitted_order_accepted: Arc<DashMap<VenueOrderId, ()>>, // Track orders we've already emitted OrderAccepted for
    // Mapping from one client order ID to another (alias direction not visible here).
    client_id_aliases: Arc<DashMap<ClientOrderId, ClientOrderId>>,
    // Instruments keyed by symbol; replaced wholesale by `initialize_instruments_cache`.
    instruments_cache: Arc<AHashMap<Ustr, InstrumentAny>>,
    // Retry manager created by `create_websocket_retry_manager`.
    retry_manager: Arc<RetryManager<OKXWsError>>,
    // Token cancelled by `cancel_all_requests`.
    cancellation_token: CancellationToken,
}
231
232impl Default for OKXWebSocketClient {
233    fn default() -> Self {
234        Self::new(None, None, None, None, None, None).unwrap()
235    }
236}
237
238impl Debug for OKXWebSocketClient {
239    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
240        f.debug_struct(stringify!(OKXWebSocketClient))
241            .field("url", &self.url)
242            .field(
243                "credential",
244                &self.credential.as_ref().map(|_| "<redacted>"),
245            )
246            .field("heartbeat", &self.heartbeat)
247            .finish_non_exhaustive()
248    }
249}
250
251impl OKXWebSocketClient {
252    /// Creates a new [`OKXWebSocketClient`] instance.
253    ///
254    /// # Errors
255    ///
256    /// Returns an error if the request fails.
257    pub fn new(
258        url: Option<String>,
259        api_key: Option<String>,
260        api_secret: Option<String>,
261        api_passphrase: Option<String>,
262        account_id: Option<AccountId>,
263        heartbeat: Option<u64>,
264    ) -> anyhow::Result<Self> {
265        let url = url.unwrap_or(OKX_WS_PUBLIC_URL.to_string());
266        let account_id = account_id.unwrap_or(AccountId::from("OKX-master"));
267
268        let credential = match (api_key, api_secret, api_passphrase) {
269            (Some(key), Some(secret), Some(passphrase)) => {
270                Some(Credential::new(key, secret, passphrase))
271            }
272            (None, None, None) => None,
273            _ => anyhow::bail!(
274                "`api_key`, `api_secret`, `api_passphrase` credentials must be provided together"
275            ),
276        };
277
278        let signal = Arc::new(AtomicBool::new(false));
279        let subscriptions_inst_type = Arc::new(DashMap::new());
280        let subscriptions_inst_family = Arc::new(DashMap::new());
281        let subscriptions_inst_id = Arc::new(DashMap::new());
282        let subscriptions_bare = Arc::new(DashMap::new());
283        let subscriptions_state = SubscriptionState::new();
284
285        Ok(Self {
286            url,
287            account_id,
288            vip_level: Arc::new(AtomicU8::new(0)), // Default to VIP 0
289            credential,
290            heartbeat,
291            inner: Arc::new(tokio::sync::RwLock::new(None)),
292            auth_tracker: AuthTracker::new(),
293            rx: None,
294            signal,
295            task_handle: None,
296            subscriptions_inst_type,
297            subscriptions_inst_family,
298            subscriptions_inst_id,
299            subscriptions_bare,
300            subscriptions_state,
301            request_id_counter: Arc::new(AtomicU64::new(1)),
302            pending_place_requests: Arc::new(DashMap::new()),
303            pending_cancel_requests: Arc::new(DashMap::new()),
304            pending_amend_requests: Arc::new(DashMap::new()),
305            pending_mass_cancel_requests: Arc::new(DashMap::new()),
306            active_client_orders: Arc::new(DashMap::new()),
307            emitted_order_accepted: Arc::new(DashMap::new()),
308            client_id_aliases: Arc::new(DashMap::new()),
309            instruments_cache: Arc::new(AHashMap::new()),
310            retry_manager: Arc::new(create_websocket_retry_manager()?),
311            cancellation_token: CancellationToken::new(),
312        })
313    }
314
315    /// Creates a new [`OKXWebSocketClient`] instance.
316    ///
317    /// # Errors
318    ///
319    /// Returns an error if credential values cannot be loaded or if the
320    /// client fails to initialize.
321    pub fn with_credentials(
322        url: Option<String>,
323        api_key: Option<String>,
324        api_secret: Option<String>,
325        api_passphrase: Option<String>,
326        account_id: Option<AccountId>,
327        heartbeat: Option<u64>,
328    ) -> anyhow::Result<Self> {
329        let url = url.unwrap_or(OKX_WS_PUBLIC_URL.to_string());
330        let api_key = get_or_env_var(api_key, "OKX_API_KEY")?;
331        let api_secret = get_or_env_var(api_secret, "OKX_API_SECRET")?;
332        let api_passphrase = get_or_env_var(api_passphrase, "OKX_API_PASSPHRASE")?;
333
334        Self::new(
335            Some(url),
336            Some(api_key),
337            Some(api_secret),
338            Some(api_passphrase),
339            account_id,
340            heartbeat,
341        )
342    }
343
344    /// Creates a new authenticated [`OKXWebSocketClient`] using environment variables.
345    ///
346    /// # Errors
347    ///
348    /// Returns an error if required environment variables are missing or if
349    /// the client fails to initialize.
350    pub fn from_env() -> anyhow::Result<Self> {
351        let url = get_env_var("OKX_WS_URL")?;
352        let api_key = get_env_var("OKX_API_KEY")?;
353        let api_secret = get_env_var("OKX_API_SECRET")?;
354        let api_passphrase = get_env_var("OKX_API_PASSPHRASE")?;
355
356        Self::new(
357            Some(url),
358            Some(api_key),
359            Some(api_secret),
360            Some(api_passphrase),
361            None,
362            None,
363        )
364    }
365
    /// Cancel all pending WebSocket requests.
    ///
    /// This cancels the client's [`CancellationToken`]. Which operations observe the token
    /// is not visible in this part of the file.
    pub fn cancel_all_requests(&self) {
        self.cancellation_token.cancel();
    }
370
    /// Returns a reference to the cancellation token for this client.
    ///
    /// This is the same token that `cancel_all_requests` cancels.
    pub fn cancellation_token(&self) -> &CancellationToken {
        &self.cancellation_token
    }
375
376    /// Returns the websocket url being used by the client.
377    pub fn url(&self) -> &str {
378        self.url.as_str()
379    }
380
381    /// Returns the public API key being used by the client.
382    pub fn api_key(&self) -> Option<&str> {
383        self.credential.clone().map(|c| c.api_key.as_str())
384    }
385
386    /// Get a read lock on the inner client
387    /// Returns a value indicating whether the client is active.
388    pub fn is_active(&self) -> bool {
389        // Use try_read to avoid blocking
390        match self.inner.try_read() {
391            Ok(guard) => match &*guard {
392                Some(inner) => inner.is_active(),
393                None => false,
394            },
395            Err(_) => false, // If we can't get the lock, assume not active
396        }
397    }
398
399    /// Returns a value indicating whether the client is closed.
400    pub fn is_closed(&self) -> bool {
401        // Use try_read to avoid blocking
402        match self.inner.try_read() {
403            Ok(guard) => match &*guard {
404                Some(inner) => inner.is_closed(),
405                None => true,
406            },
407            Err(_) => true, // If we can't get the lock, assume closed
408        }
409    }
410
411    /// Initialize the instruments cache with the given `instruments`.
412    pub fn initialize_instruments_cache(&mut self, instruments: Vec<InstrumentAny>) {
413        let mut instruments_cache: AHashMap<Ustr, InstrumentAny> = AHashMap::new();
414        for inst in instruments {
415            instruments_cache.insert(inst.symbol().inner(), inst.clone());
416        }
417
418        self.instruments_cache = Arc::new(instruments_cache);
419    }
420
421    /// Sets the VIP level for this client.
422    ///
423    /// The VIP level determines which WebSocket channels are available.
424    pub fn set_vip_level(&self, vip_level: OKXVipLevel) {
425        self.vip_level.store(vip_level as u8, Ordering::Relaxed);
426    }
427
428    /// Gets the current VIP level.
429    pub fn vip_level(&self) -> OKXVipLevel {
430        let level = self.vip_level.load(Ordering::Relaxed);
431        OKXVipLevel::from(level)
432    }
433
434    /// Connect to the OKX WebSocket server.
435    ///
436    /// # Errors
437    ///
438    /// Returns an error if the connection process fails.
439    ///
440    /// # Panics
441    ///
442    /// Panics if subscription arguments fail to serialize to JSON.
443    pub async fn connect(&mut self) -> anyhow::Result<()> {
444        let (message_handler, reader) = channel_message_handler();
445
446        let inner_for_ping = self.inner.clone();
447        let ping_handler: PingHandler = Arc::new(move |payload: Vec<u8>| {
448            let inner = inner_for_ping.clone();
449
450            get_runtime().spawn(async move {
451                let len = payload.len();
452                let guard = inner.read().await;
453
454                if let Some(client) = guard.as_ref() {
455                    if let Err(e) = client.send_pong(payload).await {
456                        tracing::warn!(error = %e, "Failed to send pong frame");
457                    } else {
458                        tracing::trace!("Sent pong frame ({len} bytes)");
459                    }
460                } else {
461                    tracing::debug!("Ping received with no active websocket client");
462                }
463            });
464        });
465
466        let config = WebSocketConfig {
467            url: self.url.clone(),
468            headers: vec![(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())],
469            heartbeat: self.heartbeat,
470            heartbeat_msg: Some(TEXT_PING.to_string()),
471            message_handler: Some(message_handler),
472            ping_handler: Some(ping_handler),
473            reconnect_timeout_ms: Some(5_000),
474            reconnect_delay_initial_ms: None, // Use default
475            reconnect_delay_max_ms: None,     // Use default
476            reconnect_backoff_factor: None,   // Use default
477            reconnect_jitter_ms: None,        // Use default
478        };
479
480        // Configure rate limits for different operation types
481        let keyed_quotas = vec![
482            ("subscription".to_string(), *OKX_WS_SUBSCRIPTION_QUOTA),
483            ("order".to_string(), *OKX_WS_ORDER_QUOTA),
484            ("cancel".to_string(), *OKX_WS_ORDER_QUOTA),
485            ("amend".to_string(), *OKX_WS_ORDER_QUOTA),
486        ];
487
488        let client = WebSocketClient::connect(
489            config,
490            None, // post_reconnection
491            keyed_quotas,
492            Some(*OKX_WS_CONNECTION_QUOTA), // Default quota for connection operations
493        )
494        .await?;
495
496        // Set the inner client with write lock
497        {
498            let mut inner_guard = self.inner.write().await;
499            *inner_guard = Some(client);
500        }
501
502        let account_id = self.account_id;
503        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
504
505        self.rx = Some(Arc::new(rx));
506        let signal = self.signal.clone();
507        let pending_place_requests = self.pending_place_requests.clone();
508        let pending_cancel_requests = self.pending_cancel_requests.clone();
509        let pending_amend_requests = self.pending_amend_requests.clone();
510        let pending_mass_cancel_requests = self.pending_mass_cancel_requests.clone();
511        let active_client_orders = self.active_client_orders.clone();
512        let emitted_order_accepted = self.emitted_order_accepted.clone();
513        let auth_tracker = self.auth_tracker.clone();
514
515        let instruments_cache = self.instruments_cache.clone();
516        let inner_client = self.inner.clone();
517        let credential_clone = self.credential.clone();
518        let subscriptions_inst_type = self.subscriptions_inst_type.clone();
519        let subscriptions_inst_family = self.subscriptions_inst_family.clone();
520        let subscriptions_inst_id = self.subscriptions_inst_id.clone();
521        let subscriptions_bare = self.subscriptions_bare.clone();
522        let subscriptions_state = self.subscriptions_state.clone();
523        let client_id_aliases = self.client_id_aliases.clone();
524
525        let stream_handle = get_runtime().spawn({
526            let auth_tracker = auth_tracker.clone();
527            let signal = signal.clone();
528            async move {
529                let mut handler = OKXWsMessageHandler::new(
530                    account_id,
531                    instruments_cache,
532                    reader,
533                    signal.clone(),
534                    inner_client.clone(),
535                    tx,
536                    pending_place_requests,
537                    pending_cancel_requests,
538                    pending_amend_requests,
539                    pending_mass_cancel_requests,
540                    active_client_orders,
541                    client_id_aliases,
542                    emitted_order_accepted,
543                    auth_tracker.clone(),
544                    subscriptions_state.clone(),
545                );
546
547                // Main message loop with explicit reconnection handling
548                loop {
549                    match handler.next().await {
550                        Some(NautilusWsMessage::Reconnected) => {
551                            if signal.load(Ordering::Relaxed) {
552                                tracing::debug!("Skipping reconnection resubscription due to stop signal");
553                                continue;
554                            }
555
556                            tracing::debug!("Handling WebSocket reconnection");
557
558                            let auth_tracker_for_task = auth_tracker.clone();
559                            let inner_client_for_task = inner_client.clone();
560                            let subscriptions_inst_type_for_task = subscriptions_inst_type.clone();
561                            let subscriptions_inst_family_for_task = subscriptions_inst_family.clone();
562                            let subscriptions_inst_id_for_task = subscriptions_inst_id.clone();
563                            let subscriptions_bare_for_task = subscriptions_bare.clone();
564                            let subscriptions_state_for_task = subscriptions_state.clone();
565
566                            let auth_wait = if let Some(cred) = &credential_clone {
567                                let rx = auth_tracker.begin();
568                                let inner_guard = inner_client.read().await;
569
570                                if let Some(client) = &*inner_guard {
571                                    let timestamp = SystemTime::now()
572                                        .duration_since(SystemTime::UNIX_EPOCH)
573                                        .expect("System time should be after UNIX epoch")
574                                        .as_secs()
575                                        .to_string();
576                                    let signature =
577                                        cred.sign(&timestamp, "GET", "/users/self/verify", "");
578
579                                    let auth_message = OKXAuthentication {
580                                        op: "login",
581                                        args: vec![OKXAuthenticationArg {
582                                            api_key: cred.api_key.to_string(),
583                                            passphrase: cred.api_passphrase.clone(),
584                                            timestamp,
585                                            sign: signature,
586                                        }],
587                                    };
588
589                                    if let Err(e) = client
590                                        .send_text(serde_json::to_string(&auth_message).unwrap(), None)
591                                        .await
592                                    {
593                                        tracing::error!(
594                                            "Failed to send re-authentication request: {e}",
595                                        );
596                                        auth_tracker.fail(e.to_string());
597                                    } else {
598                                        tracing::debug!(
599                                            "Sent re-authentication request, waiting for response before resubscribing",
600                                        );
601                                    }
602                                } else {
603                                    auth_tracker
604                                        .fail("Cannot authenticate: not connected".to_string());
605                                }
606
607                                drop(inner_guard);
608
609                                Some(rx)
610                            } else {
611                                None
612                            };
613
614                            get_runtime().spawn(async move {
615                                let auth_succeeded = match auth_wait {
616                                    Some(rx) => match auth_tracker_for_task
617                                        .wait_for_result(
618                                            Duration::from_secs(AUTHENTICATION_TIMEOUT_SECS),
619                                            rx,
620                                        )
621                                        .await
622                                    {
623                                        Ok(()) => {
624                                            tracing::debug!(
625                                                "Authentication successful after reconnect, proceeding with resubscription",
626                                            );
627                                            true
628                                        }
629                                        Err(e) => {
630                                            tracing::error!(
631                                                "Authentication after reconnect failed: {e}",
632                                            );
633                                            false
634                                        }
635                                    },
636                                    None => true,
637                                };
638
639                                let confirmed_topic_count = subscriptions_state_for_task.len();
640                                if confirmed_topic_count == 0 {
641                                    tracing::debug!(
642                                        "No confirmed subscriptions recorded before reconnect; resubscribe will rely on pending topics"
643                                    );
644                                } else {
645                                    tracing::debug!(confirmed_topic_count, "Confirmed subscriptions recorded before reconnect");
646                                }
647                                let confirmed_topics = subscriptions_state_for_task.confirmed();
648                                if confirmed_topic_count <= 10 {
649                                    let topics: Vec<_> = confirmed_topics
650                                        .iter()
651                                        .map(|entry| entry.key().clone())
652                                        .collect();
653                                    if !topics.is_empty() {
654                                        tracing::trace!(topics = ?topics, "Confirmed topics before reconnect");
655                                    }
656                                }
657                                drop(confirmed_topics);
658
659                                let pending_topics = subscriptions_state_for_task.pending();
660                                let pending_topic_count = pending_topics.len();
661                                if pending_topic_count > 0 {
662                                    tracing::debug!(pending_topic_count, "Pending subscriptions awaiting replay after reconnect");
663                                }
664                                drop(pending_topics);
665
666                                let inner_guard = inner_client_for_task.read().await;
667                                if let Some(client) = &*inner_guard {
668                                    let should_resubscribe = |channel: &OKXWsChannel| {
669                                        if channel_requires_auth(channel) && !auth_succeeded {
670                                            tracing::warn!(
671                                                ?channel,
672                                                "Skipping private channel resubscription due to missing authentication",
673                                            );
674                                            return false;
675                                        }
676                                        true
677                                    };
678
679                                    let mut inst_type_args = Vec::new();
680                                    for entry in subscriptions_inst_type_for_task.iter() {
681                                        let (channel, inst_types) = entry.pair();
682                                        if !should_resubscribe(channel) {
683                                            continue;
684                                        }
685                                        for inst_type in inst_types.iter() {
686                                            let arg = OKXSubscriptionArg {
687                                                channel: channel.clone(),
688                                                inst_type: Some(*inst_type),
689                                                inst_family: None,
690                                                inst_id: None,
691                                            };
692                                            let topic = topic_from_subscription_arg(&arg);
693                                            subscriptions_state_for_task.mark_subscribe(&topic);
694                                            inst_type_args.push(arg);
695                                        }
696                                    }
697                                    if !inst_type_args.is_empty() {
698                                        let sub_request = OKXSubscription {
699                                            op: OKXWsOperation::Subscribe,
700                                            args: inst_type_args,
701                                        };
702                                        if let Err(e) = client
703                                            .send_text(
704                                                serde_json::to_string(&sub_request).unwrap(),
705                                                None,
706                                            )
707                                            .await
708                                        {
709                                            tracing::error!(
710                                                "Failed to re-subscribe inst_type channels: {e}",
711                                            );
712                                        }
713                                    }
714
715                                    let mut inst_family_args = Vec::new();
716                                    for entry in subscriptions_inst_family_for_task.iter() {
717                                        let (channel, inst_families) = entry.pair();
718                                        if !should_resubscribe(channel) {
719                                            continue;
720                                        }
721                                        for inst_family in inst_families.iter() {
722                                            let arg = OKXSubscriptionArg {
723                                                channel: channel.clone(),
724                                                inst_type: None,
725                                                inst_family: Some(*inst_family),
726                                                inst_id: None,
727                                            };
728                                            let topic = topic_from_subscription_arg(&arg);
729                                            subscriptions_state_for_task.mark_subscribe(&topic);
730                                            inst_family_args.push(arg);
731                                        }
732                                    }
733                                    if !inst_family_args.is_empty() {
734                                        let sub_request = OKXSubscription {
735                                            op: OKXWsOperation::Subscribe,
736                                            args: inst_family_args,
737                                        };
738                                        if let Err(e) = client
739                                            .send_text(
740                                                serde_json::to_string(&sub_request).unwrap(),
741                                                None,
742                                            )
743                                            .await
744                                        {
745                                            tracing::error!(
746                                                "Failed to re-subscribe inst_family channels: {e}",
747                                            );
748                                        }
749                                    }
750
751                                    let mut inst_id_args = Vec::new();
752                                    for entry in subscriptions_inst_id_for_task.iter() {
753                                        let (channel, inst_ids) = entry.pair();
754                                        if !should_resubscribe(channel) {
755                                            continue;
756                                        }
757                                        for inst_id in inst_ids.iter() {
758                                            let arg = OKXSubscriptionArg {
759                                                channel: channel.clone(),
760                                                inst_type: None,
761                                                inst_family: None,
762                                                inst_id: Some(*inst_id),
763                                            };
764                                            let topic = topic_from_subscription_arg(&arg);
765                                            subscriptions_state_for_task.mark_subscribe(&topic);
766                                            inst_id_args.push(arg);
767                                        }
768                                    }
769                                    if !inst_id_args.is_empty() {
770                                        let sub_request = OKXSubscription {
771                                            op: OKXWsOperation::Subscribe,
772                                            args: inst_id_args,
773                                        };
774                                        if let Err(e) = client
775                                            .send_text(
776                                                serde_json::to_string(&sub_request).unwrap(),
777                                                None,
778                                            )
779                                            .await
780                                        {
781                                            tracing::error!(
782                                                "Failed to re-subscribe inst_id channels: {e}",
783                                            );
784                                        }
785                                    }
786
787                                    let mut bare_args = Vec::new();
788                                    for entry in subscriptions_bare_for_task.iter() {
789                                        let channel = entry.key();
790                                        if !should_resubscribe(channel) {
791                                            continue;
792                                        }
793                                        let arg = OKXSubscriptionArg {
794                                            channel: channel.clone(),
795                                            inst_type: None,
796                                            inst_family: None,
797                                            inst_id: None,
798                                        };
799                                        let topic = topic_from_subscription_arg(&arg);
800                                        subscriptions_state_for_task.mark_subscribe(&topic);
801                                        bare_args.push(arg);
802                                    }
803                                    if !bare_args.is_empty() {
804                                        let sub_request = OKXSubscription {
805                                            op: OKXWsOperation::Subscribe,
806                                            args: bare_args,
807                                        };
808                                        if let Err(e) = client
809                                            .send_text(
810                                                serde_json::to_string(&sub_request).unwrap(),
811                                                None,
812                                            )
813                                            .await
814                                        {
815                                            tracing::error!(
816                                                "Failed to re-subscribe bare channels: {e}",
817                                            );
818                                        }
819                                    }
820
821                                    tracing::debug!("Completed re-subscription after reconnect");
822                                } else {
823                                    tracing::warn!(
824                                        "Skipping resubscription after reconnect: websocket client unavailable",
825                                    );
826                                }
827                            });
828
829                            continue;
830                        }
831                        Some(msg) => {
832                            if handler.tx.send(msg).is_err() {
833                                tracing::error!(
834                                    "Failed to send message through channel: receiver dropped",
835                                );
836                                break;
837                            }
838                        }
839                        None => {
840                            if handler.is_stopped() {
841                                tracing::debug!(
842                                    "Stop signal received, ending message processing",
843                                );
844                                break;
845                            }
846                            tracing::warn!("WebSocket stream ended unexpectedly");
847                            break;
848                        }
849                    }
850                }
851            }
852        });
853
854        self.task_handle = Some(Arc::new(stream_handle));
855
856        if self.credential.is_some() {
857            self.authenticate().await?;
858        }
859
860        Ok(())
861    }
862
863    /// Authenticates the WebSocket session with OKX.
864    async fn authenticate(&self) -> Result<(), Error> {
865        let credential = self.credential.as_ref().ok_or_else(|| {
866            Error::Io(std::io::Error::other(
867                "API credentials not available to authenticate",
868            ))
869        })?;
870
871        let rx = self.auth_tracker.begin();
872
873        let timestamp = SystemTime::now()
874            .duration_since(SystemTime::UNIX_EPOCH)
875            .expect("System time should be after UNIX epoch")
876            .as_secs()
877            .to_string();
878        let signature = credential.sign(&timestamp, "GET", "/users/self/verify", "");
879
880        let auth_message = OKXAuthentication {
881            op: "login",
882            args: vec![OKXAuthenticationArg {
883                api_key: credential.api_key.to_string(),
884                passphrase: credential.api_passphrase.clone(),
885                timestamp,
886                sign: signature,
887            }],
888        };
889
890        {
891            let inner_guard = self.inner.read().await;
892            if let Some(inner) = &*inner_guard {
893                if let Err(e) = inner
894                    .send_text(serde_json::to_string(&auth_message).unwrap(), None)
895                    .await
896                {
897                    tracing::error!("Error sending auth message: {e:?}");
898                    self.auth_tracker.fail(e.to_string());
899                    return Err(Error::Io(std::io::Error::other(e.to_string())));
900                }
901            } else {
902                log::error!("Cannot authenticate: not connected");
903                self.auth_tracker
904                    .fail("Cannot authenticate: not connected".to_string());
905                return Err(Error::ConnectionClosed);
906            }
907        }
908
909        match self
910            .auth_tracker
911            .wait_for_result(Duration::from_secs(AUTHENTICATION_TIMEOUT_SECS), rx)
912            .await
913        {
914            Ok(()) => {
915                tracing::info!("Authentication confirmed by client");
916                Ok(())
917            }
918            Err(e) => {
919                tracing::error!("Authentication failed: {e}");
920                Err(Error::Io(std::io::Error::other(e.to_string())))
921            }
922        }
923    }
924
925    /// Provides the internal data stream as a channel-based stream.
926    ///
927    /// # Panics
928    ///
929    /// This function panics if:
930    /// - The websocket is not connected.
931    /// - `stream_data` has already been called somewhere else (stream receiver is then taken).
932    pub fn stream(&mut self) -> impl Stream<Item = NautilusWsMessage> + 'static {
933        let rx = self
934            .rx
935            .take()
936            .expect("Data stream receiver already taken or not connected");
937        let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
938        async_stream::stream! {
939            while let Some(data) = rx.recv().await {
940                yield data;
941            }
942        }
943    }
944
945    /// Wait until the WebSocket connection is active.
946    ///
947    /// # Errors
948    ///
949    /// Returns an error if the connection times out.
950    pub async fn wait_until_active(&self, timeout_secs: f64) -> Result<(), OKXWsError> {
951        let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
952
953        tokio::time::timeout(timeout, async {
954            while !self.is_active() {
955                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
956            }
957        })
958        .await
959        .map_err(|_| {
960            OKXWsError::ClientError(format!(
961                "WebSocket connection timeout after {timeout_secs} seconds"
962            ))
963        })?;
964
965        Ok(())
966    }
967
968    /// Closes the client.
969    ///
970    /// # Errors
971    ///
972    /// Returns an error if disconnecting the websocket or cleaning up the
973    /// client fails.
974    pub async fn close(&mut self) -> Result<(), Error> {
975        log::debug!("Starting close process");
976
977        self.signal.store(true, Ordering::Relaxed);
978
979        {
980            let inner_guard = self.inner.read().await;
981            if let Some(inner) = &*inner_guard {
982                log::debug!("Disconnecting websocket");
983
984                match tokio::time::timeout(Duration::from_secs(3), inner.disconnect()).await {
985                    Ok(()) => log::debug!("Websocket disconnected successfully"),
986                    Err(_) => {
987                        log::warn!(
988                            "Timeout waiting for websocket disconnect, continuing with cleanup"
989                        );
990                    }
991                }
992            } else {
993                log::debug!("No active connection to disconnect");
994            }
995        }
996
997        // Clean up stream handle with timeout
998        if let Some(stream_handle) = self.task_handle.take() {
999            match Arc::try_unwrap(stream_handle) {
1000                Ok(handle) => {
1001                    log::debug!("Waiting for stream handle to complete");
1002                    match tokio::time::timeout(Duration::from_secs(2), handle).await {
1003                        Ok(Ok(())) => log::debug!("Stream handle completed successfully"),
1004                        Ok(Err(e)) => log::error!("Stream handle encountered an error: {e:?}"),
1005                        Err(_) => {
1006                            log::warn!(
1007                                "Timeout waiting for stream handle, task may still be running"
1008                            );
1009                            // The task will be dropped and should clean up automatically
1010                        }
1011                    }
1012                }
1013                Err(arc_handle) => {
1014                    log::debug!(
1015                        "Cannot take ownership of stream handle - other references exist, aborting task"
1016                    );
1017                    arc_handle.abort();
1018                }
1019            }
1020        } else {
1021            log::debug!("No stream handle to await");
1022        }
1023
1024        log::debug!("Close process completed");
1025
1026        Ok(())
1027    }
1028
1029    /// Get active subscriptions for a specific instrument.
1030    pub fn get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<OKXWsChannel> {
1031        let symbol = instrument_id.symbol.inner();
1032        let mut channels = Vec::new();
1033
1034        for entry in self.subscriptions_inst_id.iter() {
1035            let (channel, instruments) = entry.pair();
1036            if instruments.contains(&symbol) {
1037                channels.push(channel.clone());
1038            }
1039        }
1040
1041        channels
1042    }
1043
1044    fn generate_unique_request_id(&self) -> String {
1045        self.request_id_counter
1046            .fetch_add(1, Ordering::SeqCst)
1047            .to_string()
1048    }
1049
1050    #[allow(
1051        clippy::result_large_err,
1052        reason = "OKXWsError contains large tungstenite::Error variant"
1053    )]
1054    fn get_instrument_type_and_family(
1055        &self,
1056        symbol: Ustr,
1057    ) -> Result<(OKXInstrumentType, String), OKXWsError> {
1058        // Fetch instrument from cache
1059        let instrument = self.instruments_cache.get(&symbol).ok_or_else(|| {
1060            OKXWsError::ClientError(format!("Instrument not found in cache: {symbol}"))
1061        })?;
1062
1063        let inst_type =
1064            okx_instrument_type(instrument).map_err(|e| OKXWsError::ClientError(e.to_string()))?;
1065
1066        // Determine instrument family based on instrument type
1067        let inst_family = match instrument {
1068            InstrumentAny::CurrencyPair(_) => symbol.as_str().to_string(),
1069            InstrumentAny::CryptoPerpetual(_) => {
1070                // For SWAP: "BTC-USDT-SWAP" -> "BTC-USDT"
1071                symbol
1072                    .as_str()
1073                    .strip_suffix("-SWAP")
1074                    .unwrap_or(symbol.as_str())
1075                    .to_string()
1076            }
1077            InstrumentAny::CryptoFuture(_) => {
1078                // For FUTURES: extract the underlying pair
1079                let parts: Vec<&str> = symbol.as_str().split('-').collect();
1080                if parts.len() >= 2 {
1081                    format!("{}-{}", parts[0], parts[1])
1082                } else {
1083                    return Err(OKXWsError::ClientError(format!(
1084                        "Unable to parse futures instrument family from symbol: {symbol}",
1085                    )));
1086                }
1087            }
1088            InstrumentAny::CryptoOption(_) => {
1089                // For OPTIONS: "BTC-USD-241217-92000-C" -> "BTC-USD"
1090                let parts: Vec<&str> = symbol.as_str().split('-').collect();
1091                if parts.len() >= 2 {
1092                    format!("{}-{}", parts[0], parts[1])
1093                } else {
1094                    return Err(OKXWsError::ClientError(format!(
1095                        "Unable to parse option instrument family from symbol: {symbol}",
1096                    )));
1097                }
1098            }
1099            _ => {
1100                return Err(OKXWsError::ClientError(format!(
1101                    "Unsupported instrument type: {instrument:?}",
1102                )));
1103            }
1104        };
1105
1106        Ok((inst_type, inst_family))
1107    }
1108
1109    async fn subscribe(&self, args: Vec<OKXSubscriptionArg>) -> Result<(), OKXWsError> {
1110        for arg in &args {
1111            let topic = topic_from_subscription_arg(arg);
1112            self.subscriptions_state.mark_subscribe(&topic);
1113
1114            // Check if this is a bare channel (no inst params)
1115            if arg.inst_type.is_none() && arg.inst_family.is_none() && arg.inst_id.is_none() {
1116                // Track bare channels like Account
1117                self.subscriptions_bare.insert(arg.channel.clone(), true);
1118            } else {
1119                // Update instrument type subscriptions
1120                if let Some(inst_type) = &arg.inst_type {
1121                    self.subscriptions_inst_type
1122                        .entry(arg.channel.clone())
1123                        .or_default()
1124                        .insert(*inst_type);
1125                }
1126
1127                // Update instrument family subscriptions
1128                if let Some(inst_family) = &arg.inst_family {
1129                    self.subscriptions_inst_family
1130                        .entry(arg.channel.clone())
1131                        .or_default()
1132                        .insert(*inst_family);
1133                }
1134
1135                // Update instrument ID subscriptions
1136                if let Some(inst_id) = &arg.inst_id {
1137                    self.subscriptions_inst_id
1138                        .entry(arg.channel.clone())
1139                        .or_default()
1140                        .insert(*inst_id);
1141                }
1142            }
1143        }
1144
1145        let message = OKXSubscription {
1146            op: OKXWsOperation::Subscribe,
1147            args,
1148        };
1149
1150        let json_txt =
1151            serde_json::to_string(&message).map_err(|e| OKXWsError::JsonError(e.to_string()))?;
1152
1153        {
1154            let inner_guard = self.inner.read().await;
1155            if let Some(inner) = &*inner_guard {
1156                if let Err(e) = inner
1157                    .send_text(json_txt, Some(vec!["subscription".to_string()]))
1158                    .await
1159                {
1160                    tracing::error!("Error sending message: {e:?}");
1161                }
1162            } else {
1163                return Err(OKXWsError::ClientError(
1164                    "Cannot send message: not connected".to_string(),
1165                ));
1166            }
1167        }
1168
1169        Ok(())
1170    }
1171
1172    #[allow(clippy::collapsible_if, reason = "Clearer uncollapsed")]
1173    async fn unsubscribe(&self, args: Vec<OKXSubscriptionArg>) -> Result<(), OKXWsError> {
1174        for arg in &args {
1175            let topic = topic_from_subscription_arg(arg);
1176            self.subscriptions_state.mark_unsubscribe(&topic);
1177
1178            // Check if this is a bare channel
1179            if arg.inst_type.is_none() && arg.inst_family.is_none() && arg.inst_id.is_none() {
1180                // Remove bare channel subscription
1181                self.subscriptions_bare.remove(&arg.channel);
1182            } else {
1183                // Update instrument type subscriptions
1184                if let Some(inst_type) = &arg.inst_type {
1185                    if let Some(mut entry) = self.subscriptions_inst_type.get_mut(&arg.channel) {
1186                        entry.remove(inst_type);
1187                        if entry.is_empty() {
1188                            drop(entry);
1189                            self.subscriptions_inst_type.remove(&arg.channel);
1190                        }
1191                    }
1192                }
1193
1194                // Update instrument family subscriptions
1195                if let Some(inst_family) = &arg.inst_family {
1196                    if let Some(mut entry) = self.subscriptions_inst_family.get_mut(&arg.channel) {
1197                        entry.remove(inst_family);
1198                        if entry.is_empty() {
1199                            drop(entry);
1200                            self.subscriptions_inst_family.remove(&arg.channel);
1201                        }
1202                    }
1203                }
1204
1205                // Update instrument ID subscriptions
1206                if let Some(inst_id) = &arg.inst_id {
1207                    if let Some(mut entry) = self.subscriptions_inst_id.get_mut(&arg.channel) {
1208                        entry.remove(inst_id);
1209                        if entry.is_empty() {
1210                            drop(entry);
1211                            self.subscriptions_inst_id.remove(&arg.channel);
1212                        }
1213                    }
1214                }
1215            }
1216        }
1217
1218        let message = OKXSubscription {
1219            op: OKXWsOperation::Unsubscribe,
1220            args,
1221        };
1222
1223        let json_txt = serde_json::to_string(&message).expect("Must be valid JSON");
1224
1225        {
1226            let inner_guard = self.inner.read().await;
1227            if let Some(inner) = &*inner_guard {
1228                if let Err(e) = inner
1229                    .send_text(json_txt, Some(vec!["subscription".to_string()]))
1230                    .await
1231                {
1232                    tracing::error!("Error sending message: {e:?}");
1233                }
1234            } else {
1235                log::error!("Cannot send message: not connected");
1236            }
1237        }
1238
1239        Ok(())
1240    }
1241
1242    /// Unsubscribes from all active subscriptions in batched messages.
1243    ///
1244    /// Collects all confirmed subscriptions and sends unsubscribe requests in batches,
1245    /// which is significantly more efficient than individual unsubscribes during disconnect.
1246    ///
1247    /// # Errors
1248    ///
1249    /// Returns an error if the unsubscribe request fails to send.
1250    pub async fn unsubscribe_all(&self) -> Result<(), OKXWsError> {
1251        let mut all_args = Vec::new();
1252
1253        for entry in self.subscriptions_inst_type.iter() {
1254            let (channel, inst_types) = entry.pair();
1255            for inst_type in inst_types.iter() {
1256                all_args.push(OKXSubscriptionArg {
1257                    channel: channel.clone(),
1258                    inst_type: Some(*inst_type),
1259                    inst_family: None,
1260                    inst_id: None,
1261                });
1262            }
1263        }
1264
1265        for entry in self.subscriptions_inst_family.iter() {
1266            let (channel, inst_families) = entry.pair();
1267            for inst_family in inst_families.iter() {
1268                all_args.push(OKXSubscriptionArg {
1269                    channel: channel.clone(),
1270                    inst_type: None,
1271                    inst_family: Some(*inst_family),
1272                    inst_id: None,
1273                });
1274            }
1275        }
1276
1277        for entry in self.subscriptions_inst_id.iter() {
1278            let (channel, inst_ids) = entry.pair();
1279            for inst_id in inst_ids.iter() {
1280                all_args.push(OKXSubscriptionArg {
1281                    channel: channel.clone(),
1282                    inst_type: None,
1283                    inst_family: None,
1284                    inst_id: Some(*inst_id),
1285                });
1286            }
1287        }
1288
1289        for entry in self.subscriptions_bare.iter() {
1290            let channel = entry.key();
1291            all_args.push(OKXSubscriptionArg {
1292                channel: channel.clone(),
1293                inst_type: None,
1294                inst_family: None,
1295                inst_id: None,
1296            });
1297        }
1298
1299        if all_args.is_empty() {
1300            tracing::debug!("No active subscriptions to unsubscribe from");
1301            return Ok(());
1302        }
1303
1304        tracing::debug!("Batched unsubscribe from {} channels", all_args.len());
1305
1306        const BATCH_SIZE: usize = 256;
1307
1308        for chunk in all_args.chunks(BATCH_SIZE) {
1309            self.unsubscribe(chunk.to_vec()).await?;
1310        }
1311
1312        Ok(())
1313    }
1314
1315    #[allow(dead_code)]
1316    async fn resubscribe_all(&self) {
1317        // Collect bare channel subscriptions (e.g., Account)
1318        let mut subs_bare = Vec::new();
1319        for entry in self.subscriptions_bare.iter() {
1320            let channel = entry.key();
1321            subs_bare.push(channel.clone());
1322        }
1323
1324        let mut subs_inst_type = Vec::new();
1325        for entry in self.subscriptions_inst_type.iter() {
1326            let (channel, inst_types) = entry.pair();
1327            if !inst_types.is_empty() {
1328                subs_inst_type.push((channel.clone(), inst_types.clone()));
1329            }
1330        }
1331
1332        let mut subs_inst_family = Vec::new();
1333        for entry in self.subscriptions_inst_family.iter() {
1334            let (channel, inst_families) = entry.pair();
1335            if !inst_families.is_empty() {
1336                subs_inst_family.push((channel.clone(), inst_families.clone()));
1337            }
1338        }
1339
1340        let mut subs_inst_id = Vec::new();
1341        for entry in self.subscriptions_inst_id.iter() {
1342            let (channel, inst_ids) = entry.pair();
1343            if !inst_ids.is_empty() {
1344                subs_inst_id.push((channel.clone(), inst_ids.clone()));
1345            }
1346        }
1347
1348        // Process instrument type subscriptions
1349        for (channel, inst_types) in subs_inst_type {
1350            if inst_types.is_empty() {
1351                continue;
1352            }
1353
1354            tracing::debug!("Resubscribing: channel={channel}, instrument_types={inst_types:?}");
1355
1356            for inst_type in inst_types {
1357                let arg = OKXSubscriptionArg {
1358                    channel: channel.clone(),
1359                    inst_type: Some(inst_type),
1360                    inst_family: None,
1361                    inst_id: None,
1362                };
1363
1364                if let Err(e) = self.subscribe(vec![arg]).await {
1365                    tracing::error!(
1366                        "Failed to resubscribe to channel {channel} with instrument type: {e}"
1367                    );
1368                }
1369            }
1370        }
1371
1372        // Process instrument family subscriptions
1373        for (channel, inst_families) in subs_inst_family {
1374            if inst_families.is_empty() {
1375                continue;
1376            }
1377
1378            tracing::debug!(
1379                "Resubscribing: channel={channel}, instrument_families={inst_families:?}"
1380            );
1381
1382            for inst_family in inst_families {
1383                let arg = OKXSubscriptionArg {
1384                    channel: channel.clone(),
1385                    inst_type: None,
1386                    inst_family: Some(inst_family),
1387                    inst_id: None,
1388                };
1389
1390                if let Err(e) = self.subscribe(vec![arg]).await {
1391                    tracing::error!(
1392                        "Failed to resubscribe to channel {channel} with instrument family: {e}"
1393                    );
1394                }
1395            }
1396        }
1397
1398        // Process instrument ID subscriptions
1399        for (channel, inst_ids) in subs_inst_id {
1400            if inst_ids.is_empty() {
1401                continue;
1402            }
1403
1404            tracing::debug!("Resubscribing: channel={channel}, instrument_ids={inst_ids:?}");
1405
1406            for inst_id in inst_ids {
1407                let arg = OKXSubscriptionArg {
1408                    channel: channel.clone(),
1409                    inst_type: None,
1410                    inst_family: None,
1411                    inst_id: Some(inst_id),
1412                };
1413
1414                if let Err(e) = self.subscribe(vec![arg]).await {
1415                    tracing::error!(
1416                        "Failed to resubscribe to channel {channel} with instrument ID: {e}"
1417                    );
1418                }
1419            }
1420        }
1421
1422        // Process bare channel subscriptions (e.g., Account)
1423        for channel in subs_bare {
1424            tracing::debug!("Resubscribing to bare channel: {channel}");
1425
1426            let arg = OKXSubscriptionArg {
1427                channel,
1428                inst_type: None,
1429                inst_family: None,
1430                inst_id: None,
1431            };
1432
1433            if let Err(e) = self.subscribe(vec![arg]).await {
1434                tracing::error!("Failed to resubscribe to bare channel: {e}");
1435            }
1436        }
1437    }
1438
1439    /// Subscribes to instrument updates for a specific instrument type.
1440    ///
1441    /// Provides updates when instrument specifications change.
1442    ///
1443    /// # Errors
1444    ///
1445    /// Returns an error if the subscription request fails.
1446    ///
1447    /// # References
1448    ///
1449    /// <https://www.okx.com/docs-v5/en/#public-data-websocket-instruments-channel>.
1450    pub async fn subscribe_instruments(
1451        &self,
1452        instrument_type: OKXInstrumentType,
1453    ) -> Result<(), OKXWsError> {
1454        let arg = OKXSubscriptionArg {
1455            channel: OKXWsChannel::Instruments,
1456            inst_type: Some(instrument_type),
1457            inst_family: None,
1458            inst_id: None,
1459        };
1460        self.subscribe(vec![arg]).await
1461    }
1462
1463    /// Subscribes to instrument updates for a specific instrument.
1464    ///
1465    /// Provides updates when instrument specifications change.
1466    ///
1467    /// # Errors
1468    ///
1469    /// Returns an error if the subscription request fails.
1470    ///
1471    /// # References
1472    ///
1473    /// <https://www.okx.com/docs-v5/en/#public-data-websocket-instruments-channel>.
1474    pub async fn subscribe_instrument(
1475        &self,
1476        instrument_id: InstrumentId,
1477    ) -> Result<(), OKXWsError> {
1478        let arg = OKXSubscriptionArg {
1479            channel: OKXWsChannel::Instruments,
1480            inst_type: None,
1481            inst_family: None,
1482            inst_id: Some(instrument_id.symbol.inner()),
1483        };
1484        self.subscribe(vec![arg]).await
1485    }
1486
1487    /// Subscribes to order book data for an instrument.
1488    ///
1489    /// This is a convenience method that calls [`Self::subscribe_book_with_depth`] with depth 0,
1490    /// which automatically selects the appropriate channel based on VIP level.
1491    ///
1492    /// # Errors
1493    ///
1494    /// Returns an error if the subscription request fails.
1495    pub async fn subscribe_book(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
1496        self.subscribe_book_with_depth(instrument_id, 0).await
1497    }
1498
1499    /// Subscribes to the standard books channel (internal method).
1500    pub(crate) async fn subscribe_books_channel(
1501        &self,
1502        instrument_id: InstrumentId,
1503    ) -> Result<(), OKXWsError> {
1504        let arg = OKXSubscriptionArg {
1505            channel: OKXWsChannel::Books,
1506            inst_type: None,
1507            inst_family: None,
1508            inst_id: Some(instrument_id.symbol.inner()),
1509        };
1510        self.subscribe(vec![arg]).await
1511    }
1512
1513    /// Subscribes to 5-level order book snapshot data for an instrument.
1514    ///
1515    /// Updates every 100ms when there are changes.
1516    ///
1517    /// # Errors
1518    ///
1519    /// Returns an error if the subscription request fails.
1520    ///
1521    /// # References
1522    ///
1523    /// <https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-order-book-5-depth-channel>.
1524    pub async fn subscribe_book_depth5(
1525        &self,
1526        instrument_id: InstrumentId,
1527    ) -> Result<(), OKXWsError> {
1528        let arg = OKXSubscriptionArg {
1529            channel: OKXWsChannel::Books5,
1530            inst_type: None,
1531            inst_family: None,
1532            inst_id: Some(instrument_id.symbol.inner()),
1533        };
1534        self.subscribe(vec![arg]).await
1535    }
1536
1537    /// Subscribes to 50-level tick-by-tick order book data for an instrument.
1538    ///
1539    /// Provides real-time updates whenever order book changes.
1540    ///
1541    /// # Errors
1542    ///
1543    /// Returns an error if the subscription request fails.
1544    ///
1545    /// # References
1546    ///
1547    /// <https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-order-book-50-depth-tbt-channel>.
1548    pub async fn subscribe_book50_l2_tbt(
1549        &self,
1550        instrument_id: InstrumentId,
1551    ) -> Result<(), OKXWsError> {
1552        let arg = OKXSubscriptionArg {
1553            channel: OKXWsChannel::Books50Tbt,
1554            inst_type: None,
1555            inst_family: None,
1556            inst_id: Some(instrument_id.symbol.inner()),
1557        };
1558        self.subscribe(vec![arg]).await
1559    }
1560
1561    /// Subscribes to tick-by-tick full depth (400 levels) order book data for an instrument.
1562    ///
1563    /// Provides real-time updates with all depth levels whenever order book changes.
1564    ///
1565    /// # Errors
1566    ///
1567    /// Returns an error if the subscription request fails.
1568    ///
1569    /// # References
1570    ///
1571    /// <https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-order-book-400-depth-tbt-channel>.
1572    pub async fn subscribe_book_l2_tbt(
1573        &self,
1574        instrument_id: InstrumentId,
1575    ) -> Result<(), OKXWsError> {
1576        let arg = OKXSubscriptionArg {
1577            channel: OKXWsChannel::BooksTbt,
1578            inst_type: None,
1579            inst_family: None,
1580            inst_id: Some(instrument_id.symbol.inner()),
1581        };
1582        self.subscribe(vec![arg]).await
1583    }
1584
1585    /// Subscribes to order book data with automatic channel selection based on VIP level and depth.
1586    ///
1587    /// Selects the optimal channel based on user's VIP tier and requested depth:
1588    /// - depth 50: Requires VIP4+, subscribes to `books50-l2-tbt`
1589    /// - depth 0 or 400:
1590    ///   - VIP5+: subscribes to `books-l2-tbt` (400 depth, fastest)
1591    ///   - Below VIP5: subscribes to `books` (standard depth)
1592    ///
1593    /// # Errors
1594    ///
1595    /// Returns an error if:
1596    /// - Subscription request fails
1597    /// - depth is 50 but VIP level is below 4
1598    pub async fn subscribe_book_with_depth(
1599        &self,
1600        instrument_id: InstrumentId,
1601        depth: u16,
1602    ) -> anyhow::Result<()> {
1603        let vip = self.vip_level();
1604
1605        match depth {
1606            50 => {
1607                if vip < OKXVipLevel::Vip4 {
1608                    anyhow::bail!(
1609                        "VIP level {vip} insufficient for 50 depth subscription (requires VIP4)"
1610                    );
1611                }
1612                self.subscribe_book50_l2_tbt(instrument_id)
1613                    .await
1614                    .map_err(|e| anyhow::anyhow!(e))
1615            }
1616            0 | 400 => {
1617                if vip >= OKXVipLevel::Vip5 {
1618                    self.subscribe_book_l2_tbt(instrument_id)
1619                        .await
1620                        .map_err(|e| anyhow::anyhow!(e))
1621                } else {
1622                    self.subscribe_books_channel(instrument_id)
1623                        .await
1624                        .map_err(|e| anyhow::anyhow!(e))
1625                }
1626            }
1627            _ => anyhow::bail!("Invalid depth {depth}, must be 0, 50, or 400"),
1628        }
1629    }
1630
1631    /// Subscribes to best bid/ask quote data for an instrument.
1632    ///
1633    /// Provides tick-by-tick updates of the best bid and ask prices using the bbo-tbt channel.
1634    /// Supports all instrument types: SPOT, MARGIN, SWAP, FUTURES, OPTION.
1635    ///
1636    /// # Errors
1637    ///
1638    /// Returns an error if the subscription request fails.
1639    ///
1640    /// # References
1641    ///
1642    /// <https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-best-bid-offer-channel>.
1643    pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> Result<(), OKXWsError> {
1644        let arg = OKXSubscriptionArg {
1645            channel: OKXWsChannel::BboTbt,
1646            inst_type: None,
1647            inst_family: None,
1648            inst_id: Some(instrument_id.symbol.inner()),
1649        };
1650        self.subscribe(vec![arg]).await
1651    }
1652
1653    /// Subscribes to trade data for an instrument.
1654    ///
1655    /// When `aggregated` is `false`, subscribes to the `trades` channel (per-match updates).
1656    /// When `aggregated` is `true`, subscribes to the `trades-all` channel (aggregated updates).
1657    ///
1658    /// # Errors
1659    ///
1660    /// Returns an error if the subscription request fails.
1661    ///
1662    /// # References
1663    ///
1664    /// <https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-trades-channel>.
1665    /// <https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-all-trades-channel>.
1666    pub async fn subscribe_trades(
1667        &self,
1668        instrument_id: InstrumentId,
1669        aggregated: bool,
1670    ) -> Result<(), OKXWsError> {
1671        let channel = if aggregated {
1672            OKXWsChannel::TradesAll
1673        } else {
1674            OKXWsChannel::Trades
1675        };
1676
1677        let arg = OKXSubscriptionArg {
1678            channel,
1679            inst_type: None,
1680            inst_family: None,
1681            inst_id: Some(instrument_id.symbol.inner()),
1682        };
1683        self.subscribe(vec![arg]).await
1684    }
1685
1686    /// Subscribes to 24hr rolling ticker data for an instrument.
1687    ///
1688    /// Updates every 100ms with trading statistics.
1689    ///
1690    /// # Errors
1691    ///
1692    /// Returns an error if the subscription request fails.
1693    ///
1694    /// # References
1695    ///
1696    /// <https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-tickers-channel>.
1697    pub async fn subscribe_ticker(&self, instrument_id: InstrumentId) -> Result<(), OKXWsError> {
1698        let arg = OKXSubscriptionArg {
1699            channel: OKXWsChannel::Tickers,
1700            inst_type: None,
1701            inst_family: None,
1702            inst_id: Some(instrument_id.symbol.inner()),
1703        };
1704        self.subscribe(vec![arg]).await
1705    }
1706
1707    /// Subscribes to mark price data for derivatives instruments.
1708    ///
1709    /// Updates every 200ms for perpetual swaps, or at settlement for futures.
1710    ///
1711    /// # Errors
1712    ///
1713    /// Returns an error if the subscription request fails.
1714    ///
1715    /// # References
1716    ///
1717    /// <https://www.okx.com/docs-v5/en/#public-data-websocket-mark-price-channel>.
1718    pub async fn subscribe_mark_prices(
1719        &self,
1720        instrument_id: InstrumentId,
1721    ) -> Result<(), OKXWsError> {
1722        let arg = OKXSubscriptionArg {
1723            channel: OKXWsChannel::MarkPrice,
1724            inst_type: None,
1725            inst_family: None,
1726            inst_id: Some(instrument_id.symbol.inner()),
1727        };
1728        self.subscribe(vec![arg]).await
1729    }
1730
1731    /// Subscribes to index price data for an instrument.
1732    ///
1733    /// Updates every second with the underlying index price.
1734    ///
1735    /// # Errors
1736    ///
1737    /// Returns an error if the subscription request fails.
1738    ///
1739    /// # References
1740    ///
1741    /// <https://www.okx.com/docs-v5/en/#public-data-websocket-index-tickers-channel>.
1742    pub async fn subscribe_index_prices(
1743        &self,
1744        instrument_id: InstrumentId,
1745    ) -> Result<(), OKXWsError> {
1746        let arg = OKXSubscriptionArg {
1747            channel: OKXWsChannel::IndexTickers,
1748            inst_type: None,
1749            inst_family: None,
1750            inst_id: Some(instrument_id.symbol.inner()),
1751        };
1752        self.subscribe(vec![arg]).await
1753    }
1754
1755    /// Subscribes to funding rate data for perpetual swap instruments.
1756    ///
1757    /// Updates when funding rate changes or at funding intervals.
1758    ///
1759    /// # Errors
1760    ///
1761    /// Returns an error if the subscription request fails.
1762    ///
1763    /// # References
1764    ///
1765    /// <https://www.okx.com/docs-v5/en/#public-data-websocket-funding-rate-channel>.
1766    pub async fn subscribe_funding_rates(
1767        &self,
1768        instrument_id: InstrumentId,
1769    ) -> Result<(), OKXWsError> {
1770        let arg = OKXSubscriptionArg {
1771            channel: OKXWsChannel::FundingRate,
1772            inst_type: None,
1773            inst_family: None,
1774            inst_id: Some(instrument_id.symbol.inner()),
1775        };
1776        self.subscribe(vec![arg]).await
1777    }
1778
1779    /// Subscribes to candlestick/bar data for an instrument.
1780    ///
1781    /// Supports various time intervals from 1s to 3M.
1782    ///
1783    /// # Errors
1784    ///
1785    /// Returns an error if the subscription request fails.
1786    ///
1787    /// # References
1788    ///
1789    /// <https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-candlesticks-channel>.
1790    pub async fn subscribe_bars(&self, bar_type: BarType) -> Result<(), OKXWsError> {
1791        // Use regular trade-price candlesticks which work for all instrument types
1792        let channel = bar_spec_as_okx_channel(bar_type.spec())
1793            .map_err(|e| OKXWsError::ClientError(e.to_string()))?;
1794
1795        let arg = OKXSubscriptionArg {
1796            channel,
1797            inst_type: None,
1798            inst_family: None,
1799            inst_id: Some(bar_type.instrument_id().symbol.inner()),
1800        };
1801        self.subscribe(vec![arg]).await
1802    }
1803
1804    /// Unsubscribes from instrument updates for a specific instrument type.
1805    ///
1806    /// # Errors
1807    ///
1808    /// Returns an error if the subscription request fails.
1809    pub async fn unsubscribe_instruments(
1810        &self,
1811        instrument_type: OKXInstrumentType,
1812    ) -> Result<(), OKXWsError> {
1813        let arg = OKXSubscriptionArg {
1814            channel: OKXWsChannel::Instruments,
1815            inst_type: Some(instrument_type),
1816            inst_family: None,
1817            inst_id: None,
1818        };
1819        self.unsubscribe(vec![arg]).await
1820    }
1821
1822    /// Unsubscribe from instrument updates for a specific instrument.
1823    ///
1824    /// # Errors
1825    ///
1826    /// Returns an error if the subscription request fails.
1827    pub async fn unsubscribe_instrument(
1828        &self,
1829        instrument_id: InstrumentId,
1830    ) -> Result<(), OKXWsError> {
1831        let arg = OKXSubscriptionArg {
1832            channel: OKXWsChannel::Instruments,
1833            inst_type: None,
1834            inst_family: None,
1835            inst_id: Some(instrument_id.symbol.inner()),
1836        };
1837        self.unsubscribe(vec![arg]).await
1838    }
1839
1840    /// Unsubscribe from full order book data for an instrument.
1841    ///
1842    /// # Errors
1843    ///
1844    /// Returns an error if the subscription request fails.
1845    pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> Result<(), OKXWsError> {
1846        let arg = OKXSubscriptionArg {
1847            channel: OKXWsChannel::Books,
1848            inst_type: None,
1849            inst_family: None,
1850            inst_id: Some(instrument_id.symbol.inner()),
1851        };
1852        self.unsubscribe(vec![arg]).await
1853    }
1854
1855    /// Unsubscribe from 5-level order book snapshot data for an instrument.
1856    ///
1857    /// # Errors
1858    ///
1859    /// Returns an error if the subscription request fails.
1860    pub async fn unsubscribe_book_depth5(
1861        &self,
1862        instrument_id: InstrumentId,
1863    ) -> Result<(), OKXWsError> {
1864        let arg = OKXSubscriptionArg {
1865            channel: OKXWsChannel::Books5,
1866            inst_type: None,
1867            inst_family: None,
1868            inst_id: Some(instrument_id.symbol.inner()),
1869        };
1870        self.unsubscribe(vec![arg]).await
1871    }
1872
1873    /// Unsubscribe from 50-level tick-by-tick order book data for an instrument.
1874    ///
1875    /// # Errors
1876    ///
1877    /// Returns an error if the subscription request fails.
1878    pub async fn unsubscribe_book50_l2_tbt(
1879        &self,
1880        instrument_id: InstrumentId,
1881    ) -> Result<(), OKXWsError> {
1882        let arg = OKXSubscriptionArg {
1883            channel: OKXWsChannel::Books50Tbt,
1884            inst_type: None,
1885            inst_family: None,
1886            inst_id: Some(instrument_id.symbol.inner()),
1887        };
1888        self.unsubscribe(vec![arg]).await
1889    }
1890
1891    /// Unsubscribe from tick-by-tick full depth order book data for an instrument.
1892    ///
1893    /// # Errors
1894    ///
1895    /// Returns an error if the subscription request fails.
1896    pub async fn unsubscribe_book_l2_tbt(
1897        &self,
1898        instrument_id: InstrumentId,
1899    ) -> Result<(), OKXWsError> {
1900        let arg = OKXSubscriptionArg {
1901            channel: OKXWsChannel::BooksTbt,
1902            inst_type: None,
1903            inst_family: None,
1904            inst_id: Some(instrument_id.symbol.inner()),
1905        };
1906        self.unsubscribe(vec![arg]).await
1907    }
1908
1909    /// Unsubscribe from best bid/ask quote data for an instrument.
1910    ///
1911    /// # Errors
1912    ///
1913    /// Returns an error if the subscription request fails.
1914    pub async fn unsubscribe_quotes(&self, instrument_id: InstrumentId) -> Result<(), OKXWsError> {
1915        let arg = OKXSubscriptionArg {
1916            channel: OKXWsChannel::BboTbt,
1917            inst_type: None,
1918            inst_family: None,
1919            inst_id: Some(instrument_id.symbol.inner()),
1920        };
1921        self.unsubscribe(vec![arg]).await
1922    }
1923
1924    /// Unsubscribe from 24hr rolling ticker data for an instrument.
1925    ///
1926    /// # Errors
1927    ///
1928    /// Returns an error if the subscription request fails.
1929    pub async fn unsubscribe_ticker(&self, instrument_id: InstrumentId) -> Result<(), OKXWsError> {
1930        let arg = OKXSubscriptionArg {
1931            channel: OKXWsChannel::Tickers,
1932            inst_type: None,
1933            inst_family: None,
1934            inst_id: Some(instrument_id.symbol.inner()),
1935        };
1936        self.unsubscribe(vec![arg]).await
1937    }
1938
1939    /// Unsubscribe from mark price data for a derivatives instrument.
1940    ///
1941    /// # Errors
1942    ///
1943    /// Returns an error if the subscription request fails.
1944    pub async fn unsubscribe_mark_prices(
1945        &self,
1946        instrument_id: InstrumentId,
1947    ) -> Result<(), OKXWsError> {
1948        let arg = OKXSubscriptionArg {
1949            channel: OKXWsChannel::MarkPrice,
1950            inst_type: None,
1951            inst_family: None,
1952            inst_id: Some(instrument_id.symbol.inner()),
1953        };
1954        self.unsubscribe(vec![arg]).await
1955    }
1956
1957    /// Unsubscribe from index price data for an instrument.
1958    ///
1959    /// # Errors
1960    ///
1961    /// Returns an error if the subscription request fails.
1962    pub async fn unsubscribe_index_prices(
1963        &self,
1964        instrument_id: InstrumentId,
1965    ) -> Result<(), OKXWsError> {
1966        let arg = OKXSubscriptionArg {
1967            channel: OKXWsChannel::IndexTickers,
1968            inst_type: None,
1969            inst_family: None,
1970            inst_id: Some(instrument_id.symbol.inner()),
1971        };
1972        self.unsubscribe(vec![arg]).await
1973    }
1974
1975    /// Unsubscribe from funding rate data for a perpetual swap instrument.
1976    ///
1977    /// # Errors
1978    ///
1979    /// Returns an error if the subscription request fails.
1980    pub async fn unsubscribe_funding_rates(
1981        &self,
1982        instrument_id: InstrumentId,
1983    ) -> Result<(), OKXWsError> {
1984        let arg = OKXSubscriptionArg {
1985            channel: OKXWsChannel::FundingRate,
1986            inst_type: None,
1987            inst_family: None,
1988            inst_id: Some(instrument_id.symbol.inner()),
1989        };
1990        self.unsubscribe(vec![arg]).await
1991    }
1992
1993    /// Unsubscribe from trade data for an instrument.
1994    ///
1995    /// # Errors
1996    ///
1997    /// Returns an error if the subscription request fails.
1998    pub async fn unsubscribe_trades(
1999        &self,
2000        instrument_id: InstrumentId,
2001        aggregated: bool,
2002    ) -> Result<(), OKXWsError> {
2003        let channel = if aggregated {
2004            OKXWsChannel::TradesAll
2005        } else {
2006            OKXWsChannel::Trades
2007        };
2008
2009        let arg = OKXSubscriptionArg {
2010            channel,
2011            inst_type: None,
2012            inst_family: None,
2013            inst_id: Some(instrument_id.symbol.inner()),
2014        };
2015        self.unsubscribe(vec![arg]).await
2016    }
2017
2018    /// Unsubscribe from candlestick/bar data for an instrument.
2019    ///
2020    /// # Errors
2021    ///
2022    /// Returns an error if the subscription request fails.
2023    pub async fn unsubscribe_bars(&self, bar_type: BarType) -> Result<(), OKXWsError> {
2024        // Use regular trade-price candlesticks which work for all instrument types
2025        let channel = bar_spec_as_okx_channel(bar_type.spec())
2026            .map_err(|e| OKXWsError::ClientError(e.to_string()))?;
2027
2028        let arg = OKXSubscriptionArg {
2029            channel,
2030            inst_type: None,
2031            inst_family: None,
2032            inst_id: Some(bar_type.instrument_id().symbol.inner()),
2033        };
2034        self.unsubscribe(vec![arg]).await
2035    }
2036
2037    /// Subscribes to order updates for the given instrument type.
2038    ///
2039    /// # Errors
2040    ///
2041    /// Returns an error if the subscription request fails.
2042    pub async fn subscribe_orders(
2043        &self,
2044        instrument_type: OKXInstrumentType,
2045    ) -> Result<(), OKXWsError> {
2046        let arg = OKXSubscriptionArg {
2047            channel: OKXWsChannel::Orders,
2048            inst_type: Some(instrument_type),
2049            inst_family: None,
2050            inst_id: None,
2051        };
2052        self.subscribe(vec![arg]).await
2053    }
2054
2055    /// Unsubscribes from order updates for the given instrument type.
2056    ///
2057    /// # Errors
2058    ///
2059    /// Returns an error if the subscription request fails.
2060    pub async fn unsubscribe_orders(
2061        &self,
2062        instrument_type: OKXInstrumentType,
2063    ) -> Result<(), OKXWsError> {
2064        let arg = OKXSubscriptionArg {
2065            channel: OKXWsChannel::Orders,
2066            inst_type: Some(instrument_type),
2067            inst_family: None,
2068            inst_id: None,
2069        };
2070        self.unsubscribe(vec![arg]).await
2071    }
2072
2073    /// Subscribes to algo order updates for the given instrument type.
2074    ///
2075    /// # Errors
2076    ///
2077    /// Returns an error if the subscription request fails.
2078    pub async fn subscribe_orders_algo(
2079        &self,
2080        instrument_type: OKXInstrumentType,
2081    ) -> Result<(), OKXWsError> {
2082        let arg = OKXSubscriptionArg {
2083            channel: OKXWsChannel::OrdersAlgo,
2084            inst_type: Some(instrument_type),
2085            inst_family: None,
2086            inst_id: None,
2087        };
2088        self.subscribe(vec![arg]).await
2089    }
2090
2091    /// Unsubscribes from algo order updates for the given instrument type.
2092    ///
2093    /// # Errors
2094    ///
2095    /// Returns an error if the subscription request fails.
2096    pub async fn unsubscribe_orders_algo(
2097        &self,
2098        instrument_type: OKXInstrumentType,
2099    ) -> Result<(), OKXWsError> {
2100        let arg = OKXSubscriptionArg {
2101            channel: OKXWsChannel::OrdersAlgo,
2102            inst_type: Some(instrument_type),
2103            inst_family: None,
2104            inst_id: None,
2105        };
2106        self.unsubscribe(vec![arg]).await
2107    }
2108
2109    /// Subscribes to fill updates for the given instrument type.
2110    ///
2111    /// # Errors
2112    ///
2113    /// Returns an error if the subscription request fails.
2114    pub async fn subscribe_fills(
2115        &self,
2116        instrument_type: OKXInstrumentType,
2117    ) -> Result<(), OKXWsError> {
2118        let arg = OKXSubscriptionArg {
2119            channel: OKXWsChannel::Fills,
2120            inst_type: Some(instrument_type),
2121            inst_family: None,
2122            inst_id: None,
2123        };
2124        self.subscribe(vec![arg]).await
2125    }
2126
2127    /// Unsubscribes from fill updates for the given instrument type.
2128    ///
2129    /// # Errors
2130    ///
2131    /// Returns an error if the subscription request fails.
2132    pub async fn unsubscribe_fills(
2133        &self,
2134        instrument_type: OKXInstrumentType,
2135    ) -> Result<(), OKXWsError> {
2136        let arg = OKXSubscriptionArg {
2137            channel: OKXWsChannel::Fills,
2138            inst_type: Some(instrument_type),
2139            inst_family: None,
2140            inst_id: None,
2141        };
2142        self.unsubscribe(vec![arg]).await
2143    }
2144
2145    /// Subscribes to account balance updates.
2146    ///
2147    /// # Errors
2148    ///
2149    /// Returns an error if the subscription request fails.
2150    pub async fn subscribe_account(&self) -> Result<(), OKXWsError> {
2151        let arg = OKXSubscriptionArg {
2152            channel: OKXWsChannel::Account,
2153            inst_type: None,
2154            inst_family: None,
2155            inst_id: None,
2156        };
2157        self.subscribe(vec![arg]).await
2158    }
2159
2160    /// Unsubscribes from account balance updates.
2161    ///
2162    /// # Errors
2163    ///
2164    /// Returns an error if the subscription request fails.
2165    pub async fn unsubscribe_account(&self) -> Result<(), OKXWsError> {
2166        let arg = OKXSubscriptionArg {
2167            channel: OKXWsChannel::Account,
2168            inst_type: None,
2169            inst_family: None,
2170            inst_id: None,
2171        };
2172        self.unsubscribe(vec![arg]).await
2173    }
2174
2175    /// Cancel an existing order via WebSocket.
2176    ///
2177    /// # References
2178    ///
2179    /// <https://www.okx.com/docs-v5/en/#order-book-trading-websocket-cancel-order>
2180    async fn ws_cancel_order(
2181        &self,
2182        params: WsCancelOrderParams,
2183        request_id: Option<String>,
2184    ) -> Result<(), OKXWsError> {
2185        let request_id = request_id.unwrap_or(self.generate_unique_request_id());
2186
2187        let req = OKXWsRequest {
2188            id: Some(request_id),
2189            op: OKXWsOperation::CancelOrder,
2190            args: vec![params],
2191            exp_time: None,
2192        };
2193
2194        let txt = serde_json::to_string(&req).map_err(|e| OKXWsError::JsonError(e.to_string()))?;
2195
2196        {
2197            let inner_guard = self.inner.read().await;
2198            if let Some(inner) = &*inner_guard {
2199                if let Err(e) = inner.send_text(txt, Some(vec!["cancel".to_string()])).await {
2200                    tracing::error!("Error sending message: {e:?}");
2201                }
2202                Ok(())
2203            } else {
2204                Err(OKXWsError::ClientError("Not connected".to_string()))
2205            }
2206        }
2207    }
2208
2209    /// Cancel multiple orders at once via WebSocket.
2210    ///
2211    /// # References
2212    ///
2213    /// <https://www.okx.com/docs-v5/en/#order-book-trading-websocket-mass-cancel-order>
2214    async fn ws_mass_cancel_with_id(
2215        &self,
2216        args: Vec<Value>,
2217        request_id: String,
2218    ) -> Result<(), OKXWsError> {
2219        let req = OKXWsRequest {
2220            id: Some(request_id),
2221            op: OKXWsOperation::MassCancel,
2222            args,
2223            exp_time: None,
2224        };
2225
2226        let txt = serde_json::to_string(&req).map_err(|e| OKXWsError::JsonError(e.to_string()))?;
2227
2228        {
2229            let inner_guard = self.inner.read().await;
2230            if let Some(inner) = &*inner_guard {
2231                if let Err(e) = inner.send_text(txt, Some(vec!["cancel".to_string()])).await {
2232                    tracing::error!("Error sending message: {e:?}");
2233                }
2234                Ok(())
2235            } else {
2236                Err(OKXWsError::ClientError("Not connected".to_string()))
2237            }
2238        }
2239    }
2240
2241    /// Amend an existing order via WebSocket.
2242    ///
2243    /// # References
2244    ///
2245    /// <https://www.okx.com/docs-v5/en/#order-book-trading-websocket-amend-order>
2246    async fn ws_amend_order(
2247        &self,
2248        params: WsAmendOrderParams,
2249        request_id: Option<String>,
2250    ) -> Result<(), OKXWsError> {
2251        let request_id = request_id.unwrap_or(self.generate_unique_request_id());
2252
2253        let req = OKXWsRequest {
2254            id: Some(request_id),
2255            op: OKXWsOperation::AmendOrder,
2256            args: vec![params],
2257            exp_time: None,
2258        };
2259
2260        let txt = serde_json::to_string(&req).map_err(|e| OKXWsError::JsonError(e.to_string()))?;
2261
2262        {
2263            let inner_guard = self.inner.read().await;
2264            if let Some(inner) = &*inner_guard {
2265                if let Err(e) = inner.send_text(txt, Some(vec!["amend".to_string()])).await {
2266                    tracing::error!("Error sending message: {e:?}");
2267                }
2268                Ok(())
2269            } else {
2270                Err(OKXWsError::ClientError("Not connected".to_string()))
2271            }
2272        }
2273    }
2274
2275    /// Place multiple orders in a single batch via WebSocket.
2276    ///
2277    /// # References
2278    ///
2279    /// <https://www.okx.com/docs-v5/en/#order-book-trading-websocket-batch-orders>
2280    async fn ws_batch_place_orders(&self, args: Vec<Value>) -> Result<(), OKXWsError> {
2281        let request_id = self.generate_unique_request_id();
2282
2283        let req = OKXWsRequest {
2284            id: Some(request_id),
2285            op: OKXWsOperation::BatchOrders,
2286            args,
2287            exp_time: None,
2288        };
2289
2290        let txt = serde_json::to_string(&req).map_err(|e| OKXWsError::JsonError(e.to_string()))?;
2291
2292        {
2293            let inner_guard = self.inner.read().await;
2294            if let Some(inner) = &*inner_guard {
2295                if let Err(e) = inner.send_text(txt, Some(vec!["order".to_string()])).await {
2296                    tracing::error!("Error sending message: {e:?}");
2297                }
2298                Ok(())
2299            } else {
2300                Err(OKXWsError::ClientError("Not connected".to_string()))
2301            }
2302        }
2303    }
2304
2305    /// Cancel multiple orders in a single batch via WebSocket.
2306    ///
2307    /// # References
2308    ///
2309    /// <https://www.okx.com/docs-v5/en/#order-book-trading-websocket-batch-cancel-orders>
2310    async fn ws_batch_cancel_orders(&self, args: Vec<Value>) -> Result<(), OKXWsError> {
2311        let request_id = self.generate_unique_request_id();
2312
2313        let req = OKXWsRequest {
2314            id: Some(request_id),
2315            op: OKXWsOperation::BatchCancelOrders,
2316            args,
2317            exp_time: None,
2318        };
2319
2320        let txt = serde_json::to_string(&req).map_err(|e| OKXWsError::JsonError(e.to_string()))?;
2321
2322        {
2323            let inner_guard = self.inner.read().await;
2324            if let Some(inner) = &*inner_guard {
2325                if let Err(e) = inner.send_text(txt, Some(vec!["cancel".to_string()])).await {
2326                    tracing::error!("Error sending message: {e:?}");
2327                }
2328                Ok(())
2329            } else {
2330                Err(OKXWsError::ClientError("Not connected".to_string()))
2331            }
2332        }
2333    }
2334
2335    /// Amend multiple orders in a single batch via WebSocket.
2336    ///
2337    /// # References
2338    ///
2339    /// <https://www.okx.com/docs-v5/en/#order-book-trading-websocket-batch-amend-orders>
2340    async fn ws_batch_amend_orders(&self, args: Vec<Value>) -> Result<(), OKXWsError> {
2341        let request_id = self.generate_unique_request_id();
2342
2343        let req = OKXWsRequest {
2344            id: Some(request_id),
2345            op: OKXWsOperation::BatchAmendOrders,
2346            args,
2347            exp_time: None,
2348        };
2349
2350        let txt = serde_json::to_string(&req).map_err(|e| OKXWsError::JsonError(e.to_string()))?;
2351
2352        {
2353            let inner_guard = self.inner.read().await;
2354            if let Some(inner) = &*inner_guard {
2355                if let Err(e) = inner.send_text(txt, Some(vec!["amend".to_string()])).await {
2356                    tracing::error!("Error sending message: {e:?}");
2357                }
2358                Ok(())
2359            } else {
2360                Err(OKXWsError::ClientError("Not connected".to_string()))
2361            }
2362        }
2363    }
2364
2365    /// Submits an order, automatically routing conditional orders to the algo endpoint.
2366    ///
2367    /// # Errors
2368    ///
2369    /// Returns an error if the order parameters are invalid or if the request
2370    /// cannot be sent to the websocket client.
2371    ///
2372    /// # References
2373    ///
2374    /// - Regular orders: <https://www.okx.com/docs-v5/en/#order-book-trading-trade-ws-place-order>
2375    /// - Algo orders: <https://www.okx.com/docs-v5/en/#order-book-trading-algo-trading-post-place-algo-order>
2376    #[allow(clippy::too_many_arguments)]
2377    pub async fn submit_order(
2378        &self,
2379        trader_id: TraderId,
2380        strategy_id: StrategyId,
2381        instrument_id: InstrumentId,
2382        td_mode: OKXTradeMode,
2383        client_order_id: ClientOrderId,
2384        order_side: OrderSide,
2385        order_type: OrderType,
2386        quantity: Quantity,
2387        time_in_force: Option<TimeInForce>,
2388        price: Option<Price>,
2389        trigger_price: Option<Price>,
2390        post_only: Option<bool>,
2391        reduce_only: Option<bool>,
2392        quote_quantity: Option<bool>,
2393        position_side: Option<PositionSide>,
2394    ) -> Result<(), OKXWsError> {
2395        if !OKX_SUPPORTED_ORDER_TYPES.contains(&order_type) {
2396            return Err(OKXWsError::ClientError(format!(
2397                "Unsupported order type: {order_type:?}",
2398            )));
2399        }
2400
2401        if let Some(tif) = time_in_force
2402            && !OKX_SUPPORTED_TIME_IN_FORCE.contains(&tif)
2403        {
2404            return Err(OKXWsError::ClientError(format!(
2405                "Unsupported time in force: {tif:?}",
2406            )));
2407        }
2408
2409        let mut builder = WsPostOrderParamsBuilder::default();
2410
2411        builder.inst_id(instrument_id.symbol.as_str());
2412        builder.td_mode(td_mode);
2413        builder.cl_ord_id(client_order_id.as_str());
2414
2415        let instrument = self
2416            .instruments_cache
2417            .get(&instrument_id.symbol.inner())
2418            .ok_or_else(|| {
2419                OKXWsError::ClientError(format!("Unknown instrument {instrument_id}"))
2420            })?;
2421
2422        let instrument_type =
2423            okx_instrument_type(instrument).map_err(|e| OKXWsError::ClientError(e.to_string()))?;
2424        let quote_currency = instrument.quote_currency();
2425
2426        match instrument_type {
2427            OKXInstrumentType::Spot => {
2428                // SPOT: ccy parameter is required by OKX for spot trading
2429                builder.ccy(quote_currency.to_string());
2430            }
2431            OKXInstrumentType::Margin => {
2432                builder.ccy(quote_currency.to_string());
2433
2434                if let Some(ro) = reduce_only
2435                    && ro
2436                {
2437                    builder.reduce_only(ro);
2438                }
2439            }
2440            OKXInstrumentType::Swap | OKXInstrumentType::Futures => {
2441                // SWAP/FUTURES: use quote currency for margin (required by OKX)
2442                builder.ccy(quote_currency.to_string());
2443
2444                // For derivatives, posSide is required by OKX
2445                // Use Net for one-way mode (default for NETTING OMS)
2446                if position_side.is_none() {
2447                    builder.pos_side(OKXPositionSide::Net);
2448                }
2449            }
2450            _ => {
2451                builder.ccy(quote_currency.to_string());
2452
2453                // For derivatives, posSide is required
2454                if position_side.is_none() {
2455                    builder.pos_side(OKXPositionSide::Net);
2456                }
2457
2458                if let Some(ro) = reduce_only
2459                    && ro
2460                {
2461                    builder.reduce_only(ro);
2462                }
2463            }
2464        };
2465
2466        // For SPOT market orders in Cash mode, handle tgtCcy parameter
2467        // https://www.okx.com/docs-v5/en/#order-book-trading-trade-post-place-order
2468        // OKX API default behavior for SPOT market orders:
2469        // - BUY orders default to tgtCcy=quote_ccy (sz represents quote currency amount)
2470        // - SELL orders default to tgtCcy=base_ccy (sz represents base currency amount)
2471        // Note: tgtCcy is ONLY supported for Cash trading mode, not for margin modes (Cross/Isolated)
2472        if instrument_type == OKXInstrumentType::Spot
2473            && order_type == OrderType::Market
2474            && td_mode == OKXTradeMode::Cash
2475        {
2476            match quote_quantity {
2477                Some(true) => {
2478                    // Explicitly request quote currency sizing
2479                    builder.tgt_ccy(OKXTargetCurrency::QuoteCcy);
2480                }
2481                Some(false) => {
2482                    if order_side == OrderSide::Buy {
2483                        // For BUY orders, must explicitly set to base_ccy to override OKX default
2484                        builder.tgt_ccy(OKXTargetCurrency::BaseCcy);
2485                    }
2486                    // For SELL orders with quote_quantity=false, omit tgtCcy (OKX defaults to base_ccy correctly)
2487                }
2488                None => {
2489                    // No preference specified, use OKX defaults
2490                }
2491            }
2492        }
2493
2494        builder.side(order_side);
2495
2496        if let Some(pos_side) = position_side {
2497            builder.pos_side(pos_side);
2498        };
2499
2500        let (okx_ord_type, price) = if post_only.unwrap_or(false) {
2501            (OKXOrderType::PostOnly, price)
2502        } else {
2503            (OKXOrderType::from(order_type), price)
2504        };
2505
2506        log::debug!(
2507            "Order type mapping: order_type={:?}, time_in_force={:?}, post_only={:?} -> okx_ord_type={:?}",
2508            order_type,
2509            time_in_force,
2510            post_only,
2511            okx_ord_type
2512        );
2513
2514        builder.ord_type(okx_ord_type);
2515        builder.sz(quantity.to_string());
2516
2517        if let Some(tp) = trigger_price {
2518            builder.px(tp.to_string());
2519        } else if let Some(p) = price {
2520            builder.px(p.to_string());
2521        }
2522
2523        builder.tag(OKX_NAUTILUS_BROKER_ID);
2524
2525        let params = builder
2526            .build()
2527            .map_err(|e| OKXWsError::ClientError(format!("Build order params error: {e}")))?;
2528
2529        let request_id = self.generate_unique_request_id();
2530
2531        self.pending_place_requests.insert(
2532            request_id.clone(),
2533            (
2534                PendingOrderParams::Regular(params.clone()),
2535                client_order_id,
2536                trader_id,
2537                strategy_id,
2538                instrument_id,
2539            ),
2540        );
2541
2542        self.active_client_orders
2543            .insert(client_order_id, (trader_id, strategy_id, instrument_id));
2544
2545        self.retry_manager
2546            .execute_with_retry_with_cancel(
2547                "submit_order",
2548                || {
2549                    let params = params.clone();
2550                    let request_id = request_id.clone();
2551                    async move { self.ws_place_order(params, Some(request_id)).await }
2552                },
2553                should_retry_okx_error,
2554                create_okx_timeout_error,
2555                &self.cancellation_token,
2556            )
2557            .await
2558    }
2559
2560    /// Cancels an existing order.
2561    ///
2562    /// # Errors
2563    ///
2564    /// Returns an error if the cancel parameters are invalid or if the
2565    /// cancellation request fails to send.
2566    ///
2567    /// # References
2568    ///
2569    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-ws-cancel-order>.
2570    #[allow(clippy::too_many_arguments)]
2571    pub async fn cancel_order(
2572        &self,
2573        trader_id: TraderId,
2574        strategy_id: StrategyId,
2575        instrument_id: InstrumentId,
2576        client_order_id: Option<ClientOrderId>,
2577        venue_order_id: Option<VenueOrderId>,
2578    ) -> Result<(), OKXWsError> {
2579        let mut builder = WsCancelOrderParamsBuilder::default();
2580        // Note: instType should NOT be included in cancel order requests
2581        // For WebSocket orders, use the full symbol (including SWAP/FUTURES suffix if present)
2582        builder.inst_id(instrument_id.symbol.as_str());
2583
2584        if let Some(venue_order_id) = venue_order_id {
2585            builder.ord_id(venue_order_id.as_str());
2586        }
2587
2588        // Set client order ID before building params (fix for potential bug)
2589        if let Some(client_order_id) = client_order_id {
2590            builder.cl_ord_id(client_order_id.as_str());
2591        }
2592
2593        let params = builder
2594            .build()
2595            .map_err(|e| OKXWsError::ClientError(format!("Build cancel params error: {e}")))?;
2596
2597        let request_id = self.generate_unique_request_id();
2598
2599        // External orders may not have a client order ID,
2600        // for now we just track those with a client order ID as pending requests.
2601        if let Some(client_order_id) = client_order_id {
2602            self.pending_cancel_requests.insert(
2603                request_id.clone(),
2604                (
2605                    client_order_id,
2606                    trader_id,
2607                    strategy_id,
2608                    instrument_id,
2609                    venue_order_id,
2610                ),
2611            );
2612        }
2613
2614        self.retry_manager
2615            .execute_with_retry_with_cancel(
2616                "cancel_order",
2617                || {
2618                    let params = params.clone();
2619                    let request_id = request_id.clone();
2620                    async move { self.ws_cancel_order(params, Some(request_id)).await }
2621                },
2622                should_retry_okx_error,
2623                create_okx_timeout_error,
2624                &self.cancellation_token,
2625            )
2626            .await
2627    }
2628
2629    /// Place a new order via WebSocket.
2630    ///
2631    /// # References
2632    ///
2633    /// <https://www.okx.com/docs-v5/en/#order-book-trading-websocket-place-order>
2634    async fn ws_place_order(
2635        &self,
2636        params: WsPostOrderParams,
2637        request_id: Option<String>,
2638    ) -> Result<(), OKXWsError> {
2639        let request_id = request_id.unwrap_or(self.generate_unique_request_id());
2640
2641        let req = OKXWsRequest {
2642            id: Some(request_id),
2643            op: OKXWsOperation::Order,
2644            exp_time: None,
2645            args: vec![params],
2646        };
2647
2648        let txt = serde_json::to_string(&req).map_err(|e| OKXWsError::JsonError(e.to_string()))?;
2649
2650        {
2651            let inner_guard = self.inner.read().await;
2652            if let Some(inner) = &*inner_guard {
2653                if let Err(e) = inner.send_text(txt, Some(vec!["order".to_string()])).await {
2654                    tracing::error!("Error sending message: {e:?}");
2655                }
2656                Ok(())
2657            } else {
2658                Err(OKXWsError::ClientError("Not connected".to_string()))
2659            }
2660        }
2661    }
2662
2663    /// Modifies an existing order.
2664    ///
2665    /// # Errors
2666    ///
2667    /// Returns an error if the amend parameters are invalid or if the
2668    /// websocket request fails to send.
2669    ///
2670    /// # References
2671    ///
2672    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-ws-amend-order>.
2673    #[allow(clippy::too_many_arguments)]
2674    pub async fn modify_order(
2675        &self,
2676        trader_id: TraderId,
2677        strategy_id: StrategyId,
2678        instrument_id: InstrumentId,
2679        client_order_id: Option<ClientOrderId>,
2680        price: Option<Price>,
2681        quantity: Option<Quantity>,
2682        venue_order_id: Option<VenueOrderId>,
2683    ) -> Result<(), OKXWsError> {
2684        let mut builder = WsAmendOrderParamsBuilder::default();
2685
2686        builder.inst_id(instrument_id.symbol.as_str());
2687
2688        if let Some(venue_order_id) = venue_order_id {
2689            builder.ord_id(venue_order_id.as_str());
2690        }
2691
2692        if let Some(client_order_id) = client_order_id {
2693            builder.cl_ord_id(client_order_id.as_str());
2694        }
2695
2696        if let Some(price) = price {
2697            builder.new_px(price.to_string());
2698        }
2699
2700        if let Some(quantity) = quantity {
2701            builder.new_sz(quantity.to_string());
2702        }
2703
2704        let params = builder
2705            .build()
2706            .map_err(|e| OKXWsError::ClientError(format!("Build amend params error: {e}")))?;
2707
2708        // Generate unique request ID for WebSocket message
2709        let request_id = self
2710            .request_id_counter
2711            .fetch_add(1, Ordering::SeqCst)
2712            .to_string();
2713
2714        // External orders may not have a client order ID,
2715        // for now we just track those with a client order ID as pending requests.
2716        if let Some(client_order_id) = client_order_id {
2717            self.pending_amend_requests.insert(
2718                request_id.clone(),
2719                (
2720                    client_order_id,
2721                    trader_id,
2722                    strategy_id,
2723                    instrument_id,
2724                    venue_order_id,
2725                ),
2726            );
2727        }
2728
2729        self.retry_manager
2730            .execute_with_retry_with_cancel(
2731                "modify_order",
2732                || {
2733                    let params = params.clone();
2734                    let request_id = request_id.clone();
2735                    async move { self.ws_amend_order(params, Some(request_id)).await }
2736                },
2737                should_retry_okx_error,
2738                create_okx_timeout_error,
2739                &self.cancellation_token,
2740            )
2741            .await
2742    }
2743
2744    /// Submits multiple orders.
2745    ///
2746    /// # Errors
2747    ///
2748    /// Returns an error if any batch order parameters are invalid or if the
2749    /// batch request fails to send.
2750    #[allow(clippy::type_complexity)]
2751    #[allow(clippy::too_many_arguments)]
2752    pub async fn batch_submit_orders(
2753        &self,
2754        orders: Vec<(
2755            OKXInstrumentType,
2756            InstrumentId,
2757            OKXTradeMode,
2758            ClientOrderId,
2759            OrderSide,
2760            Option<PositionSide>,
2761            OrderType,
2762            Quantity,
2763            Option<Price>,
2764            Option<Price>,
2765            Option<bool>,
2766            Option<bool>,
2767        )>,
2768    ) -> Result<(), OKXWsError> {
2769        let mut args: Vec<Value> = Vec::with_capacity(orders.len());
2770        for (
2771            inst_type,
2772            inst_id,
2773            td_mode,
2774            cl_ord_id,
2775            ord_side,
2776            pos_side,
2777            ord_type,
2778            qty,
2779            pr,
2780            tp,
2781            post_only,
2782            reduce_only,
2783        ) in orders
2784        {
2785            let mut builder = WsPostOrderParamsBuilder::default();
2786            builder.inst_type(inst_type);
2787            builder.inst_id(inst_id.symbol.inner());
2788            builder.td_mode(td_mode);
2789            builder.cl_ord_id(cl_ord_id.as_str());
2790            builder.side(ord_side);
2791
2792            if let Some(ps) = pos_side {
2793                builder.pos_side(OKXPositionSide::from(ps));
2794            }
2795
2796            let okx_ord_type = if post_only.unwrap_or(false) {
2797                OKXOrderType::PostOnly
2798            } else {
2799                OKXOrderType::from(ord_type)
2800            };
2801
2802            builder.ord_type(okx_ord_type);
2803            builder.sz(qty.to_string());
2804
2805            if let Some(p) = pr {
2806                builder.px(p.to_string());
2807            } else if let Some(p) = tp {
2808                builder.px(p.to_string());
2809            }
2810
2811            if let Some(ro) = reduce_only {
2812                builder.reduce_only(ro);
2813            }
2814
2815            builder.tag(OKX_NAUTILUS_BROKER_ID);
2816
2817            let params = builder
2818                .build()
2819                .map_err(|e| OKXWsError::ClientError(format!("Build order params error: {e}")))?;
2820            let val =
2821                serde_json::to_value(params).map_err(|e| OKXWsError::JsonError(e.to_string()))?;
2822            args.push(val);
2823        }
2824
2825        self.ws_batch_place_orders(args).await
2826    }
2827
2828    /// Cancels multiple orders.
2829    ///
2830    /// Supports up to 20 orders per batch.
2831    ///
2832    /// # Errors
2833    ///
2834    /// Returns an error if cancel parameters are invalid or if the batch
2835    /// request fails to send.
2836    ///
2837    /// # References
2838    ///
2839    /// <https://www.okx.com/docs-v5/en/#order-book-trading-websocket-batch-cancel-orders>
2840    #[allow(clippy::type_complexity)]
2841    pub async fn batch_cancel_orders(
2842        &self,
2843        orders: Vec<(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>)>,
2844    ) -> Result<(), OKXWsError> {
2845        let mut args: Vec<Value> = Vec::with_capacity(orders.len());
2846        for (inst_id, cl_ord_id, ord_id) in orders {
2847            let mut builder = WsCancelOrderParamsBuilder::default();
2848            // Note: instType should NOT be included in cancel order requests
2849            builder.inst_id(inst_id.symbol.inner());
2850
2851            if let Some(c) = cl_ord_id {
2852                builder.cl_ord_id(c.as_str());
2853            }
2854
2855            if let Some(o) = ord_id {
2856                builder.ord_id(o.as_str());
2857            }
2858
2859            let params = builder.build().map_err(|e| {
2860                OKXWsError::ClientError(format!("Build cancel batch params error: {e}"))
2861            })?;
2862            let val =
2863                serde_json::to_value(params).map_err(|e| OKXWsError::JsonError(e.to_string()))?;
2864            args.push(val);
2865        }
2866
2867        self.ws_batch_cancel_orders(args).await
2868    }
2869
2870    /// Mass cancels all orders for a given instrument via WebSocket.
2871    ///
2872    /// # Errors
2873    ///
2874    /// Returns an error if instrument metadata cannot be resolved or if the
2875    /// cancel request fails to send.
2876    ///
2877    /// # Parameters
2878    /// - `inst_id`: The instrument ID. The instrument type will be automatically determined from the symbol.
2879    ///
2880    /// # References
2881    /// <https://www.okx.com/docs-v5/en/#order-book-trading-websocket-mass-cancel-order>
2882    /// Helper function to determine instrument type and family from symbol using instruments cache.
2883    pub async fn mass_cancel_orders(&self, inst_id: InstrumentId) -> Result<(), OKXWsError> {
2884        let (inst_type, inst_family) =
2885            self.get_instrument_type_and_family(inst_id.symbol.inner())?;
2886
2887        let params = WsMassCancelParams {
2888            inst_type,
2889            inst_family: Ustr::from(&inst_family),
2890        };
2891
2892        let args =
2893            vec![serde_json::to_value(params).map_err(|e| OKXWsError::JsonError(e.to_string()))?];
2894
2895        let request_id = self.generate_unique_request_id();
2896
2897        self.pending_mass_cancel_requests
2898            .insert(request_id.clone(), inst_id);
2899
2900        self.retry_manager
2901            .execute_with_retry_with_cancel(
2902                "mass_cancel_orders",
2903                || {
2904                    let args = args.clone();
2905                    let request_id = request_id.clone();
2906                    async move { self.ws_mass_cancel_with_id(args, request_id).await }
2907                },
2908                should_retry_okx_error,
2909                create_okx_timeout_error,
2910                &self.cancellation_token,
2911            )
2912            .await
2913    }
2914
2915    /// Modifies multiple orders via WebSocket using Nautilus domain types.
2916    ///
2917    /// # Errors
2918    ///
2919    /// Returns an error if amend parameters are invalid or if the batch request
2920    /// fails to send.
2921    #[allow(clippy::type_complexity)]
2922    #[allow(clippy::too_many_arguments)]
2923    pub async fn batch_modify_orders(
2924        &self,
2925        orders: Vec<(
2926            OKXInstrumentType,
2927            InstrumentId,
2928            ClientOrderId,
2929            ClientOrderId,
2930            Option<Price>,
2931            Option<Quantity>,
2932        )>,
2933    ) -> Result<(), OKXWsError> {
2934        let mut args: Vec<Value> = Vec::with_capacity(orders.len());
2935        for (_inst_type, inst_id, cl_ord_id, new_cl_ord_id, pr, sz) in orders {
2936            let mut builder = WsAmendOrderParamsBuilder::default();
2937            // Note: instType should NOT be included in amend order requests
2938            builder.inst_id(inst_id.symbol.inner());
2939            builder.cl_ord_id(cl_ord_id.as_str());
2940            builder.new_cl_ord_id(new_cl_ord_id.as_str());
2941
2942            if let Some(p) = pr {
2943                builder.new_px(p.to_string());
2944            }
2945
2946            if let Some(q) = sz {
2947                builder.new_sz(q.to_string());
2948            }
2949
2950            let params = builder.build().map_err(|e| {
2951                OKXWsError::ClientError(format!("Build amend batch params error: {e}"))
2952            })?;
2953            let val =
2954                serde_json::to_value(params).map_err(|e| OKXWsError::JsonError(e.to_string()))?;
2955            args.push(val);
2956        }
2957
2958        self.ws_batch_amend_orders(args).await
2959    }
2960
2961    /// Submits an algo order (conditional/stop order).
2962    ///
2963    /// # Errors
2964    ///
2965    /// Returns an error if the order parameters are invalid or if the request
2966    /// cannot be sent.
2967    ///
2968    /// # References
2969    ///
2970    /// <https://www.okx.com/docs-v5/en/#order-book-trading-algo-trading-post-place-algo-order>
2971    #[allow(clippy::too_many_arguments)]
2972    pub async fn submit_algo_order(
2973        &self,
2974        trader_id: TraderId,
2975        strategy_id: StrategyId,
2976        instrument_id: InstrumentId,
2977        td_mode: OKXTradeMode,
2978        client_order_id: ClientOrderId,
2979        order_side: OrderSide,
2980        order_type: OrderType,
2981        quantity: Quantity,
2982        trigger_price: Price,
2983        trigger_type: Option<TriggerType>,
2984        limit_price: Option<Price>,
2985        reduce_only: Option<bool>,
2986    ) -> Result<(), OKXWsError> {
2987        if !is_conditional_order(order_type) {
2988            return Err(OKXWsError::ClientError(format!(
2989                "Order type {order_type:?} is not a conditional order"
2990            )));
2991        }
2992
2993        let mut builder = WsPostAlgoOrderParamsBuilder::default();
2994        if !matches!(order_side, OrderSide::Buy | OrderSide::Sell) {
2995            return Err(OKXWsError::ClientError(
2996                "Invalid order side for OKX".to_string(),
2997            ));
2998        }
2999
3000        builder.inst_id(instrument_id.symbol.inner());
3001        builder.td_mode(td_mode);
3002        builder.cl_ord_id(client_order_id.as_str());
3003        builder.side(order_side);
3004        builder.ord_type(
3005            conditional_order_to_algo_type(order_type)
3006                .map_err(|e| OKXWsError::ClientError(e.to_string()))?,
3007        );
3008        builder.sz(quantity.to_string());
3009        builder.trigger_px(trigger_price.to_string());
3010
3011        // Map Nautilus TriggerType to OKX trigger type
3012        let okx_trigger_type = trigger_type.map_or(OKXTriggerType::Last, Into::into);
3013        builder.trigger_px_type(okx_trigger_type);
3014
3015        // For stop-limit orders, set the limit price
3016        if matches!(order_type, OrderType::StopLimit | OrderType::LimitIfTouched)
3017            && let Some(price) = limit_price
3018        {
3019            builder.order_px(price.to_string());
3020        }
3021
3022        if let Some(reduce) = reduce_only {
3023            builder.reduce_only(reduce);
3024        }
3025
3026        builder.tag(OKX_NAUTILUS_BROKER_ID);
3027
3028        let params = builder
3029            .build()
3030            .map_err(|e| OKXWsError::ClientError(format!("Build algo order params error: {e}")))?;
3031
3032        let request_id = self.generate_unique_request_id();
3033
3034        self.pending_place_requests.insert(
3035            request_id.clone(),
3036            (
3037                PendingOrderParams::Algo(()),
3038                client_order_id,
3039                trader_id,
3040                strategy_id,
3041                instrument_id,
3042            ),
3043        );
3044
3045        self.retry_manager
3046            .execute_with_retry_with_cancel(
3047                "submit_algo_order",
3048                || {
3049                    let params = params.clone();
3050                    let request_id = request_id.clone();
3051                    async move { self.ws_place_algo_order(params, Some(request_id)).await }
3052                },
3053                should_retry_okx_error,
3054                create_okx_timeout_error,
3055                &self.cancellation_token,
3056            )
3057            .await
3058    }
3059
3060    /// Cancels an algo order.
3061    ///
3062    /// # Errors
3063    ///
3064    /// Returns an error if cancel parameters are invalid or if the request
3065    /// fails to send.
3066    ///
3067    /// # References
3068    ///
3069    /// <https://www.okx.com/docs-v5/en/#order-book-trading-algo-trading-post-cancel-algo-order>
3070    pub async fn cancel_algo_order(
3071        &self,
3072        trader_id: TraderId,
3073        strategy_id: StrategyId,
3074        instrument_id: InstrumentId,
3075        client_order_id: Option<ClientOrderId>,
3076        algo_order_id: Option<String>,
3077    ) -> Result<(), OKXWsError> {
3078        let mut builder = WsCancelAlgoOrderParamsBuilder::default();
3079        builder.inst_id(instrument_id.symbol.inner());
3080
3081        if let Some(client_order_id) = client_order_id {
3082            builder.algo_cl_ord_id(client_order_id.as_str());
3083        }
3084
3085        if let Some(algo_id) = algo_order_id {
3086            builder.algo_id(algo_id);
3087        }
3088
3089        let params = builder
3090            .build()
3091            .map_err(|e| OKXWsError::ClientError(format!("Build cancel algo params error: {e}")))?;
3092
3093        let request_id = self.generate_unique_request_id();
3094
3095        // Track pending cancellation if we have a client order ID
3096        if let Some(client_order_id) = client_order_id {
3097            self.pending_cancel_requests.insert(
3098                request_id.clone(),
3099                (client_order_id, trader_id, strategy_id, instrument_id, None),
3100            );
3101        }
3102
3103        self.retry_manager
3104            .execute_with_retry_with_cancel(
3105                "cancel_algo_order",
3106                || {
3107                    let params = params.clone();
3108                    let request_id = request_id.clone();
3109                    async move { self.ws_cancel_algo_order(params, Some(request_id)).await }
3110                },
3111                should_retry_okx_error,
3112                create_okx_timeout_error,
3113                &self.cancellation_token,
3114            )
3115            .await
3116    }
3117
3118    /// Place a new algo order via WebSocket.
3119    async fn ws_place_algo_order(
3120        &self,
3121        params: WsPostAlgoOrderParams,
3122        request_id: Option<String>,
3123    ) -> Result<(), OKXWsError> {
3124        let request_id = request_id.unwrap_or(self.generate_unique_request_id());
3125
3126        let req = OKXWsRequest {
3127            id: Some(request_id),
3128            op: OKXWsOperation::OrderAlgo,
3129            exp_time: None,
3130            args: vec![params],
3131        };
3132
3133        let txt = serde_json::to_string(&req).map_err(|e| OKXWsError::JsonError(e.to_string()))?;
3134
3135        {
3136            let inner_guard = self.inner.read().await;
3137            if let Some(inner) = &*inner_guard {
3138                if let Err(e) = inner
3139                    .send_text(txt, Some(vec!["orders-algo".to_string()]))
3140                    .await
3141                {
3142                    tracing::error!("Error sending algo order message: {e:?}");
3143                }
3144                Ok(())
3145            } else {
3146                Err(OKXWsError::ClientError("Not connected".to_string()))
3147            }
3148        }
3149    }
3150
3151    /// Cancel an algo order via WebSocket.
3152    async fn ws_cancel_algo_order(
3153        &self,
3154        params: WsCancelAlgoOrderParams,
3155        request_id: Option<String>,
3156    ) -> Result<(), OKXWsError> {
3157        let request_id = request_id.unwrap_or(self.generate_unique_request_id());
3158
3159        let req = OKXWsRequest {
3160            id: Some(request_id),
3161            op: OKXWsOperation::CancelAlgos,
3162            exp_time: None,
3163            args: vec![params],
3164        };
3165
3166        let txt = serde_json::to_string(&req).map_err(|e| OKXWsError::JsonError(e.to_string()))?;
3167
3168        {
3169            let inner_guard = self.inner.read().await;
3170            if let Some(inner) = &*inner_guard {
3171                if let Err(e) = inner
3172                    .send_text(txt, Some(vec!["cancel-algos".to_string()]))
3173                    .await
3174                {
3175                    tracing::error!("Error sending cancel algo message: {e:?}");
3176                }
3177                Ok(())
3178            } else {
3179                Err(OKXWsError::ClientError("Not connected".to_string()))
3180            }
3181        }
3182    }
3183}
3184
3185struct OKXFeedHandler {
3186    receiver: UnboundedReceiver<Message>,
3187    signal: Arc<AtomicBool>,
3188}
3189
3190impl OKXFeedHandler {
3191    /// Creates a new [`OKXFeedHandler`] instance.
3192    pub fn new(receiver: UnboundedReceiver<Message>, signal: Arc<AtomicBool>) -> Self {
3193        Self { receiver, signal }
3194    }
3195
3196    /// Gets the next message from the WebSocket stream.
3197    async fn next(&mut self) -> Option<OKXWebSocketEvent> {
3198        loop {
3199            tokio::select! {
3200                msg = self.receiver.recv() => match msg {
3201                    Some(msg) => match msg {
3202                        Message::Text(text) => {
3203                            // Handle ping/pong messages
3204                            if text == TEXT_PONG {
3205                                tracing::trace!("Received pong from OKX");
3206                                continue;
3207                            }
3208                            if text == TEXT_PING {
3209                                tracing::trace!("Received ping from OKX (text)");
3210                                return Some(OKXWebSocketEvent::Ping);
3211                            }
3212
3213                            // Check for reconnection signal
3214                            if text == RECONNECTED {
3215                                tracing::debug!("Received WebSocket reconnection signal");
3216                                return Some(OKXWebSocketEvent::Reconnected);
3217                            }
3218                            tracing::trace!("Received WebSocket message: {text}");
3219
3220                            match serde_json::from_str(&text) {
3221                                Ok(ws_event) => match &ws_event {
3222                                    OKXWebSocketEvent::Error { code, msg } => {
3223                                        tracing::error!("WebSocket error: {code} - {msg}");
3224                                        return Some(ws_event);
3225                                    }
3226                                    OKXWebSocketEvent::Login {
3227                                        event,
3228                                        code,
3229                                        msg,
3230                                        conn_id,
3231                                    } => {
3232                                        if code == "0" {
3233                                            tracing::info!(
3234                                                "Successfully authenticated with OKX WebSocket, conn_id={conn_id}"
3235                                            );
3236                                        } else {
3237                                            tracing::error!(
3238                                                "Authentication failed: {event} {code} - {msg}"
3239                                            );
3240                                        }
3241                                        return Some(ws_event);
3242                                    }
3243                                    OKXWebSocketEvent::Subscription {
3244                                        event,
3245                                        arg,
3246                                        conn_id, .. } => {
3247                                        let channel_str = serde_json::to_string(&arg.channel)
3248                                            .expect("Invalid OKX websocket channel")
3249                                            .trim_matches('"')
3250                                            .to_string();
3251                                        tracing::debug!(
3252                                            "{event}d: channel={channel_str}, conn_id={conn_id}"
3253                                        );
3254                                        continue;
3255                                    }
3256                                    OKXWebSocketEvent::ChannelConnCount {
3257                                        event: _,
3258                                        channel,
3259                                        conn_count,
3260                                        conn_id,
3261                                    } => {
3262                                        let channel_str = serde_json::to_string(&channel)
3263                                            .expect("Invalid OKX websocket channel")
3264                                            .trim_matches('"')
3265                                            .to_string();
3266                                        tracing::debug!(
3267                                            "Channel connection status: channel={channel_str}, connections={conn_count}, conn_id={conn_id}",
3268                                        );
3269                                        continue;
3270                                    }
3271                                    OKXWebSocketEvent::Ping => {
3272                                        tracing::trace!("Ignoring ping event parsed from text payload");
3273                                        continue;
3274                                    }
3275                                    OKXWebSocketEvent::Data { .. } => return Some(ws_event),
3276                                    OKXWebSocketEvent::BookData { .. } => return Some(ws_event),
3277                                    OKXWebSocketEvent::OrderResponse {
3278                                        id,
3279                                        op,
3280                                        code,
3281                                        msg: _,
3282                                        data,
3283                                    } => {
3284                                        if code == "0" {
3285                                            tracing::debug!(
3286                                                "Order operation successful: id={:?}, op={op}, code={code}",
3287                                                id
3288                                            );
3289
3290                                            // Extract success message
3291                                            if let Some(order_data) = data.first() {
3292                                                let success_msg = order_data
3293                                                    .get("sMsg")
3294                                                    .and_then(|s| s.as_str())
3295                                                    .unwrap_or("Order operation successful");
3296                                                tracing::debug!("Order success details: {success_msg}");
3297                                            }
3298                                        }
3299                                        return Some(ws_event);
3300                                    }
3301                                    OKXWebSocketEvent::Reconnected => {
3302                                        // This shouldn't happen as we handle RECONNECTED string directly
3303                                        tracing::warn!("Unexpected Reconnected event from deserialization");
3304                                        continue;
3305                                    }
3306                                },
3307                                Err(e) => {
3308                                    tracing::error!("Failed to parse message: {e}: {text}");
3309                                    return None;
3310                                }
3311                            }
3312                        }
3313                        Message::Ping(payload) => {
3314                            tracing::trace!("Received ping frame from OKX ({} bytes)", payload.len());
3315                            continue;
3316                        }
3317                        Message::Pong(payload) => {
3318                            tracing::trace!("Received pong frame from OKX ({} bytes)", payload.len());
3319                            continue;
3320                        }
3321                        Message::Binary(msg) => {
3322                            tracing::debug!("Raw binary: {msg:?}");
3323                        }
3324                        Message::Close(_) => {
3325                            tracing::debug!("Received close message");
3326                            return None;
3327                        }
3328                        msg => {
3329                            tracing::warn!("Unexpected message: {msg}");
3330                        }
3331                    }
3332                    None => {
3333                        tracing::info!("WebSocket stream closed");
3334                        return None;
3335                    }
3336                },
3337                _ = tokio::time::sleep(Duration::from_millis(1)) => {
3338                    if self.signal.load(std::sync::atomic::Ordering::Relaxed) {
3339                        tracing::debug!("Stop signal received");
3340                        return None;
3341                    }
3342                }
3343            }
3344        }
3345    }
3346}
3347
3348struct OKXWsMessageHandler {
3349    account_id: AccountId,
3350    inner: Arc<tokio::sync::RwLock<Option<WebSocketClient>>>,
3351    handler: OKXFeedHandler,
3352    #[allow(dead_code)]
3353    tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
3354    pending_place_requests: Arc<DashMap<String, PlaceRequestData>>,
3355    pending_cancel_requests: Arc<DashMap<String, CancelRequestData>>,
3356    pending_amend_requests: Arc<DashMap<String, AmendRequestData>>,
3357    pending_mass_cancel_requests: Arc<DashMap<String, MassCancelRequestData>>,
3358    active_client_orders: Arc<DashMap<ClientOrderId, (TraderId, StrategyId, InstrumentId)>>,
3359    client_id_aliases: Arc<DashMap<ClientOrderId, ClientOrderId>>,
3360    emitted_order_accepted: Arc<DashMap<VenueOrderId, ()>>,
3361    instruments_cache: Arc<AHashMap<Ustr, InstrumentAny>>,
3362    last_account_state: Option<AccountState>,
3363    fee_cache: AHashMap<Ustr, Money>,           // Key is order ID
3364    filled_qty_cache: AHashMap<Ustr, Quantity>, // Key is order ID
3365    funding_rate_cache: AHashMap<Ustr, (Ustr, u64)>, // Cache (funding_rate, funding_time) by inst_id
3366    auth_tracker: AuthTracker,
3367    pending_messages: VecDeque<NautilusWsMessage>,
3368    subscriptions_state: SubscriptionState,
3369}
3370
3371impl OKXWsMessageHandler {
3372    fn schedule_text_pong(&self) {
3373        let inner = self.inner.clone();
3374        get_runtime().spawn(async move {
3375            let guard = inner.read().await;
3376
3377            if let Some(client) = guard.as_ref() {
3378                if let Err(e) = client.send_text(TEXT_PONG.to_string(), None).await {
3379                    tracing::warn!(error = %e, "Failed to send pong response to OKX text ping");
3380                } else {
3381                    tracing::trace!("Sent pong response to OKX text ping");
3382                }
3383            } else {
3384                tracing::debug!("Received text ping with no active websocket client");
3385            }
3386        });
3387    }
3388
3389    fn try_handle_post_only_auto_cancel(
3390        &mut self,
3391        msg: &OKXOrderMsg,
3392        ts_init: UnixNanos,
3393        exec_reports: &mut Vec<ExecutionReport>,
3394    ) -> bool {
3395        if !Self::is_post_only_auto_cancel(msg) {
3396            return false;
3397        }
3398
3399        let Some(client_order_id) = parse_client_order_id(&msg.cl_ord_id) else {
3400            return false;
3401        };
3402
3403        let Some((_, (trader_id, strategy_id, instrument_id))) =
3404            self.active_client_orders.remove(&client_order_id)
3405        else {
3406            return false;
3407        };
3408
3409        self.client_id_aliases.remove(&client_order_id);
3410
3411        if !exec_reports.is_empty() {
3412            let reports = std::mem::take(exec_reports);
3413            self.pending_messages
3414                .push_back(NautilusWsMessage::ExecutionReports(reports));
3415        }
3416
3417        let reason = msg
3418            .cancel_source_reason
3419            .as_ref()
3420            .filter(|reason| !reason.is_empty())
3421            .map_or_else(
3422                || Ustr::from(OKX_POST_ONLY_CANCEL_REASON),
3423                |reason| Ustr::from(reason.as_str()),
3424            );
3425
3426        let ts_event = parse_millisecond_timestamp(msg.u_time);
3427        let rejected = OrderRejected::new(
3428            trader_id,
3429            strategy_id,
3430            instrument_id,
3431            client_order_id,
3432            self.account_id,
3433            reason,
3434            UUID4::new(),
3435            ts_event,
3436            ts_init,
3437            false,
3438            true,
3439        );
3440
3441        self.pending_messages
3442            .push_back(NautilusWsMessage::OrderRejected(rejected));
3443
3444        true
3445    }
3446
3447    fn is_post_only_auto_cancel(msg: &OKXOrderMsg) -> bool {
3448        if msg.state != OKXOrderStatus::Canceled {
3449            return false;
3450        }
3451
3452        let cancel_source_matches = matches!(
3453            msg.cancel_source.as_deref(),
3454            Some(source) if source == OKX_POST_ONLY_CANCEL_SOURCE
3455        );
3456
3457        let reason_matches = matches!(
3458            msg.cancel_source_reason.as_deref(),
3459            Some(reason) if reason.contains("POST_ONLY")
3460        );
3461
3462        if !(cancel_source_matches || reason_matches) {
3463            return false;
3464        }
3465
3466        msg.acc_fill_sz
3467            .as_ref()
3468            .is_none_or(|filled| filled == "0" || filled.is_empty())
3469    }
3470
3471    fn register_client_order_aliases(
3472        &self,
3473        raw_child: &Option<ClientOrderId>,
3474        parent_from_msg: &Option<ClientOrderId>,
3475    ) -> Option<ClientOrderId> {
3476        if let Some(parent) = parent_from_msg {
3477            self.client_id_aliases.insert(*parent, *parent);
3478            if let Some(child) = raw_child.as_ref().filter(|child| **child != *parent) {
3479                self.client_id_aliases.insert(*child, *parent);
3480            }
3481            Some(*parent)
3482        } else if let Some(child) = raw_child.as_ref() {
3483            if let Some(mapped) = self.client_id_aliases.get(child) {
3484                Some(*mapped.value())
3485            } else {
3486                self.client_id_aliases.insert(*child, *child);
3487                Some(*child)
3488            }
3489        } else {
3490            None
3491        }
3492    }
3493
3494    fn adjust_execution_report(
3495        &self,
3496        report: ExecutionReport,
3497        effective_client_id: &Option<ClientOrderId>,
3498        raw_child: &Option<ClientOrderId>,
3499    ) -> ExecutionReport {
3500        match report {
3501            ExecutionReport::Order(status_report) => {
3502                let mut adjusted = status_report;
3503                let mut final_id = *effective_client_id;
3504
3505                if final_id.is_none() {
3506                    final_id = adjusted.client_order_id;
3507                }
3508
3509                if final_id.is_none()
3510                    && let Some(child) = raw_child.as_ref()
3511                    && let Some(mapped) = self.client_id_aliases.get(child)
3512                {
3513                    final_id = Some(*mapped.value());
3514                }
3515
3516                if let Some(final_id_value) = final_id {
3517                    if adjusted.client_order_id != Some(final_id_value) {
3518                        adjusted = adjusted.with_client_order_id(final_id_value);
3519                    }
3520                    self.client_id_aliases
3521                        .insert(final_id_value, final_id_value);
3522
3523                    if let Some(child) =
3524                        raw_child.as_ref().filter(|child| **child != final_id_value)
3525                    {
3526                        adjusted = adjusted.with_linked_order_ids(vec![*child]);
3527                    }
3528                }
3529
3530                ExecutionReport::Order(adjusted)
3531            }
3532            ExecutionReport::Fill(mut fill_report) => {
3533                let mut final_id = *effective_client_id;
3534                if final_id.is_none() {
3535                    final_id = fill_report.client_order_id;
3536                }
3537                if final_id.is_none()
3538                    && let Some(child) = raw_child.as_ref()
3539                    && let Some(mapped) = self.client_id_aliases.get(child)
3540                {
3541                    final_id = Some(*mapped.value());
3542                }
3543
3544                if let Some(final_id_value) = final_id {
3545                    fill_report.client_order_id = Some(final_id_value);
3546                    self.client_id_aliases
3547                        .insert(final_id_value, final_id_value);
3548                }
3549
3550                ExecutionReport::Fill(fill_report)
3551            }
3552        }
3553    }
3554
3555    fn update_caches_with_report(&mut self, report: &ExecutionReport) {
3556        match report {
3557            ExecutionReport::Fill(fill_report) => {
3558                let order_id = fill_report.venue_order_id.inner();
3559                let current_fee = self
3560                    .fee_cache
3561                    .get(&order_id)
3562                    .copied()
3563                    .unwrap_or_else(|| Money::new(0.0, fill_report.commission.currency));
3564                let total_fee = current_fee + fill_report.commission;
3565                self.fee_cache.insert(order_id, total_fee);
3566
3567                let current_filled_qty = self
3568                    .filled_qty_cache
3569                    .get(&order_id)
3570                    .copied()
3571                    .unwrap_or_else(|| Quantity::zero(fill_report.last_qty.precision));
3572                let total_filled_qty = current_filled_qty + fill_report.last_qty;
3573                self.filled_qty_cache.insert(order_id, total_filled_qty);
3574            }
3575            ExecutionReport::Order(status_report) => {
3576                if matches!(status_report.order_status, OrderStatus::Filled) {
3577                    self.fee_cache.remove(&status_report.venue_order_id.inner());
3578                    self.filled_qty_cache
3579                        .remove(&status_report.venue_order_id.inner());
3580                }
3581
3582                if matches!(
3583                    status_report.order_status,
3584                    OrderStatus::Canceled
3585                        | OrderStatus::Expired
3586                        | OrderStatus::Filled
3587                        | OrderStatus::Rejected,
3588                ) {
3589                    if let Some(client_order_id) = status_report.client_order_id {
3590                        self.active_client_orders.remove(&client_order_id);
3591                        self.client_id_aliases.remove(&client_order_id);
3592                    }
3593                    if let Some(linked) = &status_report.linked_order_ids {
3594                        for child in linked {
3595                            self.client_id_aliases.remove(child);
3596                        }
3597                    }
3598                }
3599            }
3600        }
3601    }
3602
3603    /// Creates a new [`OKXFeedHandler`] instance.
3604    #[allow(clippy::too_many_arguments)]
3605    pub fn new(
3606        account_id: AccountId,
3607        instruments_cache: Arc<AHashMap<Ustr, InstrumentAny>>,
3608        reader: UnboundedReceiver<Message>,
3609        signal: Arc<AtomicBool>,
3610        inner: Arc<tokio::sync::RwLock<Option<WebSocketClient>>>,
3611        tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
3612        pending_place_requests: Arc<DashMap<String, PlaceRequestData>>,
3613        pending_cancel_requests: Arc<DashMap<String, CancelRequestData>>,
3614        pending_amend_requests: Arc<DashMap<String, AmendRequestData>>,
3615        pending_mass_cancel_requests: Arc<DashMap<String, MassCancelRequestData>>,
3616        active_client_orders: Arc<DashMap<ClientOrderId, (TraderId, StrategyId, InstrumentId)>>,
3617        client_id_aliases: Arc<DashMap<ClientOrderId, ClientOrderId>>,
3618        emitted_order_accepted: Arc<DashMap<VenueOrderId, ()>>,
3619        auth_tracker: AuthTracker,
3620        subscriptions_state: SubscriptionState,
3621    ) -> Self {
3622        Self {
3623            account_id,
3624            inner,
3625            handler: OKXFeedHandler::new(reader, signal),
3626            tx,
3627            pending_place_requests,
3628            pending_cancel_requests,
3629            pending_amend_requests,
3630            pending_mass_cancel_requests,
3631            active_client_orders,
3632            client_id_aliases,
3633            emitted_order_accepted,
3634            instruments_cache,
3635            last_account_state: None,
3636            fee_cache: AHashMap::new(),
3637            filled_qty_cache: AHashMap::new(),
3638            funding_rate_cache: AHashMap::new(),
3639            auth_tracker,
3640            pending_messages: VecDeque::new(),
3641            subscriptions_state,
3642        }
3643    }
3644
3645    fn is_stopped(&self) -> bool {
3646        self.handler
3647            .signal
3648            .load(std::sync::atomic::Ordering::Relaxed)
3649    }
3650
3651    #[allow(dead_code)]
3652    async fn run(&mut self) {
3653        while let Some(data) = self.next().await {
3654            if let Err(e) = self.tx.send(data) {
3655                tracing::error!("Error sending data: {e}");
3656                break; // Stop processing on channel error for now
3657            }
3658        }
3659    }
3660
3661    async fn next(&mut self) -> Option<NautilusWsMessage> {
3662        if let Some(message) = self.pending_messages.pop_front() {
3663            return Some(message);
3664        }
3665
3666        let clock = get_atomic_clock_realtime();
3667
3668        while let Some(event) = self.handler.next().await {
3669            let ts_init = clock.get_time_ns();
3670
3671            match event {
3672                OKXWebSocketEvent::Ping => {
3673                    self.schedule_text_pong();
3674                    continue;
3675                }
3676                OKXWebSocketEvent::Login {
3677                    code, msg, conn_id, ..
3678                } => {
3679                    if code == "0" {
3680                        self.auth_tracker.succeed();
3681                        continue;
3682                    }
3683
3684                    tracing::error!("Authentication failed: {msg}");
3685                    self.auth_tracker.fail(msg.clone());
3686
3687                    let error = OKXWebSocketError {
3688                        code,
3689                        message: msg,
3690                        conn_id: Some(conn_id),
3691                        timestamp: clock.get_time_ns().as_u64(),
3692                    };
3693                    self.pending_messages
3694                        .push_back(NautilusWsMessage::Error(error));
3695                    continue;
3696                }
3697                OKXWebSocketEvent::BookData { arg, action, data } => {
3698                    let Some(inst_id) = arg.inst_id else {
3699                        tracing::error!("Instrument ID missing for book data event");
3700                        continue;
3701                    };
3702
3703                    let Some(inst) = self.instruments_cache.get(&inst_id) else {
3704                        continue;
3705                    };
3706
3707                    let instrument_id = inst.id();
3708                    let price_precision = inst.price_precision();
3709                    let size_precision = inst.size_precision();
3710
3711                    match parse_book_msg_vec(
3712                        data,
3713                        &instrument_id,
3714                        price_precision,
3715                        size_precision,
3716                        action,
3717                        ts_init,
3718                    ) {
3719                        Ok(payloads) => return Some(NautilusWsMessage::Data(payloads)),
3720                        Err(e) => {
3721                            tracing::error!("Failed to parse book message: {e}");
3722                            continue;
3723                        }
3724                    }
3725                }
3726                OKXWebSocketEvent::OrderResponse {
3727                    id,
3728                    op,
3729                    code,
3730                    msg,
3731                    data,
3732                } => {
3733                    if code == "0" {
3734                        tracing::debug!(
3735                            "Order operation successful: id={id:?} op={op} code={code}"
3736                        );
3737
3738                        if op == OKXWsOperation::MassCancel
3739                            && let Some(request_id) = &id
3740                            && let Some((_, instrument_id)) =
3741                                self.pending_mass_cancel_requests.remove(request_id)
3742                        {
3743                            tracing::info!(
3744                                "Mass cancel operation successful for instrument: {}",
3745                                instrument_id
3746                            );
3747                        } else if op == OKXWsOperation::Order
3748                            && let Some(request_id) = &id
3749                            && let Some((
3750                                _,
3751                                (params, client_order_id, _trader_id, _strategy_id, instrument_id),
3752                            )) = self.pending_place_requests.remove(request_id)
3753                        {
3754                            let (venue_order_id, ts_accepted) = if let Some(first) = data.first() {
3755                                let ord_id = first
3756                                    .get("ordId")
3757                                    .and_then(|v| v.as_str())
3758                                    .filter(|s| !s.is_empty())
3759                                    .map(VenueOrderId::new);
3760
3761                                let ts = first
3762                                    .get("ts")
3763                                    .and_then(|v| v.as_str())
3764                                    .and_then(|s| s.parse::<u64>().ok())
3765                                    .map_or_else(
3766                                        || clock.get_time_ns(),
3767                                        |ms| UnixNanos::from(ms * 1_000_000),
3768                                    );
3769
3770                                (ord_id, ts)
3771                            } else {
3772                                (None, clock.get_time_ns())
3773                            };
3774
3775                            if let Some(instrument) = self
3776                                .instruments_cache
3777                                .get(&Ustr::from(instrument_id.symbol.as_str()))
3778                            {
3779                                match params {
3780                                    PendingOrderParams::Regular(order_params) => {
3781                                        // Check if this is an explicit quote-sized order
3782                                        let is_explicit_quote_sized = order_params
3783                                            .tgt_ccy
3784                                            .is_some_and(|tgt| tgt == OKXTargetCurrency::QuoteCcy);
3785
3786                                        // Check if this is an implicit quote-sized order:
3787                                        // SPOT market BUY in cash mode with no tgt_ccy defaults to quote-sizing
3788                                        let is_implicit_quote_sized =
3789                                            order_params.tgt_ccy.is_none()
3790                                                && order_params.side == OKXSide::Buy
3791                                                && matches!(
3792                                                    order_params.ord_type,
3793                                                    OKXOrderType::Market
3794                                                )
3795                                                && order_params.td_mode == OKXTradeMode::Cash
3796                                                && instrument.instrument_class().as_ref() == "SPOT";
3797
3798                                        if is_explicit_quote_sized || is_implicit_quote_sized {
3799                                            // For quote-sized orders, sz is in quote currency (USDT),
3800                                            // not base currency (ETH). We can't accurately parse the
3801                                            // base quantity without the fill price, so we skip the
3802                                            // synthetic OrderAccepted and rely on the orders channel
3803                                            tracing::info!(
3804                                                "Skipping synthetic OrderAccepted for {} quote-sized order: client_order_id={client_order_id}, venue_order_id={:?}",
3805                                                if is_explicit_quote_sized {
3806                                                    "explicit"
3807                                                } else {
3808                                                    "implicit"
3809                                                },
3810                                                venue_order_id
3811                                            );
3812                                            continue;
3813                                        }
3814
3815                                        let order_side = order_params.side.into();
3816                                        let order_type = order_params.ord_type.into();
3817                                        let time_in_force = match order_params.ord_type {
3818                                            OKXOrderType::Fok => TimeInForce::Fok,
3819                                            OKXOrderType::Ioc | OKXOrderType::OptimalLimitIoc => {
3820                                                TimeInForce::Ioc
3821                                            }
3822                                            _ => TimeInForce::Gtc,
3823                                        };
3824
3825                                        let size_precision = instrument.size_precision();
3826                                        let quantity = match parse_quantity(
3827                                            &order_params.sz,
3828                                            size_precision,
3829                                        ) {
3830                                            Ok(q) => q,
3831                                            Err(e) => {
3832                                                tracing::error!(
3833                                                    "Failed to parse quantity for accepted order: {e}"
3834                                                );
3835                                                continue;
3836                                            }
3837                                        };
3838
3839                                        let filled_qty = Quantity::zero(size_precision);
3840
3841                                        let mut report = OrderStatusReport::new(
3842                                            self.account_id,
3843                                            instrument_id,
3844                                            Some(client_order_id),
3845                                            venue_order_id
3846                                                .unwrap_or_else(|| VenueOrderId::new("PENDING")),
3847                                            order_side,
3848                                            order_type,
3849                                            time_in_force,
3850                                            OrderStatus::Accepted,
3851                                            quantity,
3852                                            filled_qty,
3853                                            ts_accepted,
3854                                            ts_accepted, // ts_last same as ts_accepted for new orders
3855                                            ts_init,
3856                                            None, // Generate UUID4 automatically
3857                                        );
3858
3859                                        if let Some(px) = &order_params.px
3860                                            && !px.is_empty()
3861                                            && let Ok(price) =
3862                                                parse_price(px, instrument.price_precision())
3863                                        {
3864                                            report = report.with_price(price);
3865                                        }
3866
3867                                        if let Some(true) = order_params.reduce_only {
3868                                            report = report.with_reduce_only(true);
3869                                        }
3870
3871                                        if order_type == OrderType::Limit
3872                                            && order_params.ord_type == OKXOrderType::PostOnly
3873                                        {
3874                                            report = report.with_post_only(true);
3875                                        }
3876
3877                                        if let Some(ref v_order_id) = venue_order_id {
3878                                            self.emitted_order_accepted.insert(*v_order_id, ());
3879                                        }
3880
3881                                        tracing::debug!(
3882                                            "Order accepted: client_order_id={client_order_id}, venue_order_id={:?}",
3883                                            venue_order_id
3884                                        );
3885
3886                                        return Some(NautilusWsMessage::ExecutionReports(vec![
3887                                            ExecutionReport::Order(report),
3888                                        ]));
3889                                    }
3890                                    PendingOrderParams::Algo(_) => {
3891                                        tracing::info!(
3892                                            "Algo order placement confirmed: client_order_id={client_order_id}, venue_order_id={:?}",
3893                                            venue_order_id
3894                                        );
3895                                    }
3896                                }
3897                            } else {
3898                                tracing::error!(
3899                                    "Instrument not found for accepted order: {instrument_id}"
3900                                );
3901                            }
3902                        }
3903
3904                        if let Some(first) = data.first()
3905                            && let Some(success_msg) =
3906                                first.get("sMsg").and_then(|value| value.as_str())
3907                        {
3908                            tracing::debug!("Order details: {success_msg}");
3909                        }
3910
3911                        continue;
3912                    }
3913
3914                    let error_msg = data
3915                        .first()
3916                        .and_then(|d| d.get("sMsg"))
3917                        .and_then(|s| s.as_str())
3918                        .unwrap_or(&msg)
3919                        .to_string();
3920
3921                    if let Some(first) = data.first() {
3922                        tracing::debug!(
3923                            "Error data fields: {}",
3924                            serde_json::to_string_pretty(first)
3925                                .unwrap_or_else(|_| "unable to serialize".to_string())
3926                        );
3927                    }
3928
3929                    tracing::warn!(
3930                        "Order operation failed: id={id:?} op={op} code={code} msg={error_msg}"
3931                    );
3932
3933                    if let Some(request_id) = &id {
3934                        match op {
3935                            OKXWsOperation::Order => {
3936                                if let Some((
3937                                    _,
3938                                    (
3939                                        _params,
3940                                        client_order_id,
3941                                        trader_id,
3942                                        strategy_id,
3943                                        instrument_id,
3944                                    ),
3945                                )) = self.pending_place_requests.remove(request_id)
3946                                {
3947                                    let ts_event = clock.get_time_ns();
3948                                    let due_post_only =
3949                                        is_post_only_rejection(code.as_str(), &data);
3950                                    let rejected = OrderRejected::new(
3951                                        trader_id,
3952                                        strategy_id,
3953                                        instrument_id,
3954                                        client_order_id,
3955                                        self.account_id,
3956                                        Ustr::from(error_msg.as_str()),
3957                                        UUID4::new(),
3958                                        ts_event,
3959                                        ts_init,
3960                                        false, // Not from reconciliation
3961                                        due_post_only,
3962                                    );
3963
3964                                    return Some(NautilusWsMessage::OrderRejected(rejected));
3965                                }
3966                            }
3967                            OKXWsOperation::CancelOrder => {
3968                                if let Some((
3969                                    _,
3970                                    (
3971                                        client_order_id,
3972                                        trader_id,
3973                                        strategy_id,
3974                                        instrument_id,
3975                                        venue_order_id,
3976                                    ),
3977                                )) = self.pending_cancel_requests.remove(request_id)
3978                                {
3979                                    let ts_event = clock.get_time_ns();
3980                                    let rejected = OrderCancelRejected::new(
3981                                        trader_id,
3982                                        strategy_id,
3983                                        instrument_id,
3984                                        client_order_id,
3985                                        Ustr::from(error_msg.as_str()),
3986                                        UUID4::new(),
3987                                        ts_event,
3988                                        ts_init,
3989                                        false, // Not from reconciliation
3990                                        venue_order_id,
3991                                        Some(self.account_id),
3992                                    );
3993
3994                                    return Some(NautilusWsMessage::OrderCancelRejected(rejected));
3995                                }
3996                            }
3997                            OKXWsOperation::AmendOrder => {
3998                                if let Some((
3999                                    _,
4000                                    (
4001                                        client_order_id,
4002                                        trader_id,
4003                                        strategy_id,
4004                                        instrument_id,
4005                                        venue_order_id,
4006                                    ),
4007                                )) = self.pending_amend_requests.remove(request_id)
4008                                {
4009                                    let ts_event = clock.get_time_ns();
4010                                    let rejected = OrderModifyRejected::new(
4011                                        trader_id,
4012                                        strategy_id,
4013                                        instrument_id,
4014                                        client_order_id,
4015                                        Ustr::from(error_msg.as_str()),
4016                                        UUID4::new(),
4017                                        ts_event,
4018                                        ts_init,
4019                                        false, // Not from reconciliation
4020                                        venue_order_id,
4021                                        Some(self.account_id),
4022                                    );
4023
4024                                    return Some(NautilusWsMessage::OrderModifyRejected(rejected));
4025                                }
4026                            }
4027                            OKXWsOperation::MassCancel => {
4028                                if let Some((_, instrument_id)) =
4029                                    self.pending_mass_cancel_requests.remove(request_id)
4030                                {
4031                                    tracing::error!(
4032                                        "Mass cancel operation failed for {}: code={code} msg={error_msg}",
4033                                        instrument_id
4034                                    );
4035                                    let error = OKXWebSocketError {
4036                                        code,
4037                                        message: format!(
4038                                            "Mass cancel failed for {}: {}",
4039                                            instrument_id, error_msg
4040                                        ),
4041                                        conn_id: None,
4042                                        timestamp: clock.get_time_ns().as_u64(),
4043                                    };
4044                                    return Some(NautilusWsMessage::Error(error));
4045                                } else {
4046                                    tracing::error!(
4047                                        "Mass cancel operation failed: code={code} msg={error_msg}"
4048                                    );
4049                                }
4050                            }
4051                            _ => tracing::warn!("Unhandled operation type for rejection: {op}"),
4052                        }
4053                    }
4054
4055                    let error = OKXWebSocketError {
4056                        code,
4057                        message: error_msg,
4058                        conn_id: None,
4059                        timestamp: clock.get_time_ns().as_u64(),
4060                    };
4061                    return Some(NautilusWsMessage::Error(error));
4062                }
4063                OKXWebSocketEvent::Data { arg, data } => {
4064                    let OKXWebSocketArg {
4065                        channel, inst_id, ..
4066                    } = arg;
4067
4068                    match channel {
4069                        OKXWsChannel::Account => {
4070                            match serde_json::from_value::<Vec<OKXAccount>>(data) {
4071                                Ok(accounts) => {
4072                                    if let Some(account) = accounts.first() {
4073                                        match parse_account_state(account, self.account_id, ts_init)
4074                                        {
4075                                            Ok(account_state) => {
4076                                                if let Some(last_account_state) =
4077                                                    &self.last_account_state
4078                                                    && account_state.has_same_balances_and_margins(
4079                                                        last_account_state,
4080                                                    )
4081                                                {
4082                                                    continue;
4083                                                }
4084                                                self.last_account_state =
4085                                                    Some(account_state.clone());
4086                                                return Some(NautilusWsMessage::AccountUpdate(
4087                                                    account_state,
4088                                                ));
4089                                            }
4090                                            Err(e) => tracing::error!(
4091                                                "Failed to parse account state: {e}"
4092                                            ),
4093                                        }
4094                                    }
4095                                }
4096                                Err(e) => tracing::error!("Failed to parse account data: {e}"),
4097                            }
4098                            continue;
4099                        }
4100                        OKXWsChannel::Orders => {
4101                            let orders: Vec<OKXOrderMsg> = match serde_json::from_value(data) {
4102                                Ok(orders) => orders,
4103                                Err(e) => {
4104                                    tracing::error!(
4105                                        "Failed to deserialize orders channel payload: {e}"
4106                                    );
4107                                    continue;
4108                                }
4109                            };
4110
4111                            tracing::debug!(
4112                                "Received {} order message(s) from orders channel",
4113                                orders.len()
4114                            );
4115
4116                            let mut exec_reports: Vec<ExecutionReport> =
4117                                Vec::with_capacity(orders.len());
4118
4119                            for msg in orders {
4120                                tracing::debug!(
4121                                    "Processing order message: inst_id={}, cl_ord_id={}, state={:?}, exec_type={:?}",
4122                                    msg.inst_id,
4123                                    msg.cl_ord_id,
4124                                    msg.state,
4125                                    msg.exec_type
4126                                );
4127
4128                                if self.try_handle_post_only_auto_cancel(
4129                                    &msg,
4130                                    ts_init,
4131                                    &mut exec_reports,
4132                                ) {
4133                                    continue;
4134                                }
4135
4136                                let raw_child = parse_client_order_id(&msg.cl_ord_id);
4137                                let parent_from_msg = msg
4138                                    .algo_cl_ord_id
4139                                    .as_ref()
4140                                    .filter(|value| !value.is_empty())
4141                                    .map(ClientOrderId::new);
4142                                let effective_client_id = self
4143                                    .register_client_order_aliases(&raw_child, &parent_from_msg);
4144
4145                                match parse_order_msg(
4146                                    &msg,
4147                                    self.account_id,
4148                                    &self.instruments_cache,
4149                                    &self.fee_cache,
4150                                    &self.filled_qty_cache,
4151                                    ts_init,
4152                                ) {
4153                                    Ok(report) => {
4154                                        tracing::debug!(
4155                                            "Successfully parsed execution report: {:?}",
4156                                            report
4157                                        );
4158
4159                                        // Check for duplicate OrderAccepted events
4160                                        let is_duplicate_accepted =
4161                                            if let ExecutionReport::Order(ref status_report) =
4162                                                report
4163                                            {
4164                                                if status_report.order_status
4165                                                    == OrderStatus::Accepted
4166                                                {
4167                                                    self.emitted_order_accepted
4168                                                        .contains_key(&status_report.venue_order_id)
4169                                                } else {
4170                                                    false
4171                                                }
4172                                            } else {
4173                                                false
4174                                            };
4175
4176                                        if is_duplicate_accepted {
4177                                            tracing::debug!(
4178                                                "Skipping duplicate OrderAccepted for venue_order_id={}",
4179                                                if let ExecutionReport::Order(ref r) = report {
4180                                                    r.venue_order_id.to_string()
4181                                                } else {
4182                                                    "unknown".to_string()
4183                                                }
4184                                            );
4185                                            continue;
4186                                        }
4187
4188                                        if let ExecutionReport::Order(ref status_report) = report
4189                                            && status_report.order_status == OrderStatus::Accepted
4190                                        {
4191                                            self.emitted_order_accepted
4192                                                .insert(status_report.venue_order_id, ());
4193                                        }
4194
4195                                        let adjusted = self.adjust_execution_report(
4196                                            report,
4197                                            &effective_client_id,
4198                                            &raw_child,
4199                                        );
4200
4201                                        // Clean up tracking for terminal states
4202                                        if let ExecutionReport::Order(ref status_report) = adjusted
4203                                            && matches!(
4204                                                status_report.order_status,
4205                                                OrderStatus::Filled
4206                                                    | OrderStatus::Canceled
4207                                                    | OrderStatus::Expired
4208                                                    | OrderStatus::Rejected
4209                                            )
4210                                        {
4211                                            self.emitted_order_accepted
4212                                                .remove(&status_report.venue_order_id);
4213                                        }
4214
4215                                        self.update_caches_with_report(&adjusted);
4216                                        exec_reports.push(adjusted);
4217                                    }
4218                                    Err(e) => tracing::error!("Failed to parse order message: {e}"),
4219                                }
4220                            }
4221
4222                            if !exec_reports.is_empty() {
4223                                tracing::debug!(
4224                                    "Pushing {} execution report(s) to message queue",
4225                                    exec_reports.len()
4226                                );
4227                                self.pending_messages
4228                                    .push_back(NautilusWsMessage::ExecutionReports(exec_reports));
4229                            } else {
4230                                tracing::debug!(
4231                                    "No execution reports generated from order messages"
4232                                );
4233                            }
4234
4235                            if let Some(message) = self.pending_messages.pop_front() {
4236                                return Some(message);
4237                            }
4238
4239                            continue;
4240                        }
4241                        OKXWsChannel::OrdersAlgo => {
4242                            let orders: Vec<OKXAlgoOrderMsg> = match serde_json::from_value(data) {
4243                                Ok(orders) => orders,
4244                                Err(e) => {
4245                                    tracing::error!(
4246                                        "Failed to deserialize algo orders payload: {e}"
4247                                    );
4248                                    continue;
4249                                }
4250                            };
4251
4252                            let mut exec_reports: Vec<ExecutionReport> =
4253                                Vec::with_capacity(orders.len());
4254
4255                            for msg in orders {
4256                                let raw_child = parse_client_order_id(&msg.cl_ord_id);
4257                                let parent_from_msg = parse_client_order_id(&msg.algo_cl_ord_id);
4258                                let effective_client_id = self
4259                                    .register_client_order_aliases(&raw_child, &parent_from_msg);
4260
4261                                match parse_algo_order_msg(
4262                                    msg,
4263                                    self.account_id,
4264                                    &self.instruments_cache,
4265                                    ts_init,
4266                                ) {
4267                                    Ok(report) => {
4268                                        let adjusted = self.adjust_execution_report(
4269                                            report,
4270                                            &effective_client_id,
4271                                            &raw_child,
4272                                        );
4273                                        self.update_caches_with_report(&adjusted);
4274                                        exec_reports.push(adjusted);
4275                                    }
4276                                    Err(e) => {
4277                                        tracing::error!("Failed to parse algo order message: {e}");
4278                                    }
4279                                }
4280                            }
4281
4282                            if !exec_reports.is_empty() {
4283                                return Some(NautilusWsMessage::ExecutionReports(exec_reports));
4284                            }
4285
4286                            continue;
4287                        }
4288                        _ => {
4289                            let Some(inst_id) = inst_id else {
4290                                tracing::error!("No instrument for channel {:?}", channel);
4291                                continue;
4292                            };
4293
4294                            let Some(instrument) = self.instruments_cache.get(&inst_id) else {
4295                                tracing::error!(
4296                                    "No instrument for channel {:?}, inst_id {:?}",
4297                                    channel,
4298                                    inst_id
4299                                );
4300                                continue;
4301                            };
4302
4303                            let instrument_id = instrument.id();
4304                            let price_precision = instrument.price_precision();
4305                            let size_precision = instrument.size_precision();
4306
4307                            match parse_ws_message_data(
4308                                &channel,
4309                                data,
4310                                &instrument_id,
4311                                price_precision,
4312                                size_precision,
4313                                ts_init,
4314                                &mut self.funding_rate_cache,
4315                                &self.instruments_cache,
4316                            ) {
4317                                Ok(Some(msg)) => return Some(msg),
4318                                Ok(None) => continue,
4319                                Err(e) => {
4320                                    tracing::error!(
4321                                        "Error parsing message for channel {:?}: {e}",
4322                                        channel
4323                                    );
4324                                    continue;
4325                                }
4326                            }
4327                        }
4328                    }
4329                }
4330                OKXWebSocketEvent::Error { code, msg } => {
4331                    let error = OKXWebSocketError {
4332                        code,
4333                        message: msg,
4334                        conn_id: None,
4335                        timestamp: clock.get_time_ns().as_u64(),
4336                    };
4337                    return Some(NautilusWsMessage::Error(error));
4338                }
4339                OKXWebSocketEvent::Reconnected => {
4340                    return Some(NautilusWsMessage::Reconnected);
4341                }
4342                OKXWebSocketEvent::Subscription {
4343                    event,
4344                    arg,
4345                    code,
4346                    msg,
4347                    ..
4348                } => {
4349                    let topic = topic_from_websocket_arg(&arg);
4350                    let success = code.as_deref().is_none_or(|c| c == "0");
4351
4352                    match event {
4353                        OKXSubscriptionEvent::Subscribe => {
4354                            if success {
4355                                self.subscriptions_state.confirm(&topic);
4356                            } else {
4357                                tracing::warn!(?topic, error = ?msg, code = ?code, "Subscription failed");
4358                                self.subscriptions_state.mark_failure(&topic);
4359                            }
4360                        }
4361                        OKXSubscriptionEvent::Unsubscribe => {
4362                            if success {
4363                                self.subscriptions_state.clear_pending(&topic);
4364                            } else {
4365                                tracing::warn!(?topic, error = ?msg, code = ?code, "Unsubscription failed");
4366                                self.subscriptions_state.mark_failure(&topic);
4367                            }
4368                        }
4369                    }
4370
4371                    continue;
4372                }
4373                OKXWebSocketEvent::ChannelConnCount { .. } => continue,
4374            }
4375        }
4376
4377        None
4378    }
4379}
4380
4381/// Returns `true` when an OKX error payload represents a post-only rejection.
4382pub fn is_post_only_rejection(code: &str, data: &[Value]) -> bool {
4383    if code == OKX_POST_ONLY_ERROR_CODE {
4384        return true;
4385    }
4386
4387    for entry in data {
4388        if let Some(s_code) = entry.get("sCode").and_then(|value| value.as_str())
4389            && s_code == OKX_POST_ONLY_ERROR_CODE
4390        {
4391            return true;
4392        }
4393
4394        if let Some(inner_code) = entry.get("code").and_then(|value| value.as_str())
4395            && inner_code == OKX_POST_ONLY_ERROR_CODE
4396        {
4397            return true;
4398        }
4399    }
4400
4401    false
4402}
4403
4404////////////////////////////////////////////////////////////////////////////////
4405// Tests
4406////////////////////////////////////////////////////////////////////////////////
4407
4408#[cfg(test)]
4409mod tests {
4410    use futures_util;
4411    use rstest::rstest;
4412
4413    use super::*;
4414    use crate::common::enums::{OKXExecType, OKXOrderCategory, OKXSide};
4415
4416    #[rstest]
4417    fn test_timestamp_format_for_websocket_auth() {
4418        let timestamp = SystemTime::now()
4419            .duration_since(SystemTime::UNIX_EPOCH)
4420            .expect("System time should be after UNIX epoch")
4421            .as_secs()
4422            .to_string();
4423
4424        assert!(timestamp.parse::<u64>().is_ok());
4425        assert_eq!(timestamp.len(), 10);
4426        assert!(timestamp.chars().all(|c| c.is_ascii_digit()));
4427    }
4428
4429    #[rstest]
4430    fn test_new_without_credentials() {
4431        let client = OKXWebSocketClient::default();
4432        assert!(client.credential.is_none());
4433        assert_eq!(client.api_key(), None);
4434    }
4435
4436    #[rstest]
4437    fn test_new_with_credentials() {
4438        let client = OKXWebSocketClient::new(
4439            None,
4440            Some("test_key".to_string()),
4441            Some("test_secret".to_string()),
4442            Some("test_passphrase".to_string()),
4443            None,
4444            None,
4445        )
4446        .unwrap();
4447        assert!(client.credential.is_some());
4448        assert_eq!(client.api_key(), Some("test_key"));
4449    }
4450
4451    #[rstest]
4452    fn test_new_partial_credentials_fails() {
4453        let result = OKXWebSocketClient::new(
4454            None,
4455            Some("test_key".to_string()),
4456            None,
4457            Some("test_passphrase".to_string()),
4458            None,
4459            None,
4460        );
4461        assert!(result.is_err());
4462    }
4463
4464    #[rstest]
4465    fn test_request_id_generation() {
4466        let client = OKXWebSocketClient::default();
4467
4468        let initial_counter = client.request_id_counter.load(Ordering::SeqCst);
4469
4470        let id1 = client.request_id_counter.fetch_add(1, Ordering::SeqCst);
4471        let id2 = client.request_id_counter.fetch_add(1, Ordering::SeqCst);
4472
4473        assert_eq!(id1, initial_counter);
4474        assert_eq!(id2, initial_counter + 1);
4475        assert_eq!(
4476            client.request_id_counter.load(Ordering::SeqCst),
4477            initial_counter + 2
4478        );
4479    }
4480
4481    #[rstest]
4482    fn test_client_state_management() {
4483        let client = OKXWebSocketClient::default();
4484
4485        assert!(client.is_closed());
4486        assert!(!client.is_active());
4487
4488        let client_with_heartbeat =
4489            OKXWebSocketClient::new(None, None, None, None, None, Some(30)).unwrap();
4490
4491        assert!(client_with_heartbeat.heartbeat.is_some());
4492        assert_eq!(client_with_heartbeat.heartbeat.unwrap(), 30);
4493    }
4494
4495    #[rstest]
4496    fn test_request_cache_operations() {
4497        let client = OKXWebSocketClient::default();
4498
4499        assert_eq!(client.pending_place_requests.len(), 0);
4500        assert_eq!(client.pending_cancel_requests.len(), 0);
4501        assert_eq!(client.pending_amend_requests.len(), 0);
4502
4503        let client_order_id = ClientOrderId::from("test-order-123");
4504        let trader_id = TraderId::from("test-trader-001");
4505        let strategy_id = StrategyId::from("test-strategy-001");
4506        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
4507
4508        let dummy_params = WsPostOrderParamsBuilder::default()
4509            .inst_id("BTC-USDT".to_string())
4510            .td_mode(OKXTradeMode::Cash)
4511            .side(OKXSide::Buy)
4512            .ord_type(OKXOrderType::Limit)
4513            .sz("1".to_string())
4514            .build()
4515            .unwrap();
4516
4517        client.pending_place_requests.insert(
4518            "place-123".to_string(),
4519            (
4520                PendingOrderParams::Regular(dummy_params),
4521                client_order_id,
4522                trader_id,
4523                strategy_id,
4524                instrument_id,
4525            ),
4526        );
4527
4528        assert_eq!(client.pending_place_requests.len(), 1);
4529        assert!(client.pending_place_requests.contains_key("place-123"));
4530
4531        let removed = client.pending_place_requests.remove("place-123");
4532        assert!(removed.is_some());
4533        assert_eq!(client.pending_place_requests.len(), 0);
4534    }
4535
4536    #[rstest]
4537    fn test_websocket_error_handling() {
4538        let clock = get_atomic_clock_realtime();
4539        let ts = clock.get_time_ns().as_u64();
4540
4541        let error = OKXWebSocketError {
4542            code: "60012".to_string(),
4543            message: "Invalid request".to_string(),
4544            conn_id: None,
4545            timestamp: ts,
4546        };
4547
4548        assert_eq!(error.code, "60012");
4549        assert_eq!(error.message, "Invalid request");
4550        assert_eq!(error.timestamp, ts);
4551
4552        let nautilus_msg = NautilusWsMessage::Error(error);
4553        match nautilus_msg {
4554            NautilusWsMessage::Error(e) => {
4555                assert_eq!(e.code, "60012");
4556                assert_eq!(e.message, "Invalid request");
4557            }
4558            _ => panic!("Expected Error variant"),
4559        }
4560    }
4561
4562    #[rstest]
4563    fn test_request_id_generation_sequence() {
4564        let client = OKXWebSocketClient::default();
4565
4566        let initial_counter = client
4567            .request_id_counter
4568            .load(std::sync::atomic::Ordering::SeqCst);
4569        let mut ids = Vec::new();
4570        for _ in 0..10 {
4571            let id = client
4572                .request_id_counter
4573                .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
4574            ids.push(id);
4575        }
4576
4577        for (i, &id) in ids.iter().enumerate() {
4578            assert_eq!(id, initial_counter + i as u64);
4579        }
4580
4581        assert_eq!(
4582            client
4583                .request_id_counter
4584                .load(std::sync::atomic::Ordering::SeqCst),
4585            initial_counter + 10
4586        );
4587    }
4588
4589    #[rstest]
4590    fn test_client_state_transitions() {
4591        let client = OKXWebSocketClient::default();
4592
4593        assert!(client.is_closed());
4594        assert!(!client.is_active());
4595
4596        let client_with_heartbeat = OKXWebSocketClient::new(
4597            None,
4598            None,
4599            None,
4600            None,
4601            None,
4602            Some(30), // 30 second heartbeat
4603        )
4604        .unwrap();
4605
4606        assert!(client_with_heartbeat.heartbeat.is_some());
4607        assert_eq!(client_with_heartbeat.heartbeat.unwrap(), 30);
4608
4609        let account_id = AccountId::from("test-account-123");
4610        let client_with_account =
4611            OKXWebSocketClient::new(None, None, None, None, Some(account_id), None).unwrap();
4612
4613        assert_eq!(client_with_account.account_id, account_id);
4614    }
4615
4616    #[tokio::test]
4617    async fn test_concurrent_request_handling() {
4618        let client = Arc::new(OKXWebSocketClient::default());
4619
4620        let initial_counter = client
4621            .request_id_counter
4622            .load(std::sync::atomic::Ordering::SeqCst);
4623        let mut handles = Vec::new();
4624
4625        for i in 0..10 {
4626            let client_clone = Arc::clone(&client);
4627            let handle = tokio::spawn(async move {
4628                let client_order_id = ClientOrderId::from(format!("order-{i}").as_str());
4629                let trader_id = TraderId::from("trader-001");
4630                let strategy_id = StrategyId::from("strategy-001");
4631                let instrument_id = InstrumentId::from("BTC-USDT.OKX");
4632
4633                let request_id = client_clone
4634                    .request_id_counter
4635                    .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
4636                let request_id_str = request_id.to_string();
4637
4638                let dummy_params = WsPostOrderParamsBuilder::default()
4639                    .inst_id(instrument_id.symbol.to_string())
4640                    .td_mode(OKXTradeMode::Cash)
4641                    .side(OKXSide::Buy)
4642                    .ord_type(OKXOrderType::Limit)
4643                    .sz("1".to_string())
4644                    .build()
4645                    .unwrap();
4646
4647                client_clone.pending_place_requests.insert(
4648                    request_id_str.clone(),
4649                    (
4650                        PendingOrderParams::Regular(dummy_params),
4651                        client_order_id,
4652                        trader_id,
4653                        strategy_id,
4654                        instrument_id,
4655                    ),
4656                );
4657
4658                // Simulate processing delay
4659                tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
4660
4661                // Remove from cache (simulating response processing)
4662                let removed = client_clone.pending_place_requests.remove(&request_id_str);
4663                assert!(removed.is_some());
4664
4665                request_id
4666            });
4667            handles.push(handle);
4668        }
4669
4670        // Wait for all operations to complete
4671        let results: Vec<_> = futures_util::future::join_all(handles).await;
4672
4673        assert_eq!(results.len(), 10);
4674        for result in results {
4675            assert!(result.is_ok());
4676        }
4677
4678        assert_eq!(client.pending_place_requests.len(), 0);
4679
4680        let final_counter = client
4681            .request_id_counter
4682            .load(std::sync::atomic::Ordering::SeqCst);
4683        assert_eq!(final_counter, initial_counter + 10);
4684    }
4685
4686    #[rstest]
4687    fn test_websocket_error_scenarios() {
4688        let clock = get_atomic_clock_realtime();
4689        let ts = clock.get_time_ns().as_u64();
4690
4691        let error_scenarios = vec![
4692            ("60012", "Invalid request", None),
4693            ("60009", "Invalid API key", Some("conn-123".to_string())),
4694            ("60014", "Too many requests", None),
4695            ("50001", "Order not found", None),
4696        ];
4697
4698        for (code, message, conn_id) in error_scenarios {
4699            let error = OKXWebSocketError {
4700                code: code.to_string(),
4701                message: message.to_string(),
4702                conn_id: conn_id.clone(),
4703                timestamp: ts,
4704            };
4705
4706            assert_eq!(error.code, code);
4707            assert_eq!(error.message, message);
4708            assert_eq!(error.conn_id, conn_id);
4709            assert_eq!(error.timestamp, ts);
4710
4711            let nautilus_msg = NautilusWsMessage::Error(error);
4712            match nautilus_msg {
4713                NautilusWsMessage::Error(e) => {
4714                    assert_eq!(e.code, code);
4715                    assert_eq!(e.message, message);
4716                    assert_eq!(e.conn_id, conn_id);
4717                }
4718                _ => panic!("Expected Error variant"),
4719            }
4720        }
4721    }
4722
4723    #[tokio::test]
4724    async fn test_feed_handler_reconnection_detection() {
4725        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
4726        let signal = Arc::new(AtomicBool::new(false));
4727        let mut handler = OKXFeedHandler::new(rx, signal.clone());
4728
4729        tx.send(Message::Text(RECONNECTED.to_string().into()))
4730            .unwrap();
4731
4732        let result = handler.next().await;
4733        assert!(matches!(result, Some(OKXWebSocketEvent::Reconnected)));
4734    }
4735
4736    #[tokio::test]
4737    async fn test_feed_handler_normal_message_processing() {
4738        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
4739        let signal = Arc::new(AtomicBool::new(false));
4740        let mut handler = OKXFeedHandler::new(rx, signal.clone());
4741
4742        // Send a ping message (OKX sends pings)
4743        let ping_msg = TEXT_PING;
4744        tx.send(Message::Text(ping_msg.to_string().into())).unwrap();
4745
4746        // Send a valid subscription response
4747        let sub_msg = r#"{
4748            "event": "subscribe",
4749            "arg": {
4750                "channel": "tickers",
4751                "instType": "SPOT"
4752            },
4753            "connId": "a4d3ae55"
4754        }"#;
4755
4756        tx.send(Message::Text(sub_msg.to_string().into())).unwrap();
4757
4758        let first = handler.next().await;
4759        assert!(matches!(first, Some(OKXWebSocketEvent::Ping)));
4760
4761        // Now ensure we can still shut down cleanly even with a pending subscription message.
4762        signal.store(true, std::sync::atomic::Ordering::Relaxed);
4763
4764        let result = handler.next().await;
4765        assert!(result.is_none());
4766    }
4767
4768    #[tokio::test]
4769    async fn test_feed_handler_stop_signal() {
4770        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel();
4771        let signal = Arc::new(AtomicBool::new(true)); // Signal already set
4772        let mut handler = OKXFeedHandler::new(rx, signal.clone());
4773
4774        let result = handler.next().await;
4775        assert!(result.is_none());
4776    }
4777
4778    #[tokio::test]
4779    async fn test_feed_handler_close_message() {
4780        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
4781        let signal = Arc::new(AtomicBool::new(false));
4782        let mut handler = OKXFeedHandler::new(rx, signal.clone());
4783
4784        // Send close message
4785        tx.send(Message::Close(None)).unwrap();
4786
4787        let result = handler.next().await;
4788        assert!(result.is_none());
4789    }
4790
4791    #[tokio::test]
4792    async fn test_reconnection_message_constant() {
4793        assert_eq!(RECONNECTED, "__RECONNECTED__");
4794    }
4795
4796    #[tokio::test]
4797    async fn test_multiple_reconnection_signals() {
4798        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
4799        let signal = Arc::new(AtomicBool::new(false));
4800        let mut handler = OKXFeedHandler::new(rx, signal.clone());
4801
4802        // Send multiple reconnection messages
4803        for _ in 0..3 {
4804            tx.send(Message::Text(RECONNECTED.to_string().into()))
4805                .unwrap();
4806
4807            let result = handler.next().await;
4808            assert!(matches!(result, Some(OKXWebSocketEvent::Reconnected)));
4809        }
4810    }
4811
4812    #[tokio::test]
4813    async fn test_wait_until_active_timeout() {
4814        let client = OKXWebSocketClient::new(
4815            None,
4816            Some("test_key".to_string()),
4817            Some("test_secret".to_string()),
4818            Some("test_passphrase".to_string()),
4819            Some(AccountId::from("test-account")),
4820            None,
4821        )
4822        .unwrap();
4823
4824        // Should timeout since client is not connected
4825        let result = client.wait_until_active(0.1).await;
4826
4827        assert!(result.is_err());
4828        assert!(!client.is_active());
4829    }
4830
4831    fn sample_canceled_order_msg() -> OKXOrderMsg {
4832        OKXOrderMsg {
4833            acc_fill_sz: Some("0".to_string()),
4834            avg_px: "0".to_string(),
4835            c_time: 0,
4836            cancel_source: None,
4837            cancel_source_reason: None,
4838            category: OKXOrderCategory::Normal,
4839            ccy: ustr::Ustr::from("USDT"),
4840            cl_ord_id: "order-1".to_string(),
4841            algo_cl_ord_id: None,
4842            fee: None,
4843            fee_ccy: ustr::Ustr::from("USDT"),
4844            fill_px: "0".to_string(),
4845            fill_sz: "0".to_string(),
4846            fill_time: 0,
4847            inst_id: ustr::Ustr::from("ETH-USDT-SWAP"),
4848            inst_type: OKXInstrumentType::Swap,
4849            lever: "1".to_string(),
4850            ord_id: ustr::Ustr::from("123456"),
4851            ord_type: OKXOrderType::Limit,
4852            pnl: "0".to_string(),
4853            pos_side: OKXPositionSide::Net,
4854            px: "0".to_string(),
4855            reduce_only: "false".to_string(),
4856            side: OKXSide::Buy,
4857            state: OKXOrderStatus::Canceled,
4858            exec_type: OKXExecType::None,
4859            sz: "1".to_string(),
4860            td_mode: OKXTradeMode::Cross,
4861            tgt_ccy: None,
4862            trade_id: String::new(),
4863            u_time: 0,
4864        }
4865    }
4866
4867    #[rstest]
4868    fn test_is_post_only_auto_cancel_detects_cancel_source() {
4869        let mut msg = sample_canceled_order_msg();
4870        msg.cancel_source = Some(super::OKX_POST_ONLY_CANCEL_SOURCE.to_string());
4871
4872        assert!(OKXWsMessageHandler::is_post_only_auto_cancel(&msg));
4873    }
4874
4875    #[rstest]
4876    fn test_is_post_only_auto_cancel_detects_reason() {
4877        let mut msg = sample_canceled_order_msg();
4878        msg.cancel_source_reason = Some("POST_ONLY would take liquidity".to_string());
4879
4880        assert!(OKXWsMessageHandler::is_post_only_auto_cancel(&msg));
4881    }
4882
4883    #[rstest]
4884    fn test_is_post_only_auto_cancel_false_without_markers() {
4885        let msg = sample_canceled_order_msg();
4886
4887        assert!(!OKXWsMessageHandler::is_post_only_auto_cancel(&msg));
4888    }
4889
4890    #[rstest]
4891    fn test_is_post_only_auto_cancel_false_for_order_type_only() {
4892        let mut msg = sample_canceled_order_msg();
4893        msg.ord_type = OKXOrderType::PostOnly;
4894
4895        assert!(!OKXWsMessageHandler::is_post_only_auto_cancel(&msg));
4896    }
4897
4898    #[rstest]
4899    fn test_is_post_only_rejection_detects_by_code() {
4900        assert!(super::is_post_only_rejection("51019", &[]));
4901    }
4902
4903    #[rstest]
4904    fn test_is_post_only_rejection_detects_by_inner_code() {
4905        let data = vec![serde_json::json!({
4906            "sCode": "51019"
4907        })];
4908        assert!(super::is_post_only_rejection("50000", &data));
4909    }
4910
4911    #[rstest]
4912    fn test_is_post_only_rejection_false_for_unrelated_error() {
4913        let data = vec![serde_json::json!({
4914            "sMsg": "Insufficient balance"
4915        })];
4916        assert!(!super::is_post_only_rejection("50000", &data));
4917    }
4918
4919    #[tokio::test]
4920    async fn test_batch_cancel_orders_with_multiple_orders() {
4921        use nautilus_model::identifiers::{ClientOrderId, InstrumentId, VenueOrderId};
4922
4923        let client = OKXWebSocketClient::new(
4924            Some("wss://test.okx.com".to_string()),
4925            None,
4926            None,
4927            None,
4928            None,
4929            None,
4930        )
4931        .expect("Failed to create client");
4932
4933        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
4934        let client_order_id1 = ClientOrderId::new("order1");
4935        let client_order_id2 = ClientOrderId::new("order2");
4936        let venue_order_id1 = VenueOrderId::new("venue1");
4937        let venue_order_id2 = VenueOrderId::new("venue2");
4938
4939        let orders = vec![
4940            (instrument_id, Some(client_order_id1), Some(venue_order_id1)),
4941            (instrument_id, Some(client_order_id2), Some(venue_order_id2)),
4942        ];
4943
4944        // This will fail to send since we're not connected, but we're testing the payload building
4945        let result = client.batch_cancel_orders(orders).await;
4946
4947        // Should get an error because not connected, but it means payload was built correctly
4948        assert!(result.is_err());
4949    }
4950
4951    #[tokio::test]
4952    async fn test_batch_cancel_orders_with_only_client_order_id() {
4953        use nautilus_model::identifiers::{ClientOrderId, InstrumentId};
4954
4955        let client = OKXWebSocketClient::new(
4956            Some("wss://test.okx.com".to_string()),
4957            None,
4958            None,
4959            None,
4960            None,
4961            None,
4962        )
4963        .expect("Failed to create client");
4964
4965        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
4966        let client_order_id = ClientOrderId::new("order1");
4967
4968        let orders = vec![(instrument_id, Some(client_order_id), None)];
4969
4970        let result = client.batch_cancel_orders(orders).await;
4971
4972        // Should get an error because not connected
4973        assert!(result.is_err());
4974    }
4975
4976    #[tokio::test]
4977    async fn test_batch_cancel_orders_with_only_venue_order_id() {
4978        use nautilus_model::identifiers::{InstrumentId, VenueOrderId};
4979
4980        let client = OKXWebSocketClient::new(
4981            Some("wss://test.okx.com".to_string()),
4982            None,
4983            None,
4984            None,
4985            None,
4986            None,
4987        )
4988        .expect("Failed to create client");
4989
4990        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
4991        let venue_order_id = VenueOrderId::new("venue1");
4992
4993        let orders = vec![(instrument_id, None, Some(venue_order_id))];
4994
4995        let result = client.batch_cancel_orders(orders).await;
4996
4997        // Should get an error because not connected
4998        assert!(result.is_err());
4999    }
5000
5001    #[tokio::test]
5002    async fn test_batch_cancel_orders_with_both_ids() {
5003        use nautilus_model::identifiers::{ClientOrderId, InstrumentId, VenueOrderId};
5004
5005        let client = OKXWebSocketClient::new(
5006            Some("wss://test.okx.com".to_string()),
5007            None,
5008            None,
5009            None,
5010            None,
5011            None,
5012        )
5013        .expect("Failed to create client");
5014
5015        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
5016        let client_order_id = ClientOrderId::new("order1");
5017        let venue_order_id = VenueOrderId::new("venue1");
5018
5019        let orders = vec![(instrument_id, Some(client_order_id), Some(venue_order_id))];
5020
5021        let result = client.batch_cancel_orders(orders).await;
5022
5023        // Should get an error because not connected
5024        assert!(result.is_err());
5025    }
5026}