nautilus_okx/websocket/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message handler for OKX.
17//!
18//! The handler runs in a dedicated Tokio task as the I/O boundary between the client
19//! orchestrator and the network layer. It exclusively owns the `WebSocketClient` and
20//! processes commands from the client via an unbounded channel, serializing them to JSON
21//! and sending via the WebSocket. Raw messages are received from the network, deserialized,
22//! and transformed into `NautilusWsMessage` events which are emitted back to the client.
23//!
24//! Key responsibilities:
25//! - Command processing: Receives `HandlerCommand` from client, executes WebSocket operations.
26//! - Message transformation: Parses raw venue messages into Nautilus domain events.
27//! - Pending state tracking: Owns `AHashMap` for matching requests/responses (single-threaded).
28//! - Retry logic: Retries transient WebSocket send failures using `RetryManager`.
29//! - Error event emission: Emits `OrderRejected`, `OrderCancelRejected` when retries exhausted.
30
31use std::{
32    collections::VecDeque,
33    sync::{
34        Arc,
35        atomic::{AtomicBool, AtomicU64, Ordering},
36    },
37};
38
39use ahash::AHashMap;
40use dashmap::DashMap;
41use nautilus_core::{AtomicTime, UUID4, nanos::UnixNanos, time::get_atomic_clock_realtime};
42use nautilus_model::{
43    enums::{OrderStatus, OrderType, TimeInForce},
44    events::{AccountState, OrderCancelRejected, OrderModifyRejected, OrderRejected},
45    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
46    instruments::{Instrument, InstrumentAny},
47    reports::OrderStatusReport,
48    types::{Money, Quantity},
49};
50use nautilus_network::{
51    RECONNECTED,
52    retry::{RetryManager, create_websocket_retry_manager},
53    websocket::{AuthTracker, SubscriptionState, TEXT_PING, TEXT_PONG, WebSocketClient},
54};
55use serde_json::Value;
56use tokio_tungstenite::tungstenite::Message;
57use ustr::Ustr;
58
59use super::{
60    enums::{OKXSubscriptionEvent, OKXWsChannel, OKXWsOperation},
61    error::OKXWsError,
62    messages::{
63        ExecutionReport, NautilusWsMessage, OKXAlgoOrderMsg, OKXBookMsg, OKXOrderMsg,
64        OKXSubscription, OKXSubscriptionArg, OKXWebSocketArg, OKXWebSocketError, OKXWsMessage,
65        OKXWsRequest, WsAmendOrderParams, WsCancelAlgoOrderParamsBuilder,
66        WsCancelOrderParamsBuilder, WsMassCancelParams, WsPostAlgoOrderParams, WsPostOrderParams,
67    },
68    parse::{parse_algo_order_msg, parse_book_msg_vec, parse_order_msg, parse_ws_message_data},
69    subscription::topic_from_websocket_arg,
70};
71use crate::{
72    common::{
73        consts::{
74            OKX_POST_ONLY_CANCEL_REASON, OKX_POST_ONLY_CANCEL_SOURCE, OKX_POST_ONLY_ERROR_CODE,
75            should_retry_error_code,
76        },
77        enums::{
78            OKXBookAction, OKXInstrumentType, OKXOrderStatus, OKXOrderType, OKXSide,
79            OKXTargetCurrency, OKXTradeMode,
80        },
81        parse::{
82            determine_order_type, okx_instrument_type, parse_account_state, parse_client_order_id,
83            parse_millisecond_timestamp, parse_position_status_report, parse_price, parse_quantity,
84        },
85    },
86    http::models::{OKXAccount, OKXPosition},
87    websocket::client::{
88        OKX_RATE_LIMIT_KEY_AMEND, OKX_RATE_LIMIT_KEY_CANCEL, OKX_RATE_LIMIT_KEY_ORDER,
89        OKX_RATE_LIMIT_KEY_SUBSCRIPTION,
90    },
91};
92
93/// Data cached for pending place requests to correlate with responses.
94type PlaceRequestData = (
95    PendingOrderParams,
96    ClientOrderId,
97    TraderId,
98    StrategyId,
99    InstrumentId,
100);
101
102/// Data cached for pending cancel requests to correlate with responses.
103type CancelRequestData = (
104    ClientOrderId,
105    TraderId,
106    StrategyId,
107    InstrumentId,
108    Option<VenueOrderId>,
109);
110
111/// Data cached for pending amend requests to correlate with responses.
112type AmendRequestData = (
113    ClientOrderId,
114    TraderId,
115    StrategyId,
116    InstrumentId,
117    Option<VenueOrderId>,
118);
119
120#[derive(Debug)]
121pub enum PendingOrderParams {
122    Regular(WsPostOrderParams),
123    Algo(WsPostAlgoOrderParams),
124}
125
126/// Commands sent from the outer client to the inner message handler.
127#[allow(
128    clippy::large_enum_variant,
129    reason = "Commands are ephemeral and immediately consumed"
130)]
131#[allow(missing_debug_implementations)]
132pub enum HandlerCommand {
133    SetClient(WebSocketClient),
134    Disconnect,
135    Authenticate {
136        payload: String,
137    },
138    InitializeInstruments(Vec<InstrumentAny>),
139    UpdateInstrument(InstrumentAny),
140    Subscribe {
141        args: Vec<OKXSubscriptionArg>,
142    },
143    Unsubscribe {
144        args: Vec<OKXSubscriptionArg>,
145    },
146    PlaceOrder {
147        params: WsPostOrderParams,
148        client_order_id: ClientOrderId,
149        trader_id: TraderId,
150        strategy_id: StrategyId,
151        instrument_id: InstrumentId,
152    },
153    PlaceAlgoOrder {
154        params: WsPostAlgoOrderParams,
155        client_order_id: ClientOrderId,
156        trader_id: TraderId,
157        strategy_id: StrategyId,
158        instrument_id: InstrumentId,
159    },
160    AmendOrder {
161        params: WsAmendOrderParams,
162        client_order_id: ClientOrderId,
163        trader_id: TraderId,
164        strategy_id: StrategyId,
165        instrument_id: InstrumentId,
166        venue_order_id: Option<VenueOrderId>,
167    },
168    CancelOrder {
169        client_order_id: Option<ClientOrderId>,
170        venue_order_id: Option<VenueOrderId>,
171        instrument_id: InstrumentId,
172        trader_id: TraderId,
173        strategy_id: StrategyId,
174    },
175    CancelAlgoOrder {
176        client_order_id: Option<ClientOrderId>,
177        algo_order_id: Option<VenueOrderId>,
178        instrument_id: InstrumentId,
179        trader_id: TraderId,
180        strategy_id: StrategyId,
181    },
182    MassCancel {
183        instrument_id: InstrumentId,
184    },
185    BatchPlaceOrders {
186        args: Vec<Value>,
187        request_id: String,
188    },
189    BatchAmendOrders {
190        args: Vec<Value>,
191        request_id: String,
192    },
193    BatchCancelOrders {
194        args: Vec<Value>,
195        request_id: String,
196    },
197}
198
199pub(super) struct OKXWsFeedHandler {
200    clock: &'static AtomicTime,
201    account_id: AccountId,
202    signal: Arc<AtomicBool>,
203    inner: Option<WebSocketClient>,
204    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
205    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
206    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
207    auth_tracker: AuthTracker,
208    subscriptions_state: SubscriptionState,
209    retry_manager: RetryManager<OKXWsError>,
210    pending_place_requests: AHashMap<String, PlaceRequestData>,
211    pending_cancel_requests: AHashMap<String, CancelRequestData>,
212    pending_amend_requests: AHashMap<String, AmendRequestData>,
213    pending_mass_cancel_requests: AHashMap<String, InstrumentId>,
214    pending_messages: VecDeque<NautilusWsMessage>,
215    active_client_orders: Arc<DashMap<ClientOrderId, (TraderId, StrategyId, InstrumentId)>>,
216    client_id_aliases: Arc<DashMap<ClientOrderId, ClientOrderId>>,
217    emitted_order_accepted: Arc<DashMap<VenueOrderId, ()>>,
218    instruments_cache: AHashMap<Ustr, InstrumentAny>,
219    fee_cache: AHashMap<Ustr, Money>,           // Key is order ID
220    filled_qty_cache: AHashMap<Ustr, Quantity>, // Key is order ID
221    funding_rate_cache: AHashMap<Ustr, (Ustr, u64)>, // Cache (funding_rate, funding_time) by inst_id
222    last_account_state: Option<AccountState>,
223    request_id_counter: AtomicU64,
224}
225
226impl OKXWsFeedHandler {
227    /// Creates a new [`OKXWsFeedHandler`] instance.
228    #[allow(clippy::too_many_arguments)]
229    pub fn new(
230        account_id: AccountId,
231        signal: Arc<AtomicBool>,
232        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
233        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
234        out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
235        active_client_orders: Arc<DashMap<ClientOrderId, (TraderId, StrategyId, InstrumentId)>>,
236        client_id_aliases: Arc<DashMap<ClientOrderId, ClientOrderId>>,
237        emitted_order_accepted: Arc<DashMap<VenueOrderId, ()>>,
238        auth_tracker: AuthTracker,
239        subscriptions_state: SubscriptionState,
240    ) -> Self {
241        Self {
242            clock: get_atomic_clock_realtime(),
243            account_id,
244            signal,
245            inner: None,
246            cmd_rx,
247            raw_rx,
248            out_tx,
249            auth_tracker,
250            subscriptions_state,
251            retry_manager: create_websocket_retry_manager(),
252            pending_place_requests: AHashMap::new(),
253            pending_cancel_requests: AHashMap::new(),
254            pending_amend_requests: AHashMap::new(),
255            pending_mass_cancel_requests: AHashMap::new(),
256            pending_messages: VecDeque::new(),
257            active_client_orders,
258            client_id_aliases,
259            emitted_order_accepted,
260            instruments_cache: AHashMap::new(),
261            fee_cache: AHashMap::new(),
262            filled_qty_cache: AHashMap::new(),
263            funding_rate_cache: AHashMap::new(),
264            last_account_state: None,
265            request_id_counter: AtomicU64::new(0),
266        }
267    }
268
269    pub(super) fn is_stopped(&self) -> bool {
270        self.signal.load(std::sync::atomic::Ordering::Relaxed)
271    }
272
273    pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), ()> {
274        self.out_tx.send(msg).map_err(|_| ())
275    }
276
277    /// Sends a WebSocket message with retry logic.
278    async fn send_with_retry(
279        &self,
280        payload: String,
281        rate_limit_keys: Option<Vec<String>>,
282    ) -> Result<(), OKXWsError> {
283        if let Some(client) = &self.inner {
284            self.retry_manager
285                .execute_with_retry(
286                    "websocket_send",
287                    || {
288                        let payload = payload.clone();
289                        let keys = rate_limit_keys.clone();
290                        async move {
291                            client
292                                .send_text(payload, keys)
293                                .await
294                                .map_err(|e| OKXWsError::ClientError(format!("Send failed: {e}")))
295                        }
296                    },
297                    should_retry_okx_error,
298                    create_okx_timeout_error,
299                )
300                .await
301        } else {
302            Err(OKXWsError::ClientError(
303                "No active WebSocket client".to_string(),
304            ))
305        }
306    }
307
308    /// Sends a pong response to OKX.
309    pub(super) async fn send_pong(&self) -> anyhow::Result<()> {
310        match self.send_with_retry(TEXT_PONG.to_string(), None).await {
311            Ok(()) => {
312                tracing::trace!("Sent pong response to OKX text ping");
313                Ok(())
314            }
315            Err(e) => {
316                tracing::warn!(error = %e, "Failed to send pong after retries");
317                Err(anyhow::anyhow!("Failed to send pong: {e}"))
318            }
319        }
320    }
321
322    pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
323        if let Some(message) = self.pending_messages.pop_front() {
324            return Some(message);
325        }
326
327        loop {
328            tokio::select! {
329                Some(cmd) = self.cmd_rx.recv() => {
330                    match cmd {
331                        HandlerCommand::SetClient(client) => {
332                            tracing::info!("Handler received WebSocket client");
333                            self.inner = Some(client);
334                        }
335                        HandlerCommand::Disconnect => {
336                            tracing::info!("Handler disconnecting WebSocket client");
337                            self.inner = None;
338                        }
339                        HandlerCommand::Authenticate { payload } => {
340                            if let Err(e) = self.send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_SUBSCRIPTION.to_string()])).await {
341                                tracing::error!(error = %e, "Failed to send authentication message after retries");
342                            }
343                        }
344                        HandlerCommand::InitializeInstruments(instruments) => {
345                            for inst in instruments {
346                                self.instruments_cache.insert(inst.symbol().inner(), inst);
347                            }
348                        }
349                        HandlerCommand::UpdateInstrument(inst) => {
350                            self.instruments_cache.insert(inst.symbol().inner(), inst);
351                        }
352                        HandlerCommand::Subscribe { args } => {
353                            if let Err(e) = self.handle_subscribe(args).await {
354                                tracing::error!(error = %e, "Failed to handle subscribe command");
355                            }
356                        }
357                        HandlerCommand::Unsubscribe { args } => {
358                            if let Err(e) = self.handle_unsubscribe(args).await {
359                                tracing::error!(error = %e, "Failed to handle unsubscribe command");
360                            }
361                        }
362                        HandlerCommand::CancelOrder {
363                            client_order_id,
364                            venue_order_id,
365                            instrument_id,
366                            trader_id,
367                            strategy_id,
368                        } => {
369                            if let Err(e) = self
370                                .handle_cancel_order(
371                                    client_order_id,
372                                    venue_order_id,
373                                    instrument_id,
374                                    trader_id,
375                                    strategy_id,
376                                )
377                                .await
378                            {
379                                tracing::error!(error = %e, "Failed to handle cancel order command");
380                            }
381                        }
382                        HandlerCommand::CancelAlgoOrder {
383                            client_order_id,
384                            algo_order_id,
385                            instrument_id,
386                            trader_id,
387                            strategy_id,
388                        } => {
389                            if let Err(e) = self
390                                .handle_cancel_algo_order(
391                                    client_order_id,
392                                    algo_order_id,
393                                    instrument_id,
394                                    trader_id,
395                                    strategy_id,
396                                )
397                                .await
398                            {
399                                tracing::error!(error = %e, "Failed to handle cancel algo order command");
400                            }
401                        }
402                        HandlerCommand::PlaceOrder {
403                            params,
404                            client_order_id,
405                            trader_id,
406                            strategy_id,
407                            instrument_id,
408                        } => {
409                            if let Err(e) = self
410                                .handle_place_order(
411                                    params,
412                                    client_order_id,
413                                    trader_id,
414                                    strategy_id,
415                                    instrument_id,
416                                )
417                                .await
418                            {
419                                tracing::error!(error = %e, "Failed to handle place order command");
420                            }
421                        }
422                        HandlerCommand::PlaceAlgoOrder {
423                            params,
424                            client_order_id,
425                            trader_id,
426                            strategy_id,
427                            instrument_id,
428                        } => {
429                            if let Err(e) = self
430                                .handle_place_algo_order(
431                                    params,
432                                    client_order_id,
433                                    trader_id,
434                                    strategy_id,
435                                    instrument_id,
436                                )
437                                .await
438                            {
439                                tracing::error!(error = %e, "Failed to handle place algo order command");
440                            }
441                        }
442                        HandlerCommand::AmendOrder {
443                            params,
444                            client_order_id,
445                            trader_id,
446                            strategy_id,
447                            instrument_id,
448                            venue_order_id,
449                        } => {
450                            if let Err(e) = self
451                                .handle_amend_order(
452                                    params,
453                                    client_order_id,
454                                    trader_id,
455                                    strategy_id,
456                                    instrument_id,
457                                    venue_order_id,
458                                )
459                                .await
460                            {
461                                tracing::error!(error = %e, "Failed to handle amend order command");
462                            }
463                        }
464                        HandlerCommand::MassCancel { instrument_id } => {
465                            if let Err(e) = self.handle_mass_cancel(instrument_id).await {
466                                tracing::error!(error = %e, "Failed to handle mass cancel command");
467                            }
468                        }
469                        HandlerCommand::BatchCancelOrders { args, request_id } => {
470                            if let Err(e) = self.handle_batch_cancel_orders(args, request_id).await {
471                                tracing::error!(error = %e, "Failed to handle batch cancel orders command");
472                            }
473                        }
474                        HandlerCommand::BatchPlaceOrders { args, request_id } => {
475                            if let Err(e) = self.handle_batch_place_orders(args, request_id).await {
476                                tracing::error!(error = %e, "Failed to handle batch place orders command");
477                            }
478                        }
479                        HandlerCommand::BatchAmendOrders { args, request_id } => {
480                            if let Err(e) = self.handle_batch_amend_orders(args, request_id).await {
481                                tracing::error!(error = %e, "Failed to handle batch amend orders command");
482                            }
483                        }
484                    }
485                    // Continue processing following command
486                    continue;
487                }
488
489                _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
490                    if self.signal.load(std::sync::atomic::Ordering::Relaxed) {
491                        tracing::debug!("Stop signal received during idle period");
492                        return None;
493                    }
494                    continue;
495                }
496
497                msg = self.raw_rx.recv() => {
498                    let event = match msg {
499                        Some(msg) => match Self::parse_raw_message(msg) {
500                            Some(event) => event,
501                            None => continue,
502                        },
503                        None => {
504                            tracing::debug!("WebSocket stream closed");
505                            return None;
506                        }
507                    };
508
509                    let ts_init = self.clock.get_time_ns();
510
511            match event {
512                OKXWsMessage::Ping => {
513                    if let Err(e) = self.send_pong().await {
514                        tracing::warn!(error = %e, "Failed to send pong response");
515                    }
516                    continue;
517                }
518                OKXWsMessage::Login {
519                    code, msg, conn_id, ..
520                } => {
521                    if code == "0" {
522                        self.auth_tracker.succeed();
523
524                        // Must return immediately to deliver Authenticated message.
525                        // Using push_back() + continue blocks the select! loop and prevents
526                        // the spawn block from receiving this event, breaking reconnection flow.
527                        return Some(NautilusWsMessage::Authenticated);
528                    }
529
530                    tracing::error!(error = %msg, "WebSocket authentication failed");
531                    self.auth_tracker.fail(msg.clone());
532
533                    let error = OKXWebSocketError {
534                        code,
535                        message: msg,
536                        conn_id: Some(conn_id),
537                        timestamp: self.clock.get_time_ns().as_u64(),
538                    };
539                    self.pending_messages
540                        .push_back(NautilusWsMessage::Error(error));
541                    continue;
542                }
543                OKXWsMessage::BookData { arg, action, data } => {
544                    if let Some(msg) = self.handle_book_data(arg, action, data, ts_init) {
545                        return Some(msg);
546                    }
547                    continue;
548                }
549                OKXWsMessage::OrderResponse {
550                    id,
551                    op,
552                    code,
553                    msg,
554                    data,
555                } => {
556                    if let Some(msg) = self.handle_order_response(id, op, code, msg, data, ts_init) {
557                        return Some(msg);
558                    }
559                    continue;
560                }
561                OKXWsMessage::Data { arg, data } => {
562                    let OKXWebSocketArg {
563                        channel, inst_id, ..
564                    } = arg;
565
566                    match channel {
567                        OKXWsChannel::Account => {
568                            if let Some(msg) = self.handle_account_data(data, ts_init) {
569                                return Some(msg);
570                            }
571                            continue;
572                        }
573                        OKXWsChannel::Positions => {
574                            self.handle_positions_data(data, ts_init);
575                            continue;
576                        }
577                        OKXWsChannel::Orders => {
578                            if let Some(msg) = self.handle_orders_data(data, ts_init) {
579                                return Some(msg);
580                            }
581                            continue;
582                        }
583                        OKXWsChannel::OrdersAlgo => {
584                            if let Some(msg) = self.handle_algo_orders_data(data, ts_init) {
585                                return Some(msg);
586                            }
587                            continue;
588                        }
589                        _ => {
590                            if let Some(msg) =
591                                self.handle_other_channel_data(channel, inst_id, data, ts_init)
592                            {
593                                return Some(msg);
594                            }
595                            continue;
596                        }
597                    }
598                }
599                OKXWsMessage::Error { code, msg } => {
600                    let error = OKXWebSocketError {
601                        code,
602                        message: msg,
603                        conn_id: None,
604                        timestamp: self.clock.get_time_ns().as_u64(),
605                    };
606                    return Some(NautilusWsMessage::Error(error));
607                }
608                OKXWsMessage::Reconnected => {
609                    return Some(NautilusWsMessage::Reconnected);
610                }
611                OKXWsMessage::Subscription {
612                    event,
613                    arg,
614                    code,
615                    msg,
616                    ..
617                } => {
618                    let topic = topic_from_websocket_arg(&arg);
619                    let success = code.as_deref().is_none_or(|c| c == "0");
620
621                    match event {
622                        OKXSubscriptionEvent::Subscribe => {
623                            if success {
624                                self.subscriptions_state.confirm_subscribe(&topic);
625                            } else {
626                                tracing::warn!(?topic, error = ?msg, code = ?code, "Subscription failed");
627                                self.subscriptions_state.mark_failure(&topic);
628                            }
629                        }
630                        OKXSubscriptionEvent::Unsubscribe => {
631                            if success {
632                                self.subscriptions_state.confirm_unsubscribe(&topic);
633                            } else {
634                                tracing::warn!(?topic, error = ?msg, code = ?code, "Unsubscription failed - restoring subscription");
635                                // Venue rejected unsubscribe, so we're still subscribed. Restore state:
636                                self.subscriptions_state.confirm_unsubscribe(&topic); // Clear pending_unsubscribe
637                                self.subscriptions_state.mark_subscribe(&topic);      // Mark as subscribing
638                                self.subscriptions_state.confirm_subscribe(&topic);   // Confirm subscription
639                            }
640                        }
641                    }
642
643                    continue;
644                }
645                OKXWsMessage::ChannelConnCount { .. } => continue,
646            }
647                }
648
649                // Handle shutdown - either channel closed or stream ended
650                else => {
651                    tracing::debug!("Handler shutting down: stream ended or command channel closed");
652                    return None;
653                }
654            }
655        }
656    }
657
658    pub(super) fn is_post_only_auto_cancel(msg: &OKXOrderMsg) -> bool {
659        if msg.state != OKXOrderStatus::Canceled {
660            return false;
661        }
662
663        let cancel_source_matches = matches!(
664            msg.cancel_source.as_deref(),
665            Some(source) if source == OKX_POST_ONLY_CANCEL_SOURCE
666        );
667
668        let reason_matches = matches!(
669            msg.cancel_source_reason.as_deref(),
670            Some(reason) if reason.contains("POST_ONLY")
671        );
672
673        if !(cancel_source_matches || reason_matches) {
674            return false;
675        }
676
677        msg.acc_fill_sz
678            .as_ref()
679            .is_none_or(|filled| filled == "0" || filled.is_empty())
680    }
681
682    fn try_handle_post_only_auto_cancel(
683        &mut self,
684        msg: &OKXOrderMsg,
685        ts_init: UnixNanos,
686        exec_reports: &mut Vec<ExecutionReport>,
687    ) -> bool {
688        if !Self::is_post_only_auto_cancel(msg) {
689            return false;
690        }
691
692        let Some(client_order_id) = parse_client_order_id(&msg.cl_ord_id) else {
693            return false;
694        };
695
696        let Some((_, (trader_id, strategy_id, instrument_id))) =
697            self.active_client_orders.remove(&client_order_id)
698        else {
699            return false;
700        };
701
702        self.client_id_aliases.remove(&client_order_id);
703
704        if !exec_reports.is_empty() {
705            let reports = std::mem::take(exec_reports);
706            self.pending_messages
707                .push_back(NautilusWsMessage::ExecutionReports(reports));
708        }
709
710        let reason = msg
711            .cancel_source_reason
712            .as_ref()
713            .filter(|reason| !reason.is_empty())
714            .map_or_else(
715                || Ustr::from(OKX_POST_ONLY_CANCEL_REASON),
716                |reason| Ustr::from(reason.as_str()),
717            );
718
719        let ts_event = parse_millisecond_timestamp(msg.u_time);
720        let rejected = OrderRejected::new(
721            trader_id,
722            strategy_id,
723            instrument_id,
724            client_order_id,
725            self.account_id,
726            reason,
727            UUID4::new(),
728            ts_event,
729            ts_init,
730            false,
731            true,
732        );
733
734        self.pending_messages
735            .push_back(NautilusWsMessage::OrderRejected(rejected));
736
737        true
738    }
739
740    fn register_client_order_aliases(
741        &self,
742        raw_child: &Option<ClientOrderId>,
743        parent_from_msg: &Option<ClientOrderId>,
744    ) -> Option<ClientOrderId> {
745        if let Some(parent) = parent_from_msg {
746            self.client_id_aliases.insert(*parent, *parent);
747            if let Some(child) = raw_child.as_ref().filter(|child| **child != *parent) {
748                self.client_id_aliases.insert(*child, *parent);
749            }
750            Some(*parent)
751        } else if let Some(child) = raw_child.as_ref() {
752            if let Some(mapped) = self.client_id_aliases.get(child) {
753                Some(*mapped.value())
754            } else {
755                self.client_id_aliases.insert(*child, *child);
756                Some(*child)
757            }
758        } else {
759            None
760        }
761    }
762
763    fn adjust_execution_report(
764        &self,
765        report: ExecutionReport,
766        effective_client_id: &Option<ClientOrderId>,
767        raw_child: &Option<ClientOrderId>,
768    ) -> ExecutionReport {
769        match report {
770            ExecutionReport::Order(status_report) => {
771                let mut adjusted = status_report;
772                let mut final_id = *effective_client_id;
773
774                if final_id.is_none() {
775                    final_id = adjusted.client_order_id;
776                }
777
778                if final_id.is_none()
779                    && let Some(child) = raw_child.as_ref()
780                    && let Some(mapped) = self.client_id_aliases.get(child)
781                {
782                    final_id = Some(*mapped.value());
783                }
784
785                if let Some(final_id_value) = final_id {
786                    if adjusted.client_order_id != Some(final_id_value) {
787                        adjusted = adjusted.with_client_order_id(final_id_value);
788                    }
789                    self.client_id_aliases
790                        .insert(final_id_value, final_id_value);
791
792                    if let Some(child) =
793                        raw_child.as_ref().filter(|child| **child != final_id_value)
794                    {
795                        adjusted = adjusted.with_linked_order_ids(vec![*child]);
796                    }
797                }
798
799                ExecutionReport::Order(adjusted)
800            }
801            ExecutionReport::Fill(mut fill_report) => {
802                let mut final_id = *effective_client_id;
803                if final_id.is_none() {
804                    final_id = fill_report.client_order_id;
805                }
806                if final_id.is_none()
807                    && let Some(child) = raw_child.as_ref()
808                    && let Some(mapped) = self.client_id_aliases.get(child)
809                {
810                    final_id = Some(*mapped.value());
811                }
812
813                if let Some(final_id_value) = final_id {
814                    fill_report.client_order_id = Some(final_id_value);
815                    self.client_id_aliases
816                        .insert(final_id_value, final_id_value);
817                }
818
819                ExecutionReport::Fill(fill_report)
820            }
821        }
822    }
823
824    fn update_caches_with_report(&mut self, report: &ExecutionReport) {
825        match report {
826            ExecutionReport::Fill(fill_report) => {
827                let order_id = fill_report.venue_order_id.inner();
828                let current_fee = self
829                    .fee_cache
830                    .get(&order_id)
831                    .copied()
832                    .unwrap_or_else(|| Money::new(0.0, fill_report.commission.currency));
833                let total_fee = current_fee + fill_report.commission;
834                self.fee_cache.insert(order_id, total_fee);
835
836                let current_filled_qty = self
837                    .filled_qty_cache
838                    .get(&order_id)
839                    .copied()
840                    .unwrap_or_else(|| Quantity::zero(fill_report.last_qty.precision));
841                let total_filled_qty = current_filled_qty + fill_report.last_qty;
842                self.filled_qty_cache.insert(order_id, total_filled_qty);
843            }
844            ExecutionReport::Order(status_report) => {
845                if matches!(status_report.order_status, OrderStatus::Filled) {
846                    self.fee_cache.remove(&status_report.venue_order_id.inner());
847                    self.filled_qty_cache
848                        .remove(&status_report.venue_order_id.inner());
849                }
850
851                if matches!(
852                    status_report.order_status,
853                    OrderStatus::Canceled
854                        | OrderStatus::Expired
855                        | OrderStatus::Filled
856                        | OrderStatus::Rejected,
857                ) {
858                    if let Some(client_order_id) = status_report.client_order_id {
859                        self.active_client_orders.remove(&client_order_id);
860                        self.client_id_aliases.remove(&client_order_id);
861                    }
862                    if let Some(linked) = &status_report.linked_order_ids {
863                        for child in linked {
864                            self.client_id_aliases.remove(child);
865                        }
866                    }
867                }
868            }
869        }
870    }
871
872    #[allow(clippy::too_many_lines)]
873    fn handle_order_response(
874        &mut self,
875        id: Option<String>,
876        op: OKXWsOperation,
877        code: String,
878        msg: String,
879        data: Vec<Value>,
880        ts_init: UnixNanos,
881    ) -> Option<NautilusWsMessage> {
882        if code == "0" {
883            tracing::debug!("Order operation successful: id={id:?} op={op} code={code}");
884
885            if op == OKXWsOperation::BatchCancelOrders {
886                tracing::debug!(
887                    "Batch cancel operation successful: id={id:?} cancelled_count={}",
888                    data.len()
889                );
890
891                // Check for per-order errors even when top-level code is "0"
892                for (idx, entry) in data.iter().enumerate() {
893                    if let Some(entry_code) = entry.get("sCode").and_then(|v| v.as_str())
894                        && entry_code != "0"
895                    {
896                        let entry_msg = entry
897                            .get("sMsg")
898                            .and_then(|v| v.as_str())
899                            .unwrap_or("Unknown error");
900
901                        if let Some(cl_ord_id_str) = entry
902                            .get("clOrdId")
903                            .and_then(|v| v.as_str())
904                            .filter(|s| !s.is_empty())
905                        {
906                            tracing::error!(
907                                "Batch cancel partial failure for order {}: sCode={} sMsg={}",
908                                cl_ord_id_str,
909                                entry_code,
910                                entry_msg
911                            );
912                            // TODO: Emit OrderCancelRejected for this specific order
913                        } else {
914                            tracing::error!(
915                                "Batch cancel entry[{}] failed: sCode={} sMsg={} data={:?}",
916                                idx,
917                                entry_code,
918                                entry_msg,
919                                entry
920                            );
921                        }
922                    }
923                }
924
925                return None;
926            } else if op == OKXWsOperation::MassCancel
927                && let Some(request_id) = &id
928                && let Some(instrument_id) = self.pending_mass_cancel_requests.remove(request_id)
929            {
930                tracing::info!(
931                    "Mass cancel operation successful for instrument: {}",
932                    instrument_id
933                );
934            } else if op == OKXWsOperation::Order
935                && let Some(request_id) = &id
936                && let Some((params, client_order_id, _trader_id, _strategy_id, instrument_id)) =
937                    self.pending_place_requests.remove(request_id)
938            {
939                let (venue_order_id, ts_accepted) = if let Some(first) = data.first() {
940                    let ord_id = first
941                        .get("ordId")
942                        .and_then(|v| v.as_str())
943                        .filter(|s| !s.is_empty())
944                        .map(VenueOrderId::new);
945
946                    let ts = first
947                        .get("ts")
948                        .and_then(|v| v.as_str())
949                        .and_then(|s| s.parse::<u64>().ok())
950                        .map_or_else(
951                            || self.clock.get_time_ns(),
952                            |ms| UnixNanos::from(ms * 1_000_000),
953                        );
954
955                    (ord_id, ts)
956                } else {
957                    (None, self.clock.get_time_ns())
958                };
959
960                if let Some(instrument) = self.instruments_cache.get(&instrument_id.symbol.inner())
961                {
962                    match params {
963                        PendingOrderParams::Regular(order_params) => {
964                            let order_type = determine_order_type(
965                                order_params.ord_type,
966                                order_params.px.as_deref().unwrap_or(""),
967                            );
968
969                            let is_explicit_quote_sized = order_params
970                                .tgt_ccy
971                                .is_some_and(|tgt| tgt == OKXTargetCurrency::QuoteCcy);
972
973                            // SPOT market BUY in cash mode with no tgt_ccy defaults to quote-sizing
974                            let is_implicit_quote_sized = order_params.tgt_ccy.is_none()
975                                && order_params.side == OKXSide::Buy
976                                && order_type == OrderType::Market
977                                && order_params.td_mode == OKXTradeMode::Cash
978                                && instrument.instrument_class().as_ref() == "SPOT";
979
980                            if is_explicit_quote_sized || is_implicit_quote_sized {
981                                // For quote-sized orders, sz is in quote currency (USDT),
982                                // not base currency (ETH). We can't accurately parse the
983                                // base quantity without the fill price, so we skip the
984                                // synthetic OrderAccepted and rely on the orders channel
985                                tracing::info!(
986                                    "Skipping synthetic OrderAccepted for {} quote-sized order: client_order_id={client_order_id}, venue_order_id={venue_order_id:?}",
987                                    if is_explicit_quote_sized {
988                                        "explicit"
989                                    } else {
990                                        "implicit"
991                                    },
992                                );
993                                return None;
994                            }
995
996                            let order_side = order_params.side.into();
997                            let time_in_force = match order_params.ord_type {
998                                OKXOrderType::Fok => TimeInForce::Fok,
999                                OKXOrderType::Ioc | OKXOrderType::OptimalLimitIoc => {
1000                                    TimeInForce::Ioc
1001                                }
1002                                _ => TimeInForce::Gtc,
1003                            };
1004
1005                            let size_precision = instrument.size_precision();
1006                            let quantity = match parse_quantity(&order_params.sz, size_precision) {
1007                                Ok(q) => q,
1008                                Err(e) => {
1009                                    tracing::error!(
1010                                        "Failed to parse quantity for accepted order: {e}"
1011                                    );
1012                                    return None;
1013                                }
1014                            };
1015
1016                            let filled_qty = Quantity::zero(size_precision);
1017
1018                            let mut report = OrderStatusReport::new(
1019                                self.account_id,
1020                                instrument_id,
1021                                Some(client_order_id),
1022                                venue_order_id.unwrap_or_else(|| VenueOrderId::new("PENDING")),
1023                                order_side,
1024                                order_type,
1025                                time_in_force,
1026                                OrderStatus::Accepted,
1027                                quantity,
1028                                filled_qty,
1029                                ts_accepted,
1030                                ts_accepted, // ts_last same as ts_accepted for new orders
1031                                ts_init,
1032                                None, // Generate UUID4 automatically
1033                            );
1034
1035                            if let Some(px) = &order_params.px
1036                                && !px.is_empty()
1037                                && let Ok(price) = parse_price(px, instrument.price_precision())
1038                            {
1039                                report = report.with_price(price);
1040                            }
1041
1042                            if let Some(true) = order_params.reduce_only {
1043                                report = report.with_reduce_only(true);
1044                            }
1045
1046                            if order_type == OrderType::Limit
1047                                && order_params.ord_type == OKXOrderType::PostOnly
1048                            {
1049                                report = report.with_post_only(true);
1050                            }
1051
1052                            if let Some(ref v_order_id) = venue_order_id {
1053                                self.emitted_order_accepted.insert(*v_order_id, ());
1054                            }
1055
1056                            tracing::debug!(
1057                                "Order accepted: client_order_id={client_order_id}, venue_order_id={:?}",
1058                                venue_order_id
1059                            );
1060
1061                            return Some(NautilusWsMessage::ExecutionReports(vec![
1062                                ExecutionReport::Order(report),
1063                            ]));
1064                        }
1065                        PendingOrderParams::Algo(_) => {
1066                            tracing::info!(
1067                                "Algo order placement confirmed: client_order_id={client_order_id}, venue_order_id={:?}",
1068                                venue_order_id
1069                            );
1070                        }
1071                    }
1072                } else {
1073                    tracing::error!("Instrument not found for accepted order: {instrument_id}");
1074                }
1075            }
1076
1077            if let Some(first) = data.first()
1078                && let Some(success_msg) = first.get("sMsg").and_then(|value| value.as_str())
1079            {
1080                tracing::debug!("Order details: {success_msg}");
1081            }
1082
1083            return None;
1084        }
1085
1086        let error_msg = data
1087            .first()
1088            .and_then(|d| d.get("sMsg"))
1089            .and_then(|s| s.as_str())
1090            .unwrap_or(&msg)
1091            .to_string();
1092
1093        if let Some(first) = data.first() {
1094            tracing::debug!(
1095                "Error data fields: {}",
1096                serde_json::to_string_pretty(first)
1097                    .unwrap_or_else(|_| "unable to serialize".to_string())
1098            );
1099        }
1100
1101        tracing::warn!("Order operation failed: id={id:?} op={op} code={code} msg={error_msg}");
1102
1103        let ts_event = self.clock.get_time_ns();
1104
1105        if let Some(request_id) = &id {
1106            match op {
1107                OKXWsOperation::Order => {
1108                    if let Some((_params, client_order_id, trader_id, strategy_id, instrument_id)) =
1109                        self.pending_place_requests.remove(request_id)
1110                    {
1111                        let due_post_only = is_post_only_rejection(code.as_str(), &data);
1112                        let rejected = OrderRejected::new(
1113                            trader_id,
1114                            strategy_id,
1115                            instrument_id,
1116                            client_order_id,
1117                            self.account_id,
1118                            Ustr::from(error_msg.as_str()),
1119                            UUID4::new(),
1120                            ts_event,
1121                            ts_init,
1122                            false, // Not from reconciliation
1123                            due_post_only,
1124                        );
1125
1126                        return Some(NautilusWsMessage::OrderRejected(rejected));
1127                    }
1128                }
1129                OKXWsOperation::CancelOrder => {
1130                    if let Some((
1131                        client_order_id,
1132                        trader_id,
1133                        strategy_id,
1134                        instrument_id,
1135                        venue_order_id,
1136                    )) = self.pending_cancel_requests.remove(request_id)
1137                    {
1138                        let rejected = OrderCancelRejected::new(
1139                            trader_id,
1140                            strategy_id,
1141                            instrument_id,
1142                            client_order_id,
1143                            Ustr::from(error_msg.as_str()),
1144                            UUID4::new(),
1145                            ts_event,
1146                            ts_init,
1147                            false, // Not from reconciliation
1148                            venue_order_id,
1149                            Some(self.account_id),
1150                        );
1151
1152                        return Some(NautilusWsMessage::OrderCancelRejected(rejected));
1153                    }
1154                }
1155                OKXWsOperation::AmendOrder => {
1156                    if let Some((
1157                        client_order_id,
1158                        trader_id,
1159                        strategy_id,
1160                        instrument_id,
1161                        venue_order_id,
1162                    )) = self.pending_amend_requests.remove(request_id)
1163                    {
1164                        let rejected = OrderModifyRejected::new(
1165                            trader_id,
1166                            strategy_id,
1167                            instrument_id,
1168                            client_order_id,
1169                            Ustr::from(error_msg.as_str()),
1170                            UUID4::new(),
1171                            ts_event,
1172                            ts_init,
1173                            false, // Not from reconciliation
1174                            venue_order_id,
1175                            Some(self.account_id),
1176                        );
1177
1178                        return Some(NautilusWsMessage::OrderModifyRejected(rejected));
1179                    }
1180                }
1181                OKXWsOperation::OrderAlgo => {
1182                    if let Some((_params, client_order_id, trader_id, strategy_id, instrument_id)) =
1183                        self.pending_place_requests.remove(request_id)
1184                    {
1185                        let due_post_only = is_post_only_rejection(code.as_str(), &data);
1186                        let rejected = OrderRejected::new(
1187                            trader_id,
1188                            strategy_id,
1189                            instrument_id,
1190                            client_order_id,
1191                            self.account_id,
1192                            Ustr::from(error_msg.as_str()),
1193                            UUID4::new(),
1194                            ts_event,
1195                            ts_init,
1196                            false, // Not from reconciliation
1197                            due_post_only,
1198                        );
1199
1200                        return Some(NautilusWsMessage::OrderRejected(rejected));
1201                    }
1202                }
1203                OKXWsOperation::CancelAlgos => {
1204                    if let Some((
1205                        client_order_id,
1206                        trader_id,
1207                        strategy_id,
1208                        instrument_id,
1209                        venue_order_id,
1210                    )) = self.pending_cancel_requests.remove(request_id)
1211                    {
1212                        let rejected = OrderCancelRejected::new(
1213                            trader_id,
1214                            strategy_id,
1215                            instrument_id,
1216                            client_order_id,
1217                            Ustr::from(error_msg.as_str()),
1218                            UUID4::new(),
1219                            ts_event,
1220                            ts_init,
1221                            false, // Not from reconciliation
1222                            venue_order_id,
1223                            Some(self.account_id),
1224                        );
1225
1226                        return Some(NautilusWsMessage::OrderCancelRejected(rejected));
1227                    }
1228                }
1229                OKXWsOperation::MassCancel => {
1230                    if let Some(instrument_id) =
1231                        self.pending_mass_cancel_requests.remove(request_id)
1232                    {
1233                        tracing::error!(
1234                            "Mass cancel operation failed for {}: code={code} msg={error_msg}",
1235                            instrument_id
1236                        );
1237                        let error = OKXWebSocketError {
1238                            code,
1239                            message: format!(
1240                                "Mass cancel failed for {}: {}",
1241                                instrument_id, error_msg
1242                            ),
1243                            conn_id: None,
1244                            timestamp: ts_event.as_u64(),
1245                        };
1246                        return Some(NautilusWsMessage::Error(error));
1247                    } else {
1248                        tracing::error!(
1249                            "Mass cancel operation failed: code={code} msg={error_msg}"
1250                        );
1251                    }
1252                }
1253                OKXWsOperation::BatchCancelOrders => {
1254                    tracing::warn!(
1255                        "Batch cancel operation failed: id={id:?} code={code} msg={error_msg} data_count={}",
1256                        data.len()
1257                    );
1258
1259                    // Iterate through data array to check per-order errors
1260                    for (idx, entry) in data.iter().enumerate() {
1261                        let entry_code =
1262                            entry.get("sCode").and_then(|v| v.as_str()).unwrap_or(&code);
1263                        let entry_msg = entry
1264                            .get("sMsg")
1265                            .and_then(|v| v.as_str())
1266                            .unwrap_or(&error_msg);
1267
1268                        if entry_code != "0" {
1269                            // Try to extract client order ID for targeted error events
1270                            if let Some(cl_ord_id_str) = entry
1271                                .get("clOrdId")
1272                                .and_then(|v| v.as_str())
1273                                .filter(|s| !s.is_empty())
1274                            {
1275                                tracing::error!(
1276                                    "Batch cancel failed for order {}: sCode={} sMsg={}",
1277                                    cl_ord_id_str,
1278                                    entry_code,
1279                                    entry_msg
1280                                );
1281                                // TODO: Emit OrderCancelRejected event once we track
1282                                // batch cancel metadata (client_order_id, trader_id, etc.)
1283                            } else {
1284                                tracing::error!(
1285                                    "Batch cancel entry[{}] failed: sCode={} sMsg={} data={:?}",
1286                                    idx,
1287                                    entry_code,
1288                                    entry_msg,
1289                                    entry
1290                                );
1291                            }
1292                        }
1293                    }
1294
1295                    // Emit generic error for the batch operation
1296                    let error = OKXWebSocketError {
1297                        code,
1298                        message: format!("Batch cancel failed: {}", error_msg),
1299                        conn_id: None,
1300                        timestamp: ts_event.as_u64(),
1301                    };
1302                    return Some(NautilusWsMessage::Error(error));
1303                }
1304                _ => tracing::warn!("Unhandled operation type for rejection: {op}"),
1305            }
1306        }
1307
1308        let error = OKXWebSocketError {
1309            code,
1310            message: error_msg,
1311            conn_id: None,
1312            timestamp: ts_event.as_u64(),
1313        };
1314        Some(NautilusWsMessage::Error(error))
1315    }
1316
1317    fn handle_book_data(
1318        &self,
1319        arg: OKXWebSocketArg,
1320        action: OKXBookAction,
1321        data: Vec<OKXBookMsg>,
1322        ts_init: UnixNanos,
1323    ) -> Option<NautilusWsMessage> {
1324        let Some(inst_id) = arg.inst_id else {
1325            tracing::error!("Instrument ID missing for book data event");
1326            return None;
1327        };
1328
1329        let inst = self.instruments_cache.get(&inst_id)?;
1330
1331        let instrument_id = inst.id();
1332        let price_precision = inst.price_precision();
1333        let size_precision = inst.size_precision();
1334
1335        match parse_book_msg_vec(
1336            data,
1337            &instrument_id,
1338            price_precision,
1339            size_precision,
1340            action,
1341            ts_init,
1342        ) {
1343            Ok(payloads) => Some(NautilusWsMessage::Data(payloads)),
1344            Err(e) => {
1345                tracing::error!("Failed to parse book message: {e}");
1346                None
1347            }
1348        }
1349    }
1350
1351    fn handle_account_data(
1352        &mut self,
1353        data: Value,
1354        ts_init: UnixNanos,
1355    ) -> Option<NautilusWsMessage> {
1356        match serde_json::from_value::<Vec<OKXAccount>>(data) {
1357            Ok(accounts) => {
1358                if let Some(account) = accounts.first() {
1359                    match parse_account_state(account, self.account_id, ts_init) {
1360                        Ok(account_state) => {
1361                            if let Some(last_account_state) = &self.last_account_state
1362                                && account_state.has_same_balances_and_margins(last_account_state)
1363                            {
1364                                return None;
1365                            }
1366                            self.last_account_state = Some(account_state.clone());
1367                            Some(NautilusWsMessage::AccountUpdate(account_state))
1368                        }
1369                        Err(e) => {
1370                            tracing::error!("Failed to parse account state: {e}");
1371                            None
1372                        }
1373                    }
1374                } else {
1375                    None
1376                }
1377            }
1378            Err(e) => {
1379                tracing::error!("Failed to parse account data: {e}");
1380                None
1381            }
1382        }
1383    }
1384
1385    fn handle_positions_data(&mut self, data: Value, ts_init: UnixNanos) {
1386        match serde_json::from_value::<Vec<OKXPosition>>(data) {
1387            Ok(positions) => {
1388                tracing::debug!("Received {} position update(s)", positions.len());
1389
1390                for position in positions {
1391                    let instrument_id =
1392                        match InstrumentId::from_as_ref(format!("{}.OKX", position.inst_id)) {
1393                            Ok(id) => id,
1394                            Err(e) => {
1395                                tracing::error!(
1396                                    "Failed to parse instrument ID from {}: {e}",
1397                                    position.inst_id
1398                                );
1399                                continue;
1400                            }
1401                        };
1402
1403                    let instrument = match self.instruments_cache.get(&position.inst_id) {
1404                        Some(inst) => inst,
1405                        None => {
1406                            tracing::warn!(
1407                                "Received position update for unknown instrument {}, skipping",
1408                                instrument_id
1409                            );
1410                            continue;
1411                        }
1412                    };
1413
1414                    let size_precision = instrument.size_precision();
1415
1416                    match parse_position_status_report(
1417                        position,
1418                        self.account_id,
1419                        instrument_id,
1420                        size_precision,
1421                        ts_init,
1422                    ) {
1423                        Ok(position_report) => {
1424                            self.pending_messages
1425                                .push_back(NautilusWsMessage::PositionUpdate(position_report));
1426                        }
1427                        Err(e) => {
1428                            tracing::error!(
1429                                "Failed to parse position status report for {}: {e}",
1430                                instrument_id
1431                            );
1432                        }
1433                    }
1434                }
1435            }
1436            Err(e) => {
1437                tracing::error!("Failed to parse positions data: {e}");
1438            }
1439        }
1440    }
1441
1442    fn handle_orders_data(&mut self, data: Value, ts_init: UnixNanos) -> Option<NautilusWsMessage> {
1443        let orders: Vec<OKXOrderMsg> = match serde_json::from_value(data) {
1444            Ok(orders) => orders,
1445            Err(e) => {
1446                tracing::error!("Failed to deserialize orders channel payload: {e}");
1447                return None;
1448            }
1449        };
1450
1451        tracing::debug!(
1452            "Received {} order message(s) from orders channel",
1453            orders.len()
1454        );
1455
1456        let mut exec_reports: Vec<ExecutionReport> = Vec::with_capacity(orders.len());
1457
1458        for msg in orders {
1459            tracing::debug!(
1460                "Processing order message: inst_id={}, cl_ord_id={}, state={:?}, exec_type={:?}",
1461                msg.inst_id,
1462                msg.cl_ord_id,
1463                msg.state,
1464                msg.exec_type
1465            );
1466
1467            if self.try_handle_post_only_auto_cancel(&msg, ts_init, &mut exec_reports) {
1468                continue;
1469            }
1470
1471            let raw_child = parse_client_order_id(&msg.cl_ord_id);
1472            let parent_from_msg = msg
1473                .algo_cl_ord_id
1474                .as_ref()
1475                .filter(|value| !value.is_empty())
1476                .map(ClientOrderId::new);
1477            let effective_client_id =
1478                self.register_client_order_aliases(&raw_child, &parent_from_msg);
1479
1480            match parse_order_msg(
1481                &msg,
1482                self.account_id,
1483                &self.instruments_cache,
1484                &self.fee_cache,
1485                &self.filled_qty_cache,
1486                ts_init,
1487            ) {
1488                Ok(report) => {
1489                    tracing::debug!("Successfully parsed execution report: {:?}", report);
1490
1491                    let is_duplicate_accepted =
1492                        if let ExecutionReport::Order(ref status_report) = report {
1493                            if status_report.order_status == OrderStatus::Accepted {
1494                                self.emitted_order_accepted
1495                                    .contains_key(&status_report.venue_order_id)
1496                            } else {
1497                                false
1498                            }
1499                        } else {
1500                            false
1501                        };
1502
1503                    if is_duplicate_accepted {
1504                        tracing::debug!(
1505                            "Skipping duplicate OrderAccepted for venue_order_id={}",
1506                            if let ExecutionReport::Order(ref r) = report {
1507                                r.venue_order_id.to_string()
1508                            } else {
1509                                "unknown".to_string()
1510                            }
1511                        );
1512                        continue;
1513                    }
1514
1515                    if let ExecutionReport::Order(ref status_report) = report
1516                        && status_report.order_status == OrderStatus::Accepted
1517                    {
1518                        self.emitted_order_accepted
1519                            .insert(status_report.venue_order_id, ());
1520                    }
1521
1522                    let adjusted =
1523                        self.adjust_execution_report(report, &effective_client_id, &raw_child);
1524
1525                    // Clean up tracking for terminal states
1526                    if let ExecutionReport::Order(ref status_report) = adjusted
1527                        && matches!(
1528                            status_report.order_status,
1529                            OrderStatus::Filled
1530                                | OrderStatus::Canceled
1531                                | OrderStatus::Expired
1532                                | OrderStatus::Rejected
1533                        )
1534                    {
1535                        self.emitted_order_accepted
1536                            .remove(&status_report.venue_order_id);
1537                    }
1538
1539                    self.update_caches_with_report(&adjusted);
1540                    exec_reports.push(adjusted);
1541                }
1542                Err(e) => tracing::error!("Failed to parse order message: {e}"),
1543            }
1544        }
1545
1546        if !exec_reports.is_empty() {
1547            tracing::debug!(
1548                "Pushing {} execution report(s) to message queue",
1549                exec_reports.len()
1550            );
1551            self.pending_messages
1552                .push_back(NautilusWsMessage::ExecutionReports(exec_reports));
1553        } else {
1554            tracing::debug!("No execution reports generated from order messages");
1555        }
1556
1557        self.pending_messages.pop_front()
1558    }
1559
1560    fn handle_algo_orders_data(
1561        &mut self,
1562        data: Value,
1563        ts_init: UnixNanos,
1564    ) -> Option<NautilusWsMessage> {
1565        let orders: Vec<OKXAlgoOrderMsg> = match serde_json::from_value(data) {
1566            Ok(orders) => orders,
1567            Err(e) => {
1568                tracing::error!("Failed to deserialize algo orders payload: {e}");
1569                return None;
1570            }
1571        };
1572
1573        let mut exec_reports: Vec<ExecutionReport> = Vec::with_capacity(orders.len());
1574
1575        for msg in orders {
1576            let raw_child = parse_client_order_id(&msg.cl_ord_id);
1577            let parent_from_msg = parse_client_order_id(&msg.algo_cl_ord_id);
1578            let effective_client_id =
1579                self.register_client_order_aliases(&raw_child, &parent_from_msg);
1580
1581            match parse_algo_order_msg(msg, self.account_id, &self.instruments_cache, ts_init) {
1582                Ok(report) => {
1583                    let adjusted =
1584                        self.adjust_execution_report(report, &effective_client_id, &raw_child);
1585                    self.update_caches_with_report(&adjusted);
1586                    exec_reports.push(adjusted);
1587                }
1588                Err(e) => {
1589                    tracing::error!("Failed to parse algo order message: {e}");
1590                }
1591            }
1592        }
1593
1594        if !exec_reports.is_empty() {
1595            Some(NautilusWsMessage::ExecutionReports(exec_reports))
1596        } else {
1597            None
1598        }
1599    }
1600
1601    fn handle_other_channel_data(
1602        &mut self,
1603        channel: OKXWsChannel,
1604        inst_id: Option<Ustr>,
1605        data: Value,
1606        ts_init: UnixNanos,
1607    ) -> Option<NautilusWsMessage> {
1608        let Some(inst_id) = inst_id else {
1609            tracing::error!("No instrument for channel {:?}", channel);
1610            return None;
1611        };
1612
1613        let Some(instrument) = self.instruments_cache.get(&inst_id) else {
1614            tracing::error!(
1615                "No instrument for channel {:?}, inst_id {:?}",
1616                channel,
1617                inst_id
1618            );
1619            return None;
1620        };
1621
1622        let instrument_id = instrument.id();
1623        let price_precision = instrument.price_precision();
1624        let size_precision = instrument.size_precision();
1625
1626        match parse_ws_message_data(
1627            &channel,
1628            data,
1629            &instrument_id,
1630            price_precision,
1631            size_precision,
1632            ts_init,
1633            &mut self.funding_rate_cache,
1634            &self.instruments_cache,
1635        ) {
1636            Ok(Some(msg)) => {
1637                if let NautilusWsMessage::Instrument(ref inst) = msg {
1638                    self.instruments_cache
1639                        .insert(inst.symbol().inner(), inst.as_ref().clone());
1640                }
1641                Some(msg)
1642            }
1643            Ok(None) => None,
1644            Err(e) => {
1645                tracing::error!("Error parsing message for channel {:?}: {e}", channel);
1646                None
1647            }
1648        }
1649    }
1650
1651    pub(crate) fn parse_raw_message(
1652        msg: tokio_tungstenite::tungstenite::Message,
1653    ) -> Option<OKXWsMessage> {
1654        match msg {
1655            tokio_tungstenite::tungstenite::Message::Text(text) => {
1656                if text == TEXT_PONG {
1657                    tracing::trace!("Received pong from OKX");
1658                    return None;
1659                }
1660                if text == TEXT_PING {
1661                    tracing::trace!("Received ping from OKX (text)");
1662                    return Some(OKXWsMessage::Ping);
1663                }
1664
1665                if text == RECONNECTED {
1666                    tracing::debug!("Received WebSocket reconnection signal");
1667                    return Some(OKXWsMessage::Reconnected);
1668                }
1669                tracing::trace!("Received WebSocket message: {text}");
1670
1671                match serde_json::from_str(&text) {
1672                    Ok(ws_event) => match &ws_event {
1673                        OKXWsMessage::Error { code, msg } => {
1674                            tracing::error!("WebSocket error: {code} - {msg}");
1675                            Some(ws_event)
1676                        }
1677                        OKXWsMessage::Login {
1678                            event,
1679                            code,
1680                            msg,
1681                            conn_id,
1682                        } => {
1683                            if code == "0" {
1684                                tracing::info!(conn_id = %conn_id, "WebSocket authenticated");
1685                            } else {
1686                                tracing::error!(event = %event, code = %code, error = %msg, "WebSocket authentication failed");
1687                            }
1688                            Some(ws_event)
1689                        }
1690                        OKXWsMessage::Subscription {
1691                            event,
1692                            arg,
1693                            conn_id,
1694                            ..
1695                        } => {
1696                            let channel_str = serde_json::to_string(&arg.channel)
1697                                .expect("Invalid OKX websocket channel")
1698                                .trim_matches('"')
1699                                .to_string();
1700                            tracing::debug!("{event}d: channel={channel_str}, conn_id={conn_id}");
1701                            Some(ws_event)
1702                        }
1703                        OKXWsMessage::ChannelConnCount {
1704                            event: _,
1705                            channel,
1706                            conn_count,
1707                            conn_id,
1708                        } => {
1709                            let channel_str = serde_json::to_string(&channel)
1710                                .expect("Invalid OKX websocket channel")
1711                                .trim_matches('"')
1712                                .to_string();
1713                            tracing::debug!(
1714                                "Channel connection status: channel={channel_str}, connections={conn_count}, conn_id={conn_id}",
1715                            );
1716                            None
1717                        }
1718                        OKXWsMessage::Ping => {
1719                            tracing::trace!("Ignoring ping event parsed from text payload");
1720                            None
1721                        }
1722                        OKXWsMessage::Data { .. } => Some(ws_event),
1723                        OKXWsMessage::BookData { .. } => Some(ws_event),
1724                        OKXWsMessage::OrderResponse {
1725                            id,
1726                            op,
1727                            code,
1728                            msg: _,
1729                            data,
1730                        } => {
1731                            if code == "0" {
1732                                tracing::debug!(
1733                                    "Order operation successful: id={:?}, op={op}, code={code}",
1734                                    id
1735                                );
1736
1737                                if let Some(order_data) = data.first() {
1738                                    let success_msg = order_data
1739                                        .get("sMsg")
1740                                        .and_then(|s| s.as_str())
1741                                        .unwrap_or("Order operation successful");
1742                                    tracing::debug!("Order success details: {success_msg}");
1743                                }
1744                            }
1745                            Some(ws_event)
1746                        }
1747                        OKXWsMessage::Reconnected => {
1748                            // This shouldn't happen as we handle RECONNECTED string directly
1749                            tracing::warn!("Unexpected Reconnected event from deserialization");
1750                            None
1751                        }
1752                    },
1753                    Err(e) => {
1754                        tracing::error!("Failed to parse message: {e}: {text}");
1755                        None
1756                    }
1757                }
1758            }
1759            Message::Ping(_payload) => {
1760                tracing::trace!("Received binary ping frame from OKX");
1761                Some(OKXWsMessage::Ping)
1762            }
1763            Message::Pong(payload) => {
1764                tracing::trace!("Received pong frame from OKX ({} bytes)", payload.len());
1765                None
1766            }
1767            Message::Binary(msg) => {
1768                tracing::debug!("Raw binary: {msg:?}");
1769                None
1770            }
1771            Message::Close(_) => {
1772                tracing::debug!("Received close message");
1773                None
1774            }
1775            msg => {
1776                tracing::warn!("Unexpected message: {msg}");
1777                None
1778            }
1779        }
1780    }
1781
1782    fn generate_unique_request_id(&self) -> String {
1783        self.request_id_counter
1784            .fetch_add(1, Ordering::SeqCst)
1785            .to_string()
1786    }
1787
1788    fn get_instrument_type_and_family_from_instrument(
1789        instrument: &InstrumentAny,
1790    ) -> anyhow::Result<(OKXInstrumentType, String)> {
1791        let inst_type = okx_instrument_type(instrument)?;
1792        let symbol = instrument.symbol().inner();
1793
1794        // Determine instrument family based on instrument type
1795        let inst_family = match instrument {
1796            InstrumentAny::CurrencyPair(_) => symbol.as_str().to_string(),
1797            InstrumentAny::CryptoPerpetual(_) => {
1798                // For SWAP: "BTC-USDT-SWAP" -> "BTC-USDT"
1799                symbol
1800                    .as_str()
1801                    .strip_suffix("-SWAP")
1802                    .unwrap_or(symbol.as_str())
1803                    .to_string()
1804            }
1805            InstrumentAny::CryptoFuture(_) => {
1806                // For FUTURES: "BTC-USDT-250328" -> "BTC-USDT"
1807                // Extract the base pair by removing date suffix
1808                let s = symbol.as_str();
1809                if let Some(idx) = s.rfind('-') {
1810                    s[..idx].to_string()
1811                } else {
1812                    s.to_string()
1813                }
1814            }
1815            _ => {
1816                anyhow::bail!("Unsupported instrument type for OKX");
1817            }
1818        };
1819
1820        Ok((inst_type, inst_family))
1821    }
1822
1823    async fn handle_mass_cancel(&mut self, instrument_id: InstrumentId) -> anyhow::Result<()> {
1824        let instrument = self
1825            .instruments_cache
1826            .get(&instrument_id.symbol.inner())
1827            .ok_or_else(|| anyhow::anyhow!("Unknown instrument {instrument_id}"))?;
1828
1829        let (inst_type, inst_family) =
1830            Self::get_instrument_type_and_family_from_instrument(instrument)?;
1831
1832        let params = WsMassCancelParams {
1833            inst_type,
1834            inst_family: Ustr::from(&inst_family),
1835        };
1836
1837        let args =
1838            vec![serde_json::to_value(params).map_err(|e| anyhow::anyhow!("JSON error: {e}"))?];
1839
1840        let request_id = self.generate_unique_request_id();
1841
1842        self.pending_mass_cancel_requests
1843            .insert(request_id.clone(), instrument_id);
1844
1845        let request = OKXWsRequest {
1846            id: Some(request_id.clone()),
1847            op: OKXWsOperation::MassCancel,
1848            exp_time: None,
1849            args,
1850        };
1851
1852        let payload = serde_json::to_string(&request)
1853            .map_err(|e| anyhow::anyhow!("Failed to serialize mass cancel request: {e}"))?;
1854
1855        match self
1856            .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_CANCEL.to_string()]))
1857            .await
1858        {
1859            Ok(()) => {
1860                tracing::debug!("Sent mass cancel for {instrument_id}");
1861                Ok(())
1862            }
1863            Err(e) => {
1864                tracing::error!(error = %e, "Failed to send mass cancel after retries");
1865
1866                self.pending_mass_cancel_requests.remove(&request_id);
1867
1868                let error = OKXWebSocketError {
1869                    code: "CLIENT_ERROR".to_string(),
1870                    message: format!("Mass cancel failed for {}: {}", instrument_id, e),
1871                    conn_id: None,
1872                    timestamp: self.clock.get_time_ns().as_u64(),
1873                };
1874                let _ = self.send(NautilusWsMessage::Error(error));
1875
1876                Err(anyhow::anyhow!("Failed to send mass cancel: {e}"))
1877            }
1878        }
1879    }
1880
1881    async fn handle_batch_cancel_orders(
1882        &self,
1883        args: Vec<Value>,
1884        request_id: String,
1885    ) -> anyhow::Result<()> {
1886        let request = OKXWsRequest {
1887            id: Some(request_id),
1888            op: OKXWsOperation::BatchCancelOrders,
1889            exp_time: None,
1890            args,
1891        };
1892
1893        let payload = serde_json::to_string(&request)
1894            .map_err(|e| anyhow::anyhow!("Failed to serialize batch cancel request: {e}"))?;
1895
1896        if let Some(client) = &self.inner {
1897            client
1898                .send_text(payload, Some(vec![OKX_RATE_LIMIT_KEY_CANCEL.to_string()]))
1899                .await
1900                .map_err(|e| anyhow::anyhow!("Failed to send batch cancel: {e}"))?;
1901            tracing::debug!("Sent batch cancel orders");
1902            Ok(())
1903        } else {
1904            Err(anyhow::anyhow!("No active WebSocket client"))
1905        }
1906    }
1907
1908    async fn handle_batch_place_orders(
1909        &self,
1910        args: Vec<Value>,
1911        request_id: String,
1912    ) -> anyhow::Result<()> {
1913        let request = OKXWsRequest {
1914            id: Some(request_id),
1915            op: OKXWsOperation::BatchOrders,
1916            exp_time: None,
1917            args,
1918        };
1919
1920        let payload = serde_json::to_string(&request)
1921            .map_err(|e| anyhow::anyhow!("Failed to serialize batch place request: {e}"))?;
1922
1923        if let Some(client) = &self.inner {
1924            client
1925                .send_text(payload, Some(vec![OKX_RATE_LIMIT_KEY_ORDER.to_string()]))
1926                .await
1927                .map_err(|e| anyhow::anyhow!("Failed to send batch place: {e}"))?;
1928            tracing::debug!("Sent batch place orders");
1929            Ok(())
1930        } else {
1931            Err(anyhow::anyhow!("No active WebSocket client"))
1932        }
1933    }
1934
1935    async fn handle_batch_amend_orders(
1936        &self,
1937        args: Vec<Value>,
1938        request_id: String,
1939    ) -> anyhow::Result<()> {
1940        let request = OKXWsRequest {
1941            id: Some(request_id),
1942            op: OKXWsOperation::BatchAmendOrders,
1943            exp_time: None,
1944            args,
1945        };
1946
1947        let payload = serde_json::to_string(&request)
1948            .map_err(|e| anyhow::anyhow!("Failed to serialize batch amend request: {e}"))?;
1949
1950        if let Some(client) = &self.inner {
1951            client
1952                .send_text(payload, Some(vec![OKX_RATE_LIMIT_KEY_AMEND.to_string()]))
1953                .await
1954                .map_err(|e| anyhow::anyhow!("Failed to send batch amend: {e}"))?;
1955            tracing::debug!("Sent batch amend orders");
1956            Ok(())
1957        } else {
1958            Err(anyhow::anyhow!("No active WebSocket client"))
1959        }
1960    }
1961
1962    async fn handle_subscribe(&self, args: Vec<OKXSubscriptionArg>) -> anyhow::Result<()> {
1963        for arg in &args {
1964            tracing::debug!(channel = ?arg.channel, inst_id = ?arg.inst_id, "Subscribing to channel");
1965        }
1966
1967        let message = OKXSubscription {
1968            op: OKXWsOperation::Subscribe,
1969            args,
1970        };
1971
1972        let json_txt = serde_json::to_string(&message)
1973            .map_err(|e| anyhow::anyhow!("Failed to serialize subscription: {e}"))?;
1974
1975        self.send_with_retry(
1976            json_txt,
1977            Some(vec![OKX_RATE_LIMIT_KEY_SUBSCRIPTION.to_string()]),
1978        )
1979        .await
1980        .map_err(|e| anyhow::anyhow!("Failed to send subscription after retries: {e}"))?;
1981        Ok(())
1982    }
1983
1984    async fn handle_unsubscribe(&self, args: Vec<OKXSubscriptionArg>) -> anyhow::Result<()> {
1985        for arg in &args {
1986            tracing::debug!(channel = ?arg.channel, inst_id = ?arg.inst_id, "Unsubscribing from channel");
1987        }
1988
1989        let message = OKXSubscription {
1990            op: OKXWsOperation::Unsubscribe,
1991            args,
1992        };
1993
1994        let json_txt = serde_json::to_string(&message)
1995            .map_err(|e| anyhow::anyhow!("Failed to serialize unsubscription: {e}"))?;
1996
1997        self.send_with_retry(
1998            json_txt,
1999            Some(vec![OKX_RATE_LIMIT_KEY_SUBSCRIPTION.to_string()]),
2000        )
2001        .await
2002        .map_err(|e| anyhow::anyhow!("Failed to send unsubscription after retries: {e}"))?;
2003        Ok(())
2004    }
2005
2006    async fn handle_place_order(
2007        &mut self,
2008        params: WsPostOrderParams,
2009        client_order_id: ClientOrderId,
2010        trader_id: TraderId,
2011        strategy_id: StrategyId,
2012        instrument_id: InstrumentId,
2013    ) -> anyhow::Result<()> {
2014        let request_id = self.generate_unique_request_id();
2015
2016        self.pending_place_requests.insert(
2017            request_id.clone(),
2018            (
2019                PendingOrderParams::Regular(params.clone()),
2020                client_order_id,
2021                trader_id,
2022                strategy_id,
2023                instrument_id,
2024            ),
2025        );
2026
2027        let request = OKXWsRequest {
2028            id: Some(request_id.clone()),
2029            op: OKXWsOperation::Order,
2030            exp_time: None,
2031            args: vec![params],
2032        };
2033
2034        let payload = serde_json::to_string(&request)
2035            .map_err(|e| anyhow::anyhow!("Failed to serialize place order request: {e}"))?;
2036
2037        match self
2038            .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_ORDER.to_string()]))
2039            .await
2040        {
2041            Ok(()) => {
2042                tracing::debug!("Sent place order request");
2043                Ok(())
2044            }
2045            Err(e) => {
2046                tracing::error!(error = %e, "Failed to send place order after retries");
2047
2048                self.pending_place_requests.remove(&request_id);
2049
2050                let ts_now = self.clock.get_time_ns();
2051                let rejected = OrderRejected::new(
2052                    trader_id,
2053                    strategy_id,
2054                    instrument_id,
2055                    client_order_id,
2056                    self.account_id,
2057                    Ustr::from(&format!("WebSocket send failed: {e}")),
2058                    UUID4::new(),
2059                    ts_now, // ts_event
2060                    ts_now, // ts_init
2061                    false,  // Not from reconciliation
2062                    false,  // Not due to post-only
2063                );
2064                let _ = self.send(NautilusWsMessage::OrderRejected(rejected));
2065
2066                Err(anyhow::anyhow!("Failed to send place order: {e}"))
2067            }
2068        }
2069    }
2070
2071    async fn handle_place_algo_order(
2072        &mut self,
2073        params: WsPostAlgoOrderParams,
2074        client_order_id: ClientOrderId,
2075        trader_id: TraderId,
2076        strategy_id: StrategyId,
2077        instrument_id: InstrumentId,
2078    ) -> anyhow::Result<()> {
2079        let request_id = self.generate_unique_request_id();
2080
2081        self.pending_place_requests.insert(
2082            request_id.clone(),
2083            (
2084                PendingOrderParams::Algo(params.clone()),
2085                client_order_id,
2086                trader_id,
2087                strategy_id,
2088                instrument_id,
2089            ),
2090        );
2091
2092        let request = OKXWsRequest {
2093            id: Some(request_id.clone()),
2094            op: OKXWsOperation::OrderAlgo,
2095            exp_time: None,
2096            args: vec![params],
2097        };
2098
2099        let payload = serde_json::to_string(&request)
2100            .map_err(|e| anyhow::anyhow!("Failed to serialize place algo order request: {e}"))?;
2101
2102        match self
2103            .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_ORDER.to_string()]))
2104            .await
2105        {
2106            Ok(()) => {
2107                tracing::debug!("Sent place algo order request");
2108                Ok(())
2109            }
2110            Err(e) => {
2111                tracing::error!(error = %e, "Failed to send place algo order after retries");
2112
2113                self.pending_place_requests.remove(&request_id);
2114
2115                let ts_now = self.clock.get_time_ns();
2116                let rejected = OrderRejected::new(
2117                    trader_id,
2118                    strategy_id,
2119                    instrument_id,
2120                    client_order_id,
2121                    self.account_id,
2122                    Ustr::from(&format!("WebSocket send failed: {e}")),
2123                    UUID4::new(),
2124                    ts_now, // ts_event
2125                    ts_now, // ts_init
2126                    false,  // Not from reconciliation
2127                    false,  // Not due to post-only
2128                );
2129                let _ = self.send(NautilusWsMessage::OrderRejected(rejected));
2130
2131                Err(anyhow::anyhow!("Failed to send place algo order: {e}"))
2132            }
2133        }
2134    }
2135
2136    async fn handle_cancel_order(
2137        &mut self,
2138        client_order_id: Option<ClientOrderId>,
2139        venue_order_id: Option<VenueOrderId>,
2140        instrument_id: InstrumentId,
2141        trader_id: TraderId,
2142        strategy_id: StrategyId,
2143    ) -> anyhow::Result<()> {
2144        let mut builder = WsCancelOrderParamsBuilder::default();
2145        builder.inst_id(instrument_id.symbol.as_str());
2146
2147        if let Some(venue_order_id) = venue_order_id {
2148            builder.ord_id(venue_order_id.as_str());
2149        }
2150
2151        if let Some(client_order_id) = client_order_id {
2152            builder.cl_ord_id(client_order_id.as_str());
2153        }
2154
2155        let params = builder
2156            .build()
2157            .map_err(|e| anyhow::anyhow!("Failed to build cancel params: {e}"))?;
2158
2159        let request_id = self.generate_unique_request_id();
2160
2161        // Track pending request if we have a client order ID
2162        if let Some(client_order_id) = client_order_id {
2163            self.pending_cancel_requests.insert(
2164                request_id.clone(),
2165                (
2166                    client_order_id,
2167                    trader_id,
2168                    strategy_id,
2169                    instrument_id,
2170                    venue_order_id,
2171                ),
2172            );
2173        }
2174
2175        let request = OKXWsRequest {
2176            id: Some(request_id.clone()),
2177            op: OKXWsOperation::CancelOrder,
2178            exp_time: None,
2179            args: vec![params],
2180        };
2181
2182        let payload = serde_json::to_string(&request)
2183            .map_err(|e| anyhow::anyhow!("Failed to serialize cancel request: {e}"))?;
2184
2185        match self
2186            .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_CANCEL.to_string()]))
2187            .await
2188        {
2189            Ok(()) => {
2190                tracing::debug!("Sent cancel order request");
2191                Ok(())
2192            }
2193            Err(e) => {
2194                tracing::error!(error = %e, "Failed to send cancel order after retries");
2195
2196                self.pending_cancel_requests.remove(&request_id);
2197
2198                if let Some(client_order_id) = client_order_id {
2199                    let ts_now = self.clock.get_time_ns();
2200                    let rejected = OrderCancelRejected::new(
2201                        trader_id,
2202                        strategy_id,
2203                        instrument_id,
2204                        client_order_id,
2205                        Ustr::from(&format!("WebSocket send failed: {e}")),
2206                        UUID4::new(),
2207                        ts_now, // ts_event
2208                        ts_now, // ts_init
2209                        false,  // Not from reconciliation
2210                        venue_order_id,
2211                        Some(self.account_id),
2212                    );
2213                    let _ = self.send(NautilusWsMessage::OrderCancelRejected(rejected));
2214                }
2215
2216                Err(anyhow::anyhow!("Failed to send cancel order: {e}"))
2217            }
2218        }
2219    }
2220
2221    async fn handle_amend_order(
2222        &mut self,
2223        params: WsAmendOrderParams,
2224        client_order_id: ClientOrderId,
2225        trader_id: TraderId,
2226        strategy_id: StrategyId,
2227        instrument_id: InstrumentId,
2228        venue_order_id: Option<VenueOrderId>,
2229    ) -> anyhow::Result<()> {
2230        let request_id = self.generate_unique_request_id();
2231
2232        self.pending_amend_requests.insert(
2233            request_id.clone(),
2234            (
2235                client_order_id,
2236                trader_id,
2237                strategy_id,
2238                instrument_id,
2239                venue_order_id,
2240            ),
2241        );
2242
2243        let request = OKXWsRequest {
2244            id: Some(request_id.clone()),
2245            op: OKXWsOperation::AmendOrder,
2246            exp_time: None,
2247            args: vec![params],
2248        };
2249
2250        let payload = serde_json::to_string(&request)
2251            .map_err(|e| anyhow::anyhow!("Failed to serialize amend order request: {e}"))?;
2252
2253        match self
2254            .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_AMEND.to_string()]))
2255            .await
2256        {
2257            Ok(()) => {
2258                tracing::debug!("Sent amend order request");
2259                Ok(())
2260            }
2261            Err(e) => {
2262                tracing::error!(error = %e, "Failed to send amend order after retries");
2263
2264                self.pending_amend_requests.remove(&request_id);
2265
2266                let ts_now = self.clock.get_time_ns();
2267                let rejected = OrderModifyRejected::new(
2268                    trader_id,
2269                    strategy_id,
2270                    instrument_id,
2271                    client_order_id,
2272                    Ustr::from(&format!("WebSocket send failed: {e}")),
2273                    UUID4::new(),
2274                    ts_now, // ts_event
2275                    ts_now, // ts_init
2276                    false,  // Not from reconciliation
2277                    venue_order_id,
2278                    Some(self.account_id),
2279                );
2280                let _ = self.send(NautilusWsMessage::OrderModifyRejected(rejected));
2281
2282                Err(anyhow::anyhow!("Failed to send amend order: {e}"))
2283            }
2284        }
2285    }
2286
2287    async fn handle_cancel_algo_order(
2288        &mut self,
2289        client_order_id: Option<ClientOrderId>,
2290        algo_order_id: Option<VenueOrderId>,
2291        instrument_id: InstrumentId,
2292        trader_id: TraderId,
2293        strategy_id: StrategyId,
2294    ) -> anyhow::Result<()> {
2295        let mut builder = WsCancelAlgoOrderParamsBuilder::default();
2296        builder.inst_id(instrument_id.symbol.as_str());
2297
2298        if let Some(client_order_id) = &client_order_id {
2299            builder.algo_cl_ord_id(client_order_id.as_str());
2300        }
2301
2302        if let Some(algo_id) = &algo_order_id {
2303            builder.algo_id(algo_id.as_str());
2304        }
2305
2306        let params = builder
2307            .build()
2308            .map_err(|e| anyhow::anyhow!("Failed to build cancel algo params: {e}"))?;
2309
2310        let request_id = self.generate_unique_request_id();
2311
2312        // Track pending cancellation if we have a client order ID
2313        if let Some(client_order_id) = client_order_id {
2314            self.pending_cancel_requests.insert(
2315                request_id.clone(),
2316                (client_order_id, trader_id, strategy_id, instrument_id, None),
2317            );
2318        }
2319
2320        let request = OKXWsRequest {
2321            id: Some(request_id.clone()),
2322            op: OKXWsOperation::CancelAlgos,
2323            exp_time: None,
2324            args: vec![params],
2325        };
2326
2327        let payload = serde_json::to_string(&request)
2328            .map_err(|e| anyhow::anyhow!("Failed to serialize cancel algo request: {e}"))?;
2329
2330        match self
2331            .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_CANCEL.to_string()]))
2332            .await
2333        {
2334            Ok(()) => {
2335                tracing::debug!("Sent cancel algo order request");
2336                Ok(())
2337            }
2338            Err(e) => {
2339                tracing::error!(error = %e, "Failed to send cancel algo order after retries");
2340
2341                self.pending_cancel_requests.remove(&request_id);
2342
2343                if let Some(client_order_id) = client_order_id {
2344                    let ts_now = self.clock.get_time_ns();
2345                    let rejected = OrderCancelRejected::new(
2346                        trader_id,
2347                        strategy_id,
2348                        instrument_id,
2349                        client_order_id,
2350                        Ustr::from(&format!("WebSocket send failed: {e}")),
2351                        UUID4::new(),
2352                        ts_now, // ts_event
2353                        ts_now, // ts_init
2354                        false,  // Not from reconciliation
2355                        None,
2356                        Some(self.account_id),
2357                    );
2358                    let _ = self.send(NautilusWsMessage::OrderCancelRejected(rejected));
2359                }
2360
2361                Err(anyhow::anyhow!("Failed to send cancel algo order: {e}"))
2362            }
2363        }
2364    }
2365}
2366
2367/// Returns `true` when an OKX error payload represents a post-only rejection.
2368pub fn is_post_only_rejection(code: &str, data: &[Value]) -> bool {
2369    if code == OKX_POST_ONLY_ERROR_CODE {
2370        return true;
2371    }
2372
2373    for entry in data {
2374        if let Some(s_code) = entry.get("sCode").and_then(|value| value.as_str())
2375            && s_code == OKX_POST_ONLY_ERROR_CODE
2376        {
2377            return true;
2378        }
2379
2380        if let Some(inner_code) = entry.get("code").and_then(|value| value.as_str())
2381            && inner_code == OKX_POST_ONLY_ERROR_CODE
2382        {
2383            return true;
2384        }
2385    }
2386
2387    false
2388}
2389
2390/// Determines if an OKX WebSocket error should trigger a retry.
2391fn should_retry_okx_error(error: &OKXWsError) -> bool {
2392    match error {
2393        OKXWsError::OkxError { error_code, .. } => should_retry_error_code(error_code),
2394        OKXWsError::TungsteniteError(_) => true, // Network errors are retryable
2395        OKXWsError::ClientError(msg) => {
2396            // Retry on timeout and connection errors (case-insensitive)
2397            let msg_lower = msg.to_lowercase();
2398            msg_lower.contains("timeout")
2399                || msg_lower.contains("timed out")
2400                || msg_lower.contains("connection")
2401                || msg_lower.contains("network")
2402        }
2403        OKXWsError::AuthenticationError(_)
2404        | OKXWsError::JsonError(_)
2405        | OKXWsError::ParsingError(_) => {
2406            // Don't retry authentication or parsing errors automatically
2407            false
2408        }
2409    }
2410}
2411
2412/// Creates a timeout error for the retry manager.
2413fn create_okx_timeout_error(msg: String) -> OKXWsError {
2414    OKXWsError::ClientError(msg)
2415}
2416
2417////////////////////////////////////////////////////////////////////////////////
2418// Tests
2419////////////////////////////////////////////////////////////////////////////////
2420
2421#[cfg(test)]
2422mod tests {
2423    use rstest::rstest;
2424
2425    #[rstest]
2426    fn test_is_post_only_rejection_detects_by_code() {
2427        assert!(super::is_post_only_rejection("51019", &[]));
2428    }
2429
2430    #[rstest]
2431    fn test_is_post_only_rejection_detects_by_inner_code() {
2432        let data = vec![serde_json::json!({
2433            "sCode": "51019"
2434        })];
2435        assert!(super::is_post_only_rejection("50000", &data));
2436    }
2437
2438    #[rstest]
2439    fn test_is_post_only_rejection_false_for_unrelated_error() {
2440        let data = vec![serde_json::json!({
2441            "sMsg": "Insufficient balance"
2442        })];
2443        assert!(!super::is_post_only_rejection("50000", &data));
2444    }
2445}