Skip to main content

nautilus_okx/websocket/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message handler for OKX.
17//!
18//! The handler runs in a dedicated Tokio task as the I/O boundary between the client
19//! orchestrator and the network layer. It exclusively owns the `WebSocketClient` and
20//! processes commands from the client via an unbounded channel, serializing them to JSON
21//! and sending via the WebSocket. Raw messages are received from the network, deserialized,
22//! and transformed into `NautilusWsMessage` events which are emitted back to the client.
23//!
24//! Responsibilities:
25//! - Command processing: Receives `HandlerCommand` from client, executes WebSocket operations.
26//! - Message transformation: Parses raw venue messages into Nautilus domain events.
27//! - Pending state tracking: Owns `AHashMap` for matching requests/responses (single-threaded).
28//! - Retry logic: Retries transient WebSocket send failures using `RetryManager`.
29//! - Error event emission: Emits `OrderRejected`, `OrderCancelRejected` when retries exhausted.
30
31use std::{
32    collections::VecDeque,
33    sync::{
34        Arc,
35        atomic::{AtomicBool, AtomicU64, Ordering},
36    },
37};
38
39use ahash::AHashMap;
40use dashmap::DashMap;
41use nautilus_common::cache::fifo::FifoCache;
42use nautilus_core::{AtomicTime, UUID4, nanos::UnixNanos, time::get_atomic_clock_realtime};
43use nautilus_model::{
44    enums::{OrderStatus, OrderType},
45    events::{
46        AccountState, OrderAccepted, OrderCancelRejected, OrderModifyRejected, OrderRejected,
47    },
48    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
49    instruments::{Instrument, InstrumentAny},
50    types::{Money, Quantity},
51};
52use nautilus_network::{
53    RECONNECTED,
54    retry::{RetryManager, create_websocket_retry_manager},
55    websocket::{AuthTracker, SubscriptionState, TEXT_PING, TEXT_PONG, WebSocketClient},
56};
57use serde_json::Value;
58use tokio_tungstenite::tungstenite::Message;
59use ustr::Ustr;
60
61use super::{
62    enums::{OKXSubscriptionEvent, OKXWsChannel, OKXWsOperation},
63    error::OKXWsError,
64    messages::{
65        ExecutionReport, NautilusWsMessage, OKXAlgoOrderMsg, OKXBookMsg, OKXOrderMsg,
66        OKXSubscription, OKXSubscriptionArg, OKXWebSocketArg, OKXWebSocketError, OKXWsMessage,
67        OKXWsRequest, WsAmendOrderParams, WsCancelAlgoOrderParamsBuilder,
68        WsCancelOrderParamsBuilder, WsMassCancelParams, WsPostAlgoOrderParams, WsPostOrderParams,
69    },
70    parse::{
71        OrderStateSnapshot, ParsedOrderEvent, parse_algo_order_msg, parse_book_msg_vec,
72        parse_order_event, parse_order_msg, parse_ws_message_data,
73    },
74    subscription::topic_from_websocket_arg,
75};
76use crate::{
77    common::{
78        consts::{
79            OKX_POST_ONLY_CANCEL_REASON, OKX_POST_ONLY_CANCEL_SOURCE, OKX_POST_ONLY_ERROR_CODE,
80            should_retry_error_code,
81        },
82        enums::{
83            OKXBookAction, OKXInstrumentType, OKXOrderStatus, OKXSide, OKXTargetCurrency,
84            OKXTradeMode,
85        },
86        parse::{
87            determine_order_type, is_market_price, okx_instrument_type, parse_account_state,
88            parse_client_order_id, parse_millisecond_timestamp, parse_position_status_report,
89            parse_price, parse_quantity,
90        },
91    },
92    http::models::{OKXAccount, OKXPosition},
93    websocket::client::{
94        OKX_RATE_LIMIT_KEY_AMEND, OKX_RATE_LIMIT_KEY_CANCEL, OKX_RATE_LIMIT_KEY_ORDER,
95        OKX_RATE_LIMIT_KEY_SUBSCRIPTION,
96    },
97};
98
99/// Data cached for pending place requests to correlate with responses.
100type PlaceRequestData = (
101    PendingOrderParams,
102    ClientOrderId,
103    TraderId,
104    StrategyId,
105    InstrumentId,
106);
107
108/// Data cached for pending cancel requests to correlate with responses.
109type CancelRequestData = (
110    ClientOrderId,
111    TraderId,
112    StrategyId,
113    InstrumentId,
114    Option<VenueOrderId>,
115);
116
117/// Data cached for pending amend requests to correlate with responses.
118type AmendRequestData = (
119    ClientOrderId,
120    TraderId,
121    StrategyId,
122    InstrumentId,
123    Option<VenueOrderId>,
124);
125
126#[derive(Debug)]
127pub enum PendingOrderParams {
128    Regular(WsPostOrderParams),
129    Algo(WsPostAlgoOrderParams),
130}
131
132/// Commands sent from the outer client to the inner message handler.
133#[allow(
134    clippy::large_enum_variant,
135    reason = "Commands are ephemeral and immediately consumed"
136)]
137#[allow(missing_debug_implementations)]
138pub enum HandlerCommand {
139    SetClient(WebSocketClient),
140    Disconnect,
141    Authenticate {
142        payload: String,
143    },
144    InitializeInstruments(Vec<InstrumentAny>),
145    UpdateInstrument(InstrumentAny),
146    Subscribe {
147        args: Vec<OKXSubscriptionArg>,
148    },
149    Unsubscribe {
150        args: Vec<OKXSubscriptionArg>,
151    },
152    PlaceOrder {
153        params: WsPostOrderParams,
154        client_order_id: ClientOrderId,
155        trader_id: TraderId,
156        strategy_id: StrategyId,
157        instrument_id: InstrumentId,
158    },
159    PlaceAlgoOrder {
160        params: WsPostAlgoOrderParams,
161        client_order_id: ClientOrderId,
162        trader_id: TraderId,
163        strategy_id: StrategyId,
164        instrument_id: InstrumentId,
165    },
166    AmendOrder {
167        params: WsAmendOrderParams,
168        client_order_id: ClientOrderId,
169        trader_id: TraderId,
170        strategy_id: StrategyId,
171        instrument_id: InstrumentId,
172        venue_order_id: Option<VenueOrderId>,
173    },
174    CancelOrder {
175        client_order_id: Option<ClientOrderId>,
176        venue_order_id: Option<VenueOrderId>,
177        instrument_id: InstrumentId,
178        trader_id: TraderId,
179        strategy_id: StrategyId,
180    },
181    CancelAlgoOrder {
182        client_order_id: Option<ClientOrderId>,
183        algo_order_id: Option<VenueOrderId>,
184        instrument_id: InstrumentId,
185        trader_id: TraderId,
186        strategy_id: StrategyId,
187    },
188    MassCancel {
189        instrument_id: InstrumentId,
190    },
191    BatchPlaceOrders {
192        args: Vec<Value>,
193        request_id: String,
194    },
195    BatchAmendOrders {
196        args: Vec<Value>,
197        request_id: String,
198    },
199    BatchCancelOrders {
200        args: Vec<Value>,
201        request_id: String,
202    },
203}
204
205pub(super) struct OKXWsFeedHandler {
206    clock: &'static AtomicTime,
207    account_id: AccountId,
208    signal: Arc<AtomicBool>,
209    inner: Option<WebSocketClient>,
210    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
211    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
212    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
213    auth_tracker: AuthTracker,
214    subscriptions_state: SubscriptionState,
215    retry_manager: RetryManager<OKXWsError>,
216    pending_place_requests: AHashMap<String, PlaceRequestData>,
217    pending_cancel_requests: AHashMap<String, CancelRequestData>,
218    pending_amend_requests: AHashMap<String, AmendRequestData>,
219    pending_mass_cancel_requests: AHashMap<String, InstrumentId>,
220    pending_messages: VecDeque<NautilusWsMessage>,
221    active_client_orders: Arc<DashMap<ClientOrderId, (TraderId, StrategyId, InstrumentId)>>,
222    client_id_aliases: Arc<DashMap<ClientOrderId, ClientOrderId>>,
223    inst_id_code_cache: Arc<DashMap<Ustr, u64>>,
224    emitted_accepted: FifoCache<VenueOrderId, 10_000>,
225    instruments_cache: AHashMap<Ustr, InstrumentAny>,
226    fee_cache: AHashMap<Ustr, Money>,
227    filled_qty_cache: AHashMap<Ustr, Quantity>,
228    order_state_cache: AHashMap<ClientOrderId, OrderStateSnapshot>,
229    funding_rate_cache: AHashMap<Ustr, (Ustr, u64)>, // Cache (funding_rate, funding_time) by inst_id
230    last_account_state: Option<AccountState>,
231    request_id_counter: AtomicU64,
232}
233
234impl OKXWsFeedHandler {
235    /// Creates a new [`OKXWsFeedHandler`] instance.
236    #[allow(clippy::too_many_arguments)]
237    pub fn new(
238        account_id: AccountId,
239        signal: Arc<AtomicBool>,
240        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
241        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
242        out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
243        active_client_orders: Arc<DashMap<ClientOrderId, (TraderId, StrategyId, InstrumentId)>>,
244        client_id_aliases: Arc<DashMap<ClientOrderId, ClientOrderId>>,
245        inst_id_code_cache: Arc<DashMap<Ustr, u64>>,
246        auth_tracker: AuthTracker,
247        subscriptions_state: SubscriptionState,
248    ) -> Self {
249        Self {
250            clock: get_atomic_clock_realtime(),
251            account_id,
252            signal,
253            inner: None,
254            cmd_rx,
255            raw_rx,
256            out_tx,
257            auth_tracker,
258            subscriptions_state,
259            retry_manager: create_websocket_retry_manager(),
260            pending_place_requests: AHashMap::new(),
261            pending_cancel_requests: AHashMap::new(),
262            pending_amend_requests: AHashMap::new(),
263            pending_mass_cancel_requests: AHashMap::new(),
264            pending_messages: VecDeque::new(),
265            active_client_orders,
266            client_id_aliases,
267            inst_id_code_cache,
268            emitted_accepted: FifoCache::new(),
269            instruments_cache: AHashMap::new(),
270            fee_cache: AHashMap::new(),
271            filled_qty_cache: AHashMap::new(),
272            order_state_cache: AHashMap::new(),
273            funding_rate_cache: AHashMap::new(),
274            last_account_state: None,
275            request_id_counter: AtomicU64::new(0),
276        }
277    }
278
279    pub(super) fn is_stopped(&self) -> bool {
280        self.signal.load(std::sync::atomic::Ordering::Acquire)
281    }
282
283    pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), ()> {
284        self.out_tx.send(msg).map_err(|_| ())
285    }
286
287    async fn send_with_retry(
288        &self,
289        payload: String,
290        rate_limit_keys: Option<&[Ustr]>,
291    ) -> Result<(), OKXWsError> {
292        if let Some(client) = &self.inner {
293            let keys_owned: Option<Vec<Ustr>> = rate_limit_keys.map(|k| k.to_vec());
294            self.retry_manager
295                .execute_with_retry(
296                    "websocket_send",
297                    || {
298                        let payload = payload.clone();
299                        let keys = keys_owned.clone();
300                        async move {
301                            client
302                                .send_text(payload, keys.as_deref())
303                                .await
304                                .map_err(|e| OKXWsError::ClientError(format!("Send failed: {e}")))
305                        }
306                    },
307                    should_retry_okx_error,
308                    create_okx_timeout_error,
309                )
310                .await
311        } else {
312            Err(OKXWsError::ClientError(
313                "No active WebSocket client".to_string(),
314            ))
315        }
316    }
317
318    pub(super) async fn send_pong(&self) -> anyhow::Result<()> {
319        match self.send_with_retry(TEXT_PONG.to_string(), None).await {
320            Ok(()) => {
321                log::trace!("Sent pong response to OKX text ping");
322                Ok(())
323            }
324            Err(e) => {
325                log::warn!("Failed to send pong after retries: error={e}");
326                Err(anyhow::anyhow!("Failed to send pong: {e}"))
327            }
328        }
329    }
330
331    pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
332        if let Some(message) = self.pending_messages.pop_front() {
333            return Some(message);
334        }
335
336        loop {
337            tokio::select! {
338                Some(cmd) = self.cmd_rx.recv() => {
339                    match cmd {
340                        HandlerCommand::SetClient(client) => {
341                            log::debug!("Handler received WebSocket client");
342                            self.inner = Some(client);
343                        }
344                        HandlerCommand::Disconnect => {
345                            log::debug!("Handler disconnecting WebSocket client");
346                            self.inner = None;
347                            return None;
348                        }
349                        HandlerCommand::Authenticate { payload } => {
350                            if let Err(e) = self.send_with_retry(payload, Some(OKX_RATE_LIMIT_KEY_SUBSCRIPTION.as_slice())).await {
351                                log::error!("Failed to send authentication message after retries: error={e}");
352                            }
353                        }
354                        HandlerCommand::InitializeInstruments(instruments) => {
355                            for inst in instruments {
356                                self.instruments_cache.insert(inst.symbol().inner(), inst);
357                            }
358                        }
359                        HandlerCommand::UpdateInstrument(inst) => {
360                            self.instruments_cache.insert(inst.symbol().inner(), inst);
361                        }
362                        HandlerCommand::Subscribe { args } => {
363                            if let Err(e) = self.handle_subscribe(args).await {
364                                log::error!("Failed to handle subscribe command: error={e}");
365                            }
366                        }
367                        HandlerCommand::Unsubscribe { args } => {
368                            if let Err(e) = self.handle_unsubscribe(args).await {
369                                log::error!("Failed to handle unsubscribe command: error={e}");
370                            }
371                        }
372                        HandlerCommand::CancelOrder {
373                            client_order_id,
374                            venue_order_id,
375                            instrument_id,
376                            trader_id,
377                            strategy_id,
378                        } => {
379                            if let Err(e) = self
380                                .handle_cancel_order(
381                                    client_order_id,
382                                    venue_order_id,
383                                    instrument_id,
384                                    trader_id,
385                                    strategy_id,
386                                )
387                                .await
388                            {
389                                log::error!("Failed to handle cancel order command: error={e}");
390                            }
391                        }
392                        HandlerCommand::CancelAlgoOrder {
393                            client_order_id,
394                            algo_order_id,
395                            instrument_id,
396                            trader_id,
397                            strategy_id,
398                        } => {
399                            if let Err(e) = self
400                                .handle_cancel_algo_order(
401                                    client_order_id,
402                                    algo_order_id,
403                                    instrument_id,
404                                    trader_id,
405                                    strategy_id,
406                                )
407                                .await
408                            {
409                                log::error!("Failed to handle cancel algo order command: error={e}");
410                            }
411                        }
412                        HandlerCommand::PlaceOrder {
413                            params,
414                            client_order_id,
415                            trader_id,
416                            strategy_id,
417                            instrument_id,
418                        } => {
419                            if let Err(e) = self
420                                .handle_place_order(
421                                    params,
422                                    client_order_id,
423                                    trader_id,
424                                    strategy_id,
425                                    instrument_id,
426                                )
427                                .await
428                            {
429                                log::error!("Failed to handle place order command: error={e}");
430                            }
431                        }
432                        HandlerCommand::PlaceAlgoOrder {
433                            params,
434                            client_order_id,
435                            trader_id,
436                            strategy_id,
437                            instrument_id,
438                        } => {
439                            if let Err(e) = self
440                                .handle_place_algo_order(
441                                    params,
442                                    client_order_id,
443                                    trader_id,
444                                    strategy_id,
445                                    instrument_id,
446                                )
447                                .await
448                            {
449                                log::error!("Failed to handle place algo order command: error={e}");
450                            }
451                        }
452                        HandlerCommand::AmendOrder {
453                            params,
454                            client_order_id,
455                            trader_id,
456                            strategy_id,
457                            instrument_id,
458                            venue_order_id,
459                        } => {
460                            if let Err(e) = self
461                                .handle_amend_order(
462                                    params,
463                                    client_order_id,
464                                    trader_id,
465                                    strategy_id,
466                                    instrument_id,
467                                    venue_order_id,
468                                )
469                                .await
470                            {
471                                log::error!("Failed to handle amend order command: error={e}");
472                            }
473                        }
474                        HandlerCommand::MassCancel { instrument_id } => {
475                            if let Err(e) = self.handle_mass_cancel(instrument_id).await {
476                                log::error!("Failed to handle mass cancel command: error={e}");
477                            }
478                        }
479                        HandlerCommand::BatchCancelOrders { args, request_id } => {
480                            if let Err(e) = self.handle_batch_cancel_orders(args, request_id).await {
481                                log::error!("Failed to handle batch cancel orders command: error={e}");
482                            }
483                        }
484                        HandlerCommand::BatchPlaceOrders { args, request_id } => {
485                            if let Err(e) = self.handle_batch_place_orders(args, request_id).await {
486                                log::error!("Failed to handle batch place orders command: error={e}");
487                            }
488                        }
489                        HandlerCommand::BatchAmendOrders { args, request_id } => {
490                            if let Err(e) = self.handle_batch_amend_orders(args, request_id).await {
491                                log::error!("Failed to handle batch amend orders command: error={e}");
492                            }
493                        }
494                    }
495                    // Continue processing following command
496                    continue;
497                }
498
499                () = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
500                    if self.signal.load(std::sync::atomic::Ordering::Acquire) {
501                        log::debug!("Stop signal received during idle period");
502                        return None;
503                    }
504                    continue;
505                }
506
507                msg = self.raw_rx.recv() => {
508                    let event = match msg {
509                        Some(msg) => match Self::parse_raw_message(msg) {
510                            Some(event) => event,
511                            None => continue,
512                        },
513                        None => {
514                            log::debug!("WebSocket stream closed");
515                            return None;
516                        }
517                    };
518
519                    let ts_init = self.clock.get_time_ns();
520
521            match event {
522                OKXWsMessage::Ping => {
523                    if let Err(e) = self.send_pong().await {
524                        log::warn!("Failed to send pong response: error={e}");
525                    }
526                    continue;
527                }
528                OKXWsMessage::Login {
529                    code, msg, conn_id, ..
530                } => {
531                    if code == "0" {
532                        self.auth_tracker.succeed();
533
534                        // Must return immediately to deliver Authenticated message.
535                        // Using push_back() + continue blocks the select! loop and prevents
536                        // the spawn block from receiving this event, breaking reconnection flow.
537                        return Some(NautilusWsMessage::Authenticated);
538                    }
539
540                    log::error!("WebSocket authentication failed: error={msg}");
541                    self.auth_tracker.fail(msg.clone());
542
543                    let error = OKXWebSocketError {
544                        code,
545                        message: msg,
546                        conn_id: Some(conn_id),
547                        timestamp: self.clock.get_time_ns().as_u64(),
548                    };
549                    self.pending_messages
550                        .push_back(NautilusWsMessage::Error(error));
551                    continue;
552                }
553                OKXWsMessage::BookData { arg, action, data } => {
554                    if let Some(msg) = self.handle_book_data(arg, action, data, ts_init) {
555                        return Some(msg);
556                    }
557                    continue;
558                }
559                OKXWsMessage::OrderResponse {
560                    id,
561                    op,
562                    code,
563                    msg,
564                    data,
565                } => {
566                    if let Some(msg) = self.handle_order_response(id, op, code, msg, data, ts_init) {
567                        return Some(msg);
568                    }
569                    continue;
570                }
571                OKXWsMessage::Data { arg, data } => {
572                    let OKXWebSocketArg {
573                        channel, inst_id, ..
574                    } = arg;
575
576                    match channel {
577                        OKXWsChannel::Account => {
578                            if let Some(msg) = self.handle_account_data(data, ts_init) {
579                                return Some(msg);
580                            }
581                            continue;
582                        }
583                        OKXWsChannel::Positions => {
584                            self.handle_positions_data(data, ts_init);
585                            continue;
586                        }
587                        OKXWsChannel::Orders => {
588                            if let Some(msg) = self.handle_orders_data(data, ts_init) {
589                                return Some(msg);
590                            }
591                            continue;
592                        }
593                        OKXWsChannel::OrdersAlgo => {
594                            if let Some(msg) = self.handle_algo_orders_data(data, ts_init) {
595                                return Some(msg);
596                            }
597                            continue;
598                        }
599                        _ => {
600                            if let Some(msg) =
601                                self.handle_other_channel_data(channel, inst_id, data, ts_init)
602                            {
603                                return Some(msg);
604                            }
605                            continue;
606                        }
607                    }
608                }
609                OKXWsMessage::Error { code, msg } => {
610                    let error = OKXWebSocketError {
611                        code,
612                        message: msg,
613                        conn_id: None,
614                        timestamp: self.clock.get_time_ns().as_u64(),
615                    };
616                    return Some(NautilusWsMessage::Error(error));
617                }
618                OKXWsMessage::Reconnected => {
619                    return Some(NautilusWsMessage::Reconnected);
620                }
621                OKXWsMessage::Subscription {
622                    event,
623                    arg,
624                    code,
625                    msg,
626                    ..
627                } => {
628                    let topic = topic_from_websocket_arg(&arg);
629                    let success = code.as_deref().is_none_or(|c| c == "0");
630
631                    match event {
632                        OKXSubscriptionEvent::Subscribe => {
633                            if success {
634                                self.subscriptions_state.confirm_subscribe(&topic);
635                            } else {
636                                log::warn!("Subscription failed: topic={topic:?}, error={msg:?}, code={code:?}");
637                                self.subscriptions_state.mark_failure(&topic);
638                            }
639                        }
640                        OKXSubscriptionEvent::Unsubscribe => {
641                            if success {
642                                self.subscriptions_state.confirm_unsubscribe(&topic);
643                            } else {
644                                log::warn!("Unsubscription failed - restoring subscription: topic={topic:?}, error={msg:?}, code={code:?}");
645                                // Venue rejected unsubscribe, so we're still subscribed. Restore state:
646                                self.subscriptions_state.confirm_unsubscribe(&topic); // Clear pending_unsubscribe
647                                self.subscriptions_state.mark_subscribe(&topic);      // Mark as subscribing
648                                self.subscriptions_state.confirm_subscribe(&topic);   // Confirm subscription
649                            }
650                        }
651                    }
652
653                    continue;
654                }
655                OKXWsMessage::ChannelConnCount { .. } => continue,
656            }
657                }
658
659                // Handle shutdown - either channel closed or stream ended
660                else => {
661                    log::debug!("Handler shutting down: stream ended or command channel closed");
662                    return None;
663                }
664            }
665        }
666    }
667
668    pub(super) fn is_post_only_auto_cancel(msg: &OKXOrderMsg) -> bool {
669        if msg.state != OKXOrderStatus::Canceled {
670            return false;
671        }
672
673        let cancel_source_matches = matches!(
674            msg.cancel_source.as_deref(),
675            Some(source) if source == OKX_POST_ONLY_CANCEL_SOURCE
676        );
677
678        let reason_matches = matches!(
679            msg.cancel_source_reason.as_deref(),
680            Some(reason) if reason.contains("POST_ONLY")
681        );
682
683        if !(cancel_source_matches || reason_matches) {
684            return false;
685        }
686
687        msg.acc_fill_sz
688            .as_ref()
689            .is_none_or(|filled| filled == "0" || filled.is_empty())
690    }
691
692    fn try_handle_post_only_auto_cancel(
693        &mut self,
694        msg: &OKXOrderMsg,
695        ts_init: UnixNanos,
696        exec_reports: &mut Vec<ExecutionReport>,
697    ) -> bool {
698        if !Self::is_post_only_auto_cancel(msg) {
699            return false;
700        }
701
702        let Some(client_order_id) = parse_client_order_id(&msg.cl_ord_id) else {
703            return false;
704        };
705
706        let Some((_, (trader_id, strategy_id, instrument_id))) =
707            self.active_client_orders.remove(&client_order_id)
708        else {
709            return false;
710        };
711
712        self.client_id_aliases.remove(&client_order_id);
713
714        if !exec_reports.is_empty() {
715            let reports = std::mem::take(exec_reports);
716            self.pending_messages
717                .push_back(NautilusWsMessage::ExecutionReports(reports));
718        }
719
720        let reason = msg
721            .cancel_source_reason
722            .as_ref()
723            .filter(|reason| !reason.is_empty())
724            .map_or_else(
725                || Ustr::from(OKX_POST_ONLY_CANCEL_REASON),
726                |reason| Ustr::from(reason.as_str()),
727            );
728
729        let ts_event = parse_millisecond_timestamp(msg.u_time);
730        let rejected = OrderRejected::new(
731            trader_id,
732            strategy_id,
733            instrument_id,
734            client_order_id,
735            self.account_id,
736            reason,
737            UUID4::new(),
738            ts_event,
739            ts_init,
740            false,
741            true,
742        );
743
744        self.pending_messages
745            .push_back(NautilusWsMessage::OrderRejected(rejected));
746
747        true
748    }
749
750    fn register_client_order_aliases(
751        &self,
752        raw_child: &Option<ClientOrderId>,
753        parent_from_msg: &Option<ClientOrderId>,
754    ) -> Option<ClientOrderId> {
755        if let Some(parent) = parent_from_msg {
756            self.client_id_aliases.insert(*parent, *parent);
757            if let Some(child) = raw_child.as_ref().filter(|child| **child != *parent) {
758                self.client_id_aliases.insert(*child, *parent);
759            }
760            Some(*parent)
761        } else if let Some(child) = raw_child.as_ref() {
762            if let Some(mapped) = self.client_id_aliases.get(child) {
763                Some(*mapped.value())
764            } else {
765                self.client_id_aliases.insert(*child, *child);
766                Some(*child)
767            }
768        } else {
769            None
770        }
771    }
772
773    fn adjust_execution_report(
774        &self,
775        report: ExecutionReport,
776        effective_client_id: &Option<ClientOrderId>,
777        raw_child: &Option<ClientOrderId>,
778    ) -> ExecutionReport {
779        match report {
780            ExecutionReport::Order(status_report) => {
781                let mut adjusted = status_report;
782                let mut final_id = *effective_client_id;
783
784                if final_id.is_none() {
785                    final_id = adjusted.client_order_id;
786                }
787
788                if final_id.is_none()
789                    && let Some(child) = raw_child.as_ref()
790                    && let Some(mapped) = self.client_id_aliases.get(child)
791                {
792                    final_id = Some(*mapped.value());
793                }
794
795                if let Some(final_id_value) = final_id {
796                    if adjusted.client_order_id != Some(final_id_value) {
797                        adjusted = adjusted.with_client_order_id(final_id_value);
798                    }
799                    self.client_id_aliases
800                        .insert(final_id_value, final_id_value);
801
802                    if let Some(child) =
803                        raw_child.as_ref().filter(|child| **child != final_id_value)
804                    {
805                        adjusted = adjusted.with_linked_order_ids(vec![*child]);
806                    }
807                }
808
809                ExecutionReport::Order(adjusted)
810            }
811            ExecutionReport::Fill(mut fill_report) => {
812                let mut final_id = *effective_client_id;
813                if final_id.is_none() {
814                    final_id = fill_report.client_order_id;
815                }
816                if final_id.is_none()
817                    && let Some(child) = raw_child.as_ref()
818                    && let Some(mapped) = self.client_id_aliases.get(child)
819                {
820                    final_id = Some(*mapped.value());
821                }
822
823                if let Some(final_id_value) = final_id {
824                    fill_report.client_order_id = Some(final_id_value);
825                    self.client_id_aliases
826                        .insert(final_id_value, final_id_value);
827                }
828
829                ExecutionReport::Fill(fill_report)
830            }
831        }
832    }
833
834    fn update_caches_with_report(&mut self, report: &ExecutionReport) {
835        match report {
836            ExecutionReport::Fill(fill_report) => {
837                let order_id = fill_report.venue_order_id.inner();
838                let current_fee = self
839                    .fee_cache
840                    .get(&order_id)
841                    .copied()
842                    .unwrap_or_else(|| Money::new(0.0, fill_report.commission.currency));
843                let total_fee = current_fee + fill_report.commission;
844                self.fee_cache.insert(order_id, total_fee);
845
846                let current_filled_qty = self
847                    .filled_qty_cache
848                    .get(&order_id)
849                    .copied()
850                    .unwrap_or_else(|| Quantity::zero(fill_report.last_qty.precision));
851                let total_filled_qty = current_filled_qty + fill_report.last_qty;
852                self.filled_qty_cache.insert(order_id, total_filled_qty);
853            }
854            ExecutionReport::Order(status_report) => {
855                if matches!(status_report.order_status, OrderStatus::Filled) {
856                    self.fee_cache.remove(&status_report.venue_order_id.inner());
857                    self.filled_qty_cache
858                        .remove(&status_report.venue_order_id.inner());
859                }
860
861                if matches!(
862                    status_report.order_status,
863                    OrderStatus::Canceled
864                        | OrderStatus::Expired
865                        | OrderStatus::Filled
866                        | OrderStatus::Rejected,
867                ) {
868                    self.emitted_accepted.remove(&status_report.venue_order_id);
869                    if let Some(client_order_id) = status_report.client_order_id {
870                        self.order_state_cache.remove(&client_order_id);
871                        self.active_client_orders.remove(&client_order_id);
872                        self.client_id_aliases.remove(&client_order_id);
873                    }
874                    if let Some(linked) = &status_report.linked_order_ids {
875                        for child in linked {
876                            self.client_id_aliases.remove(child);
877                        }
878                    }
879                }
880            }
881        }
882    }
883
884    #[allow(clippy::too_many_lines)]
885    fn handle_order_response(
886        &mut self,
887        id: Option<String>,
888        op: OKXWsOperation,
889        code: String,
890        msg: String,
891        data: Vec<Value>,
892        ts_init: UnixNanos,
893    ) -> Option<NautilusWsMessage> {
894        if code == "0" {
895            log::debug!("Order operation successful: id={id:?} op={op} code={code}");
896
897            if op == OKXWsOperation::BatchCancelOrders {
898                log::debug!(
899                    "Batch cancel operation successful: id={id:?} cancel_count={}",
900                    data.len()
901                );
902
903                // Check for per-order errors even when top-level code is "0"
904                for (idx, entry) in data.iter().enumerate() {
905                    if let Some(entry_code) = entry.get("sCode").and_then(|v| v.as_str())
906                        && entry_code != "0"
907                    {
908                        let entry_msg = entry
909                            .get("sMsg")
910                            .and_then(|v| v.as_str())
911                            .unwrap_or("Unknown error");
912
913                        if let Some(cl_ord_id_str) = entry
914                            .get("clOrdId")
915                            .and_then(|v| v.as_str())
916                            .filter(|s| !s.is_empty())
917                        {
918                            log::error!(
919                                "Batch cancel partial failure for order {cl_ord_id_str}: sCode={entry_code} sMsg={entry_msg}"
920                            );
921                            // TODO: Emit OrderCancelRejected for this specific order
922                        } else {
923                            log::error!(
924                                "Batch cancel entry[{idx}] failed: sCode={entry_code} sMsg={entry_msg} data={entry:?}"
925                            );
926                        }
927                    }
928                }
929
930                return None;
931            } else if op == OKXWsOperation::MassCancel
932                && let Some(request_id) = &id
933                && let Some(instrument_id) = self.pending_mass_cancel_requests.remove(request_id)
934            {
935                log::debug!("Mass cancel operation successful for instrument: {instrument_id}");
936            } else if op == OKXWsOperation::Order
937                && let Some(request_id) = &id
938                && let Some((params, client_order_id, trader_id, strategy_id, instrument_id)) =
939                    self.pending_place_requests.remove(request_id)
940            {
941                let (venue_order_id, ts_accepted) = if let Some(first) = data.first() {
942                    let ord_id = first
943                        .get("ordId")
944                        .and_then(|v| v.as_str())
945                        .filter(|s| !s.is_empty())
946                        .map(VenueOrderId::new);
947
948                    let ts = first
949                        .get("ts")
950                        .and_then(|v| v.as_str())
951                        .and_then(|s| s.parse::<u64>().ok())
952                        .map_or_else(
953                            || self.clock.get_time_ns(),
954                            |ms| UnixNanos::from(ms * 1_000_000),
955                        );
956
957                    (ord_id, ts)
958                } else {
959                    (None, self.clock.get_time_ns())
960                };
961
962                if let Some(instrument) = self.instruments_cache.get(&instrument_id.symbol.inner())
963                {
964                    match params {
965                        PendingOrderParams::Regular(order_params) => {
966                            let order_type = determine_order_type(
967                                order_params.ord_type,
968                                order_params.px.as_deref().unwrap_or(""),
969                            );
970
971                            let is_explicit_quote_sized = order_params
972                                .tgt_ccy
973                                .is_some_and(|tgt| tgt == OKXTargetCurrency::QuoteCcy);
974
975                            // SPOT market BUY in cash mode with no tgt_ccy defaults to quote-sizing
976                            let is_implicit_quote_sized = order_params.tgt_ccy.is_none()
977                                && order_params.side == OKXSide::Buy
978                                && order_type == OrderType::Market
979                                && order_params.td_mode == OKXTradeMode::Cash
980                                && instrument.instrument_class().as_ref() == "SPOT";
981
982                            if is_explicit_quote_sized || is_implicit_quote_sized {
983                                // For quote-sized orders, sz is in quote currency (USDT),
984                                // not base currency (ETH). We can't accurately parse the
985                                // base quantity without the fill price, so we skip the
986                                // synthetic OrderAccepted and rely on the orders channel
987                                log::debug!(
988                                    "Skipping synthetic OrderAccepted for {} quote-sized order: client_order_id={client_order_id}, venue_order_id={venue_order_id:?}",
989                                    if is_explicit_quote_sized {
990                                        "explicit"
991                                    } else {
992                                        "implicit"
993                                    },
994                                );
995                                return None;
996                            }
997
998                            let Some(v_order_id) = venue_order_id else {
999                                log::error!(
1000                                    "No venue_order_id for accepted order: client_order_id={client_order_id}"
1001                                );
1002                                return None;
1003                            };
1004
1005                            // Check if already emitted from orders channel push
1006                            if self.emitted_accepted.contains(&v_order_id) {
1007                                log::debug!(
1008                                    "Skipping duplicate OrderAccepted from operation response for venue_order_id={v_order_id}"
1009                                );
1010                                return None;
1011                            }
1012                            self.emitted_accepted.add(v_order_id);
1013
1014                            let accepted = OrderAccepted::new(
1015                                trader_id,
1016                                strategy_id,
1017                                instrument_id,
1018                                client_order_id,
1019                                v_order_id,
1020                                self.account_id,
1021                                UUID4::new(),
1022                                ts_accepted,
1023                                ts_init,
1024                                false, // Not from reconciliation
1025                            );
1026
1027                            log::debug!(
1028                                "Order accepted: client_order_id={client_order_id}, venue_order_id={v_order_id}"
1029                            );
1030
1031                            return Some(NautilusWsMessage::OrderAccepted(accepted));
1032                        }
1033                        PendingOrderParams::Algo(_) => {
1034                            log::debug!(
1035                                "Algo order placement confirmed: client_order_id={client_order_id}, venue_order_id={venue_order_id:?}"
1036                            );
1037                        }
1038                    }
1039                } else {
1040                    log::error!("Instrument not found for accepted order: {instrument_id}");
1041                }
1042            }
1043
1044            if let Some(first) = data.first()
1045                && let Some(success_msg) = first.get("sMsg").and_then(|value| value.as_str())
1046            {
1047                log::debug!("Order details: {success_msg}");
1048            }
1049
1050            return None;
1051        }
1052
1053        let error_msg = data
1054            .first()
1055            .and_then(|d| d.get("sMsg"))
1056            .and_then(|s| s.as_str())
1057            .unwrap_or(&msg)
1058            .to_string();
1059
1060        if let Some(first) = data.first() {
1061            log::debug!(
1062                "Error data fields: {}",
1063                serde_json::to_string_pretty(first)
1064                    .unwrap_or_else(|_| "unable to serialize".to_string())
1065            );
1066        }
1067
1068        log::warn!("Order operation failed: id={id:?} op={op} code={code} msg={error_msg}");
1069
1070        let ts_event = self.clock.get_time_ns();
1071
1072        if let Some(request_id) = &id {
1073            match op {
1074                OKXWsOperation::Order => {
1075                    if let Some((_params, client_order_id, trader_id, strategy_id, instrument_id)) =
1076                        self.pending_place_requests.remove(request_id)
1077                    {
1078                        let due_post_only = is_post_only_rejection(code.as_str(), &data);
1079                        let rejected = OrderRejected::new(
1080                            trader_id,
1081                            strategy_id,
1082                            instrument_id,
1083                            client_order_id,
1084                            self.account_id,
1085                            Ustr::from(error_msg.as_str()),
1086                            UUID4::new(),
1087                            ts_event,
1088                            ts_init,
1089                            false, // Not from reconciliation
1090                            due_post_only,
1091                        );
1092
1093                        return Some(NautilusWsMessage::OrderRejected(rejected));
1094                    }
1095                }
1096                OKXWsOperation::CancelOrder => {
1097                    if let Some((
1098                        client_order_id,
1099                        trader_id,
1100                        strategy_id,
1101                        instrument_id,
1102                        venue_order_id,
1103                    )) = self.pending_cancel_requests.remove(request_id)
1104                    {
1105                        let rejected = OrderCancelRejected::new(
1106                            trader_id,
1107                            strategy_id,
1108                            instrument_id,
1109                            client_order_id,
1110                            Ustr::from(error_msg.as_str()),
1111                            UUID4::new(),
1112                            ts_event,
1113                            ts_init,
1114                            false, // Not from reconciliation
1115                            venue_order_id,
1116                            Some(self.account_id),
1117                        );
1118
1119                        return Some(NautilusWsMessage::OrderCancelRejected(rejected));
1120                    }
1121                }
1122                OKXWsOperation::AmendOrder => {
1123                    if let Some((
1124                        client_order_id,
1125                        trader_id,
1126                        strategy_id,
1127                        instrument_id,
1128                        venue_order_id,
1129                    )) = self.pending_amend_requests.remove(request_id)
1130                    {
1131                        let rejected = OrderModifyRejected::new(
1132                            trader_id,
1133                            strategy_id,
1134                            instrument_id,
1135                            client_order_id,
1136                            Ustr::from(error_msg.as_str()),
1137                            UUID4::new(),
1138                            ts_event,
1139                            ts_init,
1140                            false, // Not from reconciliation
1141                            venue_order_id,
1142                            Some(self.account_id),
1143                        );
1144
1145                        return Some(NautilusWsMessage::OrderModifyRejected(rejected));
1146                    }
1147                }
1148                OKXWsOperation::OrderAlgo => {
1149                    if let Some((_params, client_order_id, trader_id, strategy_id, instrument_id)) =
1150                        self.pending_place_requests.remove(request_id)
1151                    {
1152                        let due_post_only = is_post_only_rejection(code.as_str(), &data);
1153                        let rejected = OrderRejected::new(
1154                            trader_id,
1155                            strategy_id,
1156                            instrument_id,
1157                            client_order_id,
1158                            self.account_id,
1159                            Ustr::from(error_msg.as_str()),
1160                            UUID4::new(),
1161                            ts_event,
1162                            ts_init,
1163                            false, // Not from reconciliation
1164                            due_post_only,
1165                        );
1166
1167                        return Some(NautilusWsMessage::OrderRejected(rejected));
1168                    }
1169                }
1170                OKXWsOperation::CancelAlgos => {
1171                    if let Some((
1172                        client_order_id,
1173                        trader_id,
1174                        strategy_id,
1175                        instrument_id,
1176                        venue_order_id,
1177                    )) = self.pending_cancel_requests.remove(request_id)
1178                    {
1179                        let rejected = OrderCancelRejected::new(
1180                            trader_id,
1181                            strategy_id,
1182                            instrument_id,
1183                            client_order_id,
1184                            Ustr::from(error_msg.as_str()),
1185                            UUID4::new(),
1186                            ts_event,
1187                            ts_init,
1188                            false, // Not from reconciliation
1189                            venue_order_id,
1190                            Some(self.account_id),
1191                        );
1192
1193                        return Some(NautilusWsMessage::OrderCancelRejected(rejected));
1194                    }
1195                }
1196                OKXWsOperation::MassCancel => {
1197                    if let Some(instrument_id) =
1198                        self.pending_mass_cancel_requests.remove(request_id)
1199                    {
1200                        log::error!(
1201                            "Mass cancel operation failed for {instrument_id}: code={code} msg={error_msg}"
1202                        );
1203                        let error = OKXWebSocketError {
1204                            code,
1205                            message: format!("Mass cancel failed for {instrument_id}: {error_msg}"),
1206                            conn_id: None,
1207                            timestamp: ts_event.as_u64(),
1208                        };
1209                        return Some(NautilusWsMessage::Error(error));
1210                    } else {
1211                        log::error!("Mass cancel operation failed: code={code} msg={error_msg}");
1212                    }
1213                }
1214                OKXWsOperation::BatchCancelOrders => {
1215                    log::warn!(
1216                        "Batch cancel operation failed: id={id:?} code={code} msg={error_msg} data_count={}",
1217                        data.len()
1218                    );
1219
1220                    // Iterate through data array to check per-order errors
1221                    for (idx, entry) in data.iter().enumerate() {
1222                        let entry_code =
1223                            entry.get("sCode").and_then(|v| v.as_str()).unwrap_or(&code);
1224                        let entry_msg = entry
1225                            .get("sMsg")
1226                            .and_then(|v| v.as_str())
1227                            .unwrap_or(&error_msg);
1228
1229                        if entry_code != "0" {
1230                            // Try to extract client order ID for targeted error events
1231                            if let Some(cl_ord_id_str) = entry
1232                                .get("clOrdId")
1233                                .and_then(|v| v.as_str())
1234                                .filter(|s| !s.is_empty())
1235                            {
1236                                log::error!(
1237                                    "Batch cancel failed for order {cl_ord_id_str}: sCode={entry_code} sMsg={entry_msg}"
1238                                );
1239                                // TODO: Emit OrderCancelRejected event once we track
1240                                // batch cancel metadata (client_order_id, trader_id, etc.)
1241                            } else {
1242                                log::error!(
1243                                    "Batch cancel entry[{idx}] failed: sCode={entry_code} sMsg={entry_msg} data={entry:?}"
1244                                );
1245                            }
1246                        }
1247                    }
1248
1249                    // Emit generic error for the batch operation
1250                    let error = OKXWebSocketError {
1251                        code,
1252                        message: format!("Batch cancel failed: {error_msg}"),
1253                        conn_id: None,
1254                        timestamp: ts_event.as_u64(),
1255                    };
1256                    return Some(NautilusWsMessage::Error(error));
1257                }
1258                _ => log::warn!("Unhandled operation type for rejection: {op}"),
1259            }
1260        }
1261
1262        let error = OKXWebSocketError {
1263            code,
1264            message: error_msg,
1265            conn_id: None,
1266            timestamp: ts_event.as_u64(),
1267        };
1268        Some(NautilusWsMessage::Error(error))
1269    }
1270
1271    fn handle_book_data(
1272        &self,
1273        arg: OKXWebSocketArg,
1274        action: OKXBookAction,
1275        data: Vec<OKXBookMsg>,
1276        ts_init: UnixNanos,
1277    ) -> Option<NautilusWsMessage> {
1278        let Some(inst_id) = arg.inst_id else {
1279            log::error!("Instrument ID missing for book data event");
1280            return None;
1281        };
1282
1283        let inst = self.instruments_cache.get(&inst_id)?;
1284
1285        let instrument_id = inst.id();
1286        let price_precision = inst.price_precision();
1287        let size_precision = inst.size_precision();
1288
1289        match parse_book_msg_vec(
1290            data,
1291            &instrument_id,
1292            price_precision,
1293            size_precision,
1294            action,
1295            ts_init,
1296        ) {
1297            Ok(payloads) => Some(NautilusWsMessage::Data(payloads)),
1298            Err(e) => {
1299                log::error!("Failed to parse book message: {e}");
1300                None
1301            }
1302        }
1303    }
1304
1305    fn handle_account_data(
1306        &mut self,
1307        data: Value,
1308        ts_init: UnixNanos,
1309    ) -> Option<NautilusWsMessage> {
1310        let Value::Array(arr) = data else {
1311            log::error!("Account data is not an array");
1312            return None;
1313        };
1314
1315        let first = arr.into_iter().next()?;
1316
1317        let account: OKXAccount = match serde_json::from_value(first) {
1318            Ok(acc) => acc,
1319            Err(e) => {
1320                log::error!("Failed to parse account data: {e}");
1321                return None;
1322            }
1323        };
1324
1325        match parse_account_state(&account, self.account_id, ts_init) {
1326            Ok(account_state) => {
1327                if let Some(last_account_state) = &self.last_account_state
1328                    && account_state.has_same_balances_and_margins(last_account_state)
1329                {
1330                    return None;
1331                }
1332                self.last_account_state = Some(account_state.clone());
1333                Some(NautilusWsMessage::AccountUpdate(account_state))
1334            }
1335            Err(e) => {
1336                log::error!("Failed to parse account state: {e}");
1337                None
1338            }
1339        }
1340    }
1341
1342    fn handle_positions_data(&mut self, data: Value, ts_init: UnixNanos) {
1343        match serde_json::from_value::<Vec<OKXPosition>>(data) {
1344            Ok(positions) => {
1345                log::debug!("Received {} position update(s)", positions.len());
1346
1347                for position in positions {
1348                    let instrument = match self.instruments_cache.get(&position.inst_id) {
1349                        Some(inst) => inst,
1350                        None => {
1351                            log::warn!(
1352                                "Received position update for unknown instrument {}, skipping",
1353                                position.inst_id
1354                            );
1355                            continue;
1356                        }
1357                    };
1358
1359                    let instrument_id = instrument.id();
1360                    let size_precision = instrument.size_precision();
1361
1362                    match parse_position_status_report(
1363                        position,
1364                        self.account_id,
1365                        instrument_id,
1366                        size_precision,
1367                        ts_init,
1368                    ) {
1369                        Ok(position_report) => {
1370                            self.pending_messages
1371                                .push_back(NautilusWsMessage::PositionUpdate(position_report));
1372                        }
1373                        Err(e) => {
1374                            log::error!(
1375                                "Failed to parse position status report for {instrument_id}: {e}"
1376                            );
1377                        }
1378                    }
1379                }
1380            }
1381            Err(e) => {
1382                log::error!("Failed to parse positions data: {e}");
1383            }
1384        }
1385    }
1386
1387    fn handle_orders_data(&mut self, data: Value, ts_init: UnixNanos) -> Option<NautilusWsMessage> {
1388        let orders: Vec<OKXOrderMsg> = match serde_json::from_value(data) {
1389            Ok(orders) => orders,
1390            Err(e) => {
1391                log::error!("Failed to deserialize orders channel payload: {e}");
1392                return None;
1393            }
1394        };
1395
1396        log::debug!(
1397            "Received {} order message(s) from orders channel",
1398            orders.len()
1399        );
1400
1401        let mut exec_reports: Vec<ExecutionReport> = Vec::with_capacity(orders.len());
1402
1403        for msg in orders {
1404            log::debug!(
1405                "Processing order message: inst_id={}, cl_ord_id={}, state={:?}, exec_type={:?}",
1406                msg.inst_id,
1407                msg.cl_ord_id,
1408                msg.state,
1409                msg.exec_type
1410            );
1411
1412            if self.try_handle_post_only_auto_cancel(&msg, ts_init, &mut exec_reports) {
1413                continue;
1414            }
1415
1416            let raw_child = parse_client_order_id(&msg.cl_ord_id);
1417            let parent_from_msg = msg
1418                .algo_cl_ord_id
1419                .as_ref()
1420                .filter(|value| !value.is_empty())
1421                .map(ClientOrderId::new);
1422            let effective_client_id =
1423                self.register_client_order_aliases(&raw_child, &parent_from_msg);
1424
1425            let Some(instrument) = self.instruments_cache.get(&msg.inst_id) else {
1426                log::error!(
1427                    "No instrument found for inst_id: {inst_id}",
1428                    inst_id = msg.inst_id
1429                );
1430                continue;
1431            };
1432            let price_precision = instrument.price_precision();
1433            let size_precision = instrument.size_precision();
1434
1435            let order_metadata = effective_client_id
1436                .and_then(|cid| self.active_client_orders.get(&cid).map(|e| *e.value()));
1437
1438            let previous_fee = self.fee_cache.get(&msg.ord_id).copied();
1439            let previous_filled_qty = self.filled_qty_cache.get(&msg.ord_id).copied();
1440            let previous_state =
1441                effective_client_id.and_then(|cid| self.order_state_cache.get(&cid).cloned());
1442
1443            // SAFETY: order_metadata being Some implies effective_client_id is Some
1444            if let (Some((trader_id, strategy_id, _instrument_id)), Some(canonical_client_id)) =
1445                (order_metadata, effective_client_id)
1446            {
1447                match parse_order_event(
1448                    &msg,
1449                    canonical_client_id,
1450                    self.account_id,
1451                    trader_id,
1452                    strategy_id,
1453                    instrument,
1454                    previous_fee,
1455                    previous_filled_qty,
1456                    previous_state.as_ref(),
1457                    ts_init,
1458                ) {
1459                    Ok(event) => {
1460                        self.process_parsed_order_event(
1461                            event,
1462                            &msg,
1463                            price_precision,
1464                            size_precision,
1465                            canonical_client_id,
1466                            &raw_child,
1467                            &mut exec_reports,
1468                        );
1469                    }
1470                    Err(e) => log::error!("Failed to parse order event: {e}"),
1471                }
1472            } else {
1473                // External order or not tracked - use old parse_order_msg for backward compatibility
1474                match parse_order_msg(
1475                    &msg,
1476                    self.account_id,
1477                    &self.instruments_cache,
1478                    &self.fee_cache,
1479                    &self.filled_qty_cache,
1480                    ts_init,
1481                ) {
1482                    Ok(report) => {
1483                        log::debug!("Parsed external order as execution report: {report:?}");
1484                        let adjusted =
1485                            self.adjust_execution_report(report, &effective_client_id, &raw_child);
1486                        self.update_caches_with_report(&adjusted);
1487                        exec_reports.push(adjusted);
1488                    }
1489                    Err(e) => log::error!("Failed to parse order message: {e}"),
1490                }
1491            }
1492        }
1493
1494        if !exec_reports.is_empty() {
1495            log::debug!(
1496                "Pushing {count} execution report(s) to message queue",
1497                count = exec_reports.len()
1498            );
1499            self.pending_messages
1500                .push_back(NautilusWsMessage::ExecutionReports(exec_reports));
1501        }
1502
1503        self.pending_messages.pop_front()
1504    }
1505
1506    /// Processes a parsed order event and emits the appropriate message.
1507    #[allow(clippy::too_many_arguments)]
1508    fn process_parsed_order_event(
1509        &mut self,
1510        event: ParsedOrderEvent,
1511        msg: &OKXOrderMsg,
1512        price_precision: u8,
1513        size_precision: u8,
1514        canonical_client_id: ClientOrderId,
1515        raw_child: &Option<ClientOrderId>,
1516        exec_reports: &mut Vec<ExecutionReport>,
1517    ) {
1518        let venue_order_id = VenueOrderId::new(msg.ord_id);
1519
1520        match event {
1521            ParsedOrderEvent::Accepted(accepted) => {
1522                if self.emitted_accepted.contains(&venue_order_id) {
1523                    log::debug!(
1524                        "Skipping duplicate OrderAccepted for venue_order_id={venue_order_id}"
1525                    );
1526                    return;
1527                }
1528                self.emitted_accepted.add(venue_order_id);
1529                self.update_order_state_cache(
1530                    &canonical_client_id,
1531                    msg,
1532                    price_precision,
1533                    size_precision,
1534                );
1535
1536                self.pending_messages
1537                    .push_back(NautilusWsMessage::OrderAccepted(accepted));
1538            }
1539            ParsedOrderEvent::Canceled(canceled) => {
1540                self.cleanup_terminal_order(&canonical_client_id, &venue_order_id);
1541                self.pending_messages
1542                    .push_back(NautilusWsMessage::OrderCanceled(canceled));
1543            }
1544            ParsedOrderEvent::Expired(expired) => {
1545                self.cleanup_terminal_order(&canonical_client_id, &venue_order_id);
1546                self.pending_messages
1547                    .push_back(NautilusWsMessage::OrderExpired(expired));
1548            }
1549            ParsedOrderEvent::Triggered(triggered) => {
1550                self.update_order_state_cache(
1551                    &canonical_client_id,
1552                    msg,
1553                    price_precision,
1554                    size_precision,
1555                );
1556                self.pending_messages
1557                    .push_back(NautilusWsMessage::OrderTriggered(triggered));
1558            }
1559            ParsedOrderEvent::Updated(updated) => {
1560                self.update_order_state_cache(
1561                    &canonical_client_id,
1562                    msg,
1563                    price_precision,
1564                    size_precision,
1565                );
1566                self.pending_messages
1567                    .push_back(NautilusWsMessage::OrderUpdated(updated));
1568            }
1569            ParsedOrderEvent::Fill(fill_report) => {
1570                let effective_client_id = Some(canonical_client_id);
1571                let adjusted = self.adjust_execution_report(
1572                    ExecutionReport::Fill(fill_report),
1573                    &effective_client_id,
1574                    raw_child,
1575                );
1576                self.update_caches_with_report(&adjusted);
1577
1578                if msg.state == OKXOrderStatus::Filled {
1579                    self.cleanup_terminal_order(&canonical_client_id, &venue_order_id);
1580                }
1581
1582                exec_reports.push(adjusted);
1583            }
1584            ParsedOrderEvent::StatusOnly(status_report) => {
1585                let effective_client_id = Some(canonical_client_id);
1586                let adjusted = self.adjust_execution_report(
1587                    ExecutionReport::Order(*status_report),
1588                    &effective_client_id,
1589                    raw_child,
1590                );
1591                self.update_caches_with_report(&adjusted);
1592                exec_reports.push(adjusted);
1593            }
1594        }
1595    }
1596
1597    /// Updates the order state cache for detecting future updates.
1598    fn update_order_state_cache(
1599        &mut self,
1600        client_order_id: &ClientOrderId,
1601        msg: &OKXOrderMsg,
1602        price_precision: u8,
1603        size_precision: u8,
1604    ) {
1605        let venue_order_id = VenueOrderId::new(msg.ord_id);
1606        let quantity = parse_quantity(&msg.sz, size_precision).ok();
1607        let price = if is_market_price(&msg.px) {
1608            None
1609        } else {
1610            parse_price(&msg.px, price_precision).ok()
1611        };
1612
1613        if let Some(qty) = quantity {
1614            self.order_state_cache.insert(
1615                *client_order_id,
1616                OrderStateSnapshot {
1617                    venue_order_id,
1618                    quantity: qty,
1619                    price,
1620                },
1621            );
1622        }
1623    }
1624
1625    /// Cleans up tracking state for terminal orders.
1626    fn cleanup_terminal_order(
1627        &mut self,
1628        client_order_id: &ClientOrderId,
1629        venue_order_id: &VenueOrderId,
1630    ) {
1631        self.emitted_accepted.remove(venue_order_id);
1632        self.order_state_cache.remove(client_order_id);
1633        self.active_client_orders.remove(client_order_id);
1634        self.client_id_aliases.remove(client_order_id);
1635        self.client_id_aliases.retain(|_, v| *v != *client_order_id);
1636
1637        self.fee_cache.remove(&venue_order_id.inner());
1638        self.filled_qty_cache.remove(&venue_order_id.inner());
1639    }
1640
1641    fn handle_algo_orders_data(
1642        &mut self,
1643        data: Value,
1644        ts_init: UnixNanos,
1645    ) -> Option<NautilusWsMessage> {
1646        let orders: Vec<OKXAlgoOrderMsg> = match serde_json::from_value(data) {
1647            Ok(orders) => orders,
1648            Err(e) => {
1649                log::error!("Failed to deserialize algo orders payload: {e}");
1650                return None;
1651            }
1652        };
1653
1654        let mut exec_reports: Vec<ExecutionReport> = Vec::with_capacity(orders.len());
1655
1656        for msg in orders {
1657            let raw_child = parse_client_order_id(&msg.cl_ord_id);
1658            let parent_from_msg = parse_client_order_id(&msg.algo_cl_ord_id);
1659            let effective_client_id =
1660                self.register_client_order_aliases(&raw_child, &parent_from_msg);
1661
1662            match parse_algo_order_msg(msg, self.account_id, &self.instruments_cache, ts_init) {
1663                Ok(report) => {
1664                    let adjusted =
1665                        self.adjust_execution_report(report, &effective_client_id, &raw_child);
1666                    self.update_caches_with_report(&adjusted);
1667                    exec_reports.push(adjusted);
1668                }
1669                Err(e) => {
1670                    log::error!("Failed to parse algo order message: {e}");
1671                }
1672            }
1673        }
1674
1675        if exec_reports.is_empty() {
1676            None
1677        } else {
1678            Some(NautilusWsMessage::ExecutionReports(exec_reports))
1679        }
1680    }
1681
1682    fn handle_other_channel_data(
1683        &mut self,
1684        channel: OKXWsChannel,
1685        inst_id: Option<Ustr>,
1686        data: Value,
1687        ts_init: UnixNanos,
1688    ) -> Option<NautilusWsMessage> {
1689        let Some(inst_id) = inst_id else {
1690            log::error!("No instrument for channel {channel:?}");
1691            return None;
1692        };
1693
1694        let Some(instrument) = self.instruments_cache.get(&inst_id) else {
1695            log::error!("No instrument for channel {channel:?}, inst_id {inst_id:?}");
1696            return None;
1697        };
1698
1699        let instrument_id = instrument.id();
1700        let price_precision = instrument.price_precision();
1701        let size_precision = instrument.size_precision();
1702
1703        match parse_ws_message_data(
1704            &channel,
1705            data,
1706            &instrument_id,
1707            price_precision,
1708            size_precision,
1709            ts_init,
1710            &mut self.funding_rate_cache,
1711            &self.instruments_cache,
1712        ) {
1713            Ok(Some(msg)) => {
1714                if let NautilusWsMessage::Instrument(ref inst) = msg {
1715                    self.instruments_cache
1716                        .insert(inst.symbol().inner(), inst.as_ref().clone());
1717                }
1718                Some(msg)
1719            }
1720            Ok(None) => None,
1721            Err(e) => {
1722                log::error!("Error parsing message for channel {channel:?}: {e}");
1723                None
1724            }
1725        }
1726    }
1727
1728    pub(crate) fn parse_raw_message(
1729        msg: tokio_tungstenite::tungstenite::Message,
1730    ) -> Option<OKXWsMessage> {
1731        match msg {
1732            tokio_tungstenite::tungstenite::Message::Text(text) => {
1733                if text == TEXT_PONG {
1734                    log::trace!("Received pong from OKX");
1735                    return None;
1736                }
1737                if text == TEXT_PING {
1738                    log::trace!("Received ping from OKX (text)");
1739                    return Some(OKXWsMessage::Ping);
1740                }
1741
1742                if text == RECONNECTED {
1743                    log::debug!("Received WebSocket reconnection signal");
1744                    return Some(OKXWsMessage::Reconnected);
1745                }
1746                log::trace!("Received WebSocket message: {text}");
1747
1748                match serde_json::from_str(&text) {
1749                    Ok(ws_event) => match &ws_event {
1750                        OKXWsMessage::Error { code, msg } => {
1751                            log::error!("WebSocket error: {code} - {msg}");
1752                            Some(ws_event)
1753                        }
1754                        OKXWsMessage::Login {
1755                            event,
1756                            code,
1757                            msg,
1758                            conn_id,
1759                        } => {
1760                            if code == "0" {
1761                                log::info!("WebSocket authenticated: conn_id={conn_id}");
1762                            } else {
1763                                log::error!(
1764                                    "WebSocket authentication failed: event={event}, code={code}, error={msg}"
1765                                );
1766                            }
1767                            Some(ws_event)
1768                        }
1769                        OKXWsMessage::Subscription {
1770                            event,
1771                            arg,
1772                            conn_id,
1773                            ..
1774                        } => {
1775                            let channel_str = serde_json::to_string(&arg.channel)
1776                                .expect("Invalid OKX websocket channel")
1777                                .trim_matches('"')
1778                                .to_string();
1779                            log::debug!("{event}d: channel={channel_str}, conn_id={conn_id}");
1780                            Some(ws_event)
1781                        }
1782                        OKXWsMessage::ChannelConnCount {
1783                            event: _,
1784                            channel,
1785                            conn_count,
1786                            conn_id,
1787                        } => {
1788                            let channel_str = serde_json::to_string(&channel)
1789                                .expect("Invalid OKX websocket channel")
1790                                .trim_matches('"')
1791                                .to_string();
1792                            log::debug!(
1793                                "Channel connection status: channel={channel_str}, connections={conn_count}, conn_id={conn_id}",
1794                            );
1795                            None
1796                        }
1797                        OKXWsMessage::Ping => {
1798                            log::trace!("Ignoring ping event parsed from text payload");
1799                            None
1800                        }
1801                        OKXWsMessage::Data { .. } => Some(ws_event),
1802                        OKXWsMessage::BookData { .. } => Some(ws_event),
1803                        OKXWsMessage::OrderResponse {
1804                            id,
1805                            op,
1806                            code,
1807                            msg: _,
1808                            data,
1809                        } => {
1810                            if code == "0" {
1811                                log::debug!(
1812                                    "Order operation successful: id={id:?}, op={op}, code={code}"
1813                                );
1814
1815                                if let Some(order_data) = data.first() {
1816                                    let success_msg = order_data
1817                                        .get("sMsg")
1818                                        .and_then(|s| s.as_str())
1819                                        .unwrap_or("Order operation successful");
1820                                    log::debug!("Order success details: {success_msg}");
1821                                }
1822                            }
1823                            Some(ws_event)
1824                        }
1825                        OKXWsMessage::Reconnected => {
1826                            // This shouldn't happen as we handle RECONNECTED string directly
1827                            log::warn!("Unexpected Reconnected event from deserialization");
1828                            None
1829                        }
1830                    },
1831                    Err(e) => {
1832                        log::error!("Failed to parse message: {e}: {text}");
1833                        None
1834                    }
1835                }
1836            }
1837            Message::Ping(_payload) => {
1838                log::trace!("Received binary ping frame from OKX");
1839                Some(OKXWsMessage::Ping)
1840            }
1841            Message::Pong(payload) => {
1842                log::trace!("Received pong frame from OKX ({} bytes)", payload.len());
1843                None
1844            }
1845            Message::Binary(msg) => {
1846                log::debug!("Raw binary: {msg:?}");
1847                None
1848            }
1849            Message::Close(_) => {
1850                log::debug!("Received close message");
1851                None
1852            }
1853            msg => {
1854                log::warn!("Unexpected message: {msg}");
1855                None
1856            }
1857        }
1858    }
1859
1860    fn generate_unique_request_id(&self) -> String {
1861        self.request_id_counter
1862            .fetch_add(1, Ordering::SeqCst)
1863            .to_string()
1864    }
1865
1866    fn get_instrument_type_and_family_from_instrument(
1867        instrument: &InstrumentAny,
1868    ) -> anyhow::Result<(OKXInstrumentType, String)> {
1869        let inst_type = okx_instrument_type(instrument)?;
1870        let symbol = instrument.symbol().inner();
1871
1872        // Determine instrument family based on instrument type
1873        let inst_family = match instrument {
1874            InstrumentAny::CurrencyPair(_) => symbol.as_str().to_string(),
1875            InstrumentAny::CryptoPerpetual(_) => {
1876                // For SWAP: "BTC-USDT-SWAP" -> "BTC-USDT"
1877                symbol
1878                    .as_str()
1879                    .strip_suffix("-SWAP")
1880                    .unwrap_or(symbol.as_str())
1881                    .to_string()
1882            }
1883            InstrumentAny::CryptoFuture(_) => {
1884                // For FUTURES: "BTC-USDT-250328" -> "BTC-USDT"
1885                // Extract the base pair by removing date suffix
1886                let s = symbol.as_str();
1887                if let Some(idx) = s.rfind('-') {
1888                    s[..idx].to_string()
1889                } else {
1890                    s.to_string()
1891                }
1892            }
1893            _ => {
1894                anyhow::bail!("Unsupported instrument type for OKX");
1895            }
1896        };
1897
1898        Ok((inst_type, inst_family))
1899    }
1900
1901    async fn handle_mass_cancel(&mut self, instrument_id: InstrumentId) -> anyhow::Result<()> {
1902        let instrument = self
1903            .instruments_cache
1904            .get(&instrument_id.symbol.inner())
1905            .ok_or_else(|| anyhow::anyhow!("Unknown instrument {instrument_id}"))?;
1906
1907        let (inst_type, inst_family) =
1908            Self::get_instrument_type_and_family_from_instrument(instrument)?;
1909
1910        let params = WsMassCancelParams {
1911            inst_type,
1912            inst_family: Ustr::from(&inst_family),
1913        };
1914
1915        let args =
1916            vec![serde_json::to_value(params).map_err(|e| anyhow::anyhow!("JSON error: {e}"))?];
1917
1918        let request_id = self.generate_unique_request_id();
1919
1920        self.pending_mass_cancel_requests
1921            .insert(request_id.clone(), instrument_id);
1922
1923        let request = OKXWsRequest {
1924            id: Some(request_id.clone()),
1925            op: OKXWsOperation::MassCancel,
1926            exp_time: None,
1927            args,
1928        };
1929
1930        let payload = serde_json::to_string(&request)
1931            .map_err(|e| anyhow::anyhow!("Failed to serialize mass cancel request: {e}"))?;
1932
1933        match self
1934            .send_with_retry(payload, Some(OKX_RATE_LIMIT_KEY_CANCEL.as_slice()))
1935            .await
1936        {
1937            Ok(()) => {
1938                log::debug!("Sent mass cancel for {instrument_id}");
1939                Ok(())
1940            }
1941            Err(e) => {
1942                log::error!("Failed to send mass cancel after retries: error={e}");
1943
1944                self.pending_mass_cancel_requests.remove(&request_id);
1945
1946                let error = OKXWebSocketError {
1947                    code: "CLIENT_ERROR".to_string(),
1948                    message: format!("Mass cancel failed for {instrument_id}: {e}"),
1949                    conn_id: None,
1950                    timestamp: self.clock.get_time_ns().as_u64(),
1951                };
1952                let _ = self.send(NautilusWsMessage::Error(error));
1953
1954                Err(anyhow::anyhow!("Failed to send mass cancel: {e}"))
1955            }
1956        }
1957    }
1958
1959    async fn handle_batch_cancel_orders(
1960        &self,
1961        args: Vec<Value>,
1962        request_id: String,
1963    ) -> anyhow::Result<()> {
1964        let request = OKXWsRequest {
1965            id: Some(request_id),
1966            op: OKXWsOperation::BatchCancelOrders,
1967            exp_time: None,
1968            args,
1969        };
1970
1971        let payload = serde_json::to_string(&request)
1972            .map_err(|e| anyhow::anyhow!("Failed to serialize batch cancel request: {e}"))?;
1973
1974        if let Some(client) = &self.inner {
1975            client
1976                .send_text(payload, Some(OKX_RATE_LIMIT_KEY_CANCEL.as_slice()))
1977                .await
1978                .map_err(|e| anyhow::anyhow!("Failed to send batch cancel: {e}"))?;
1979            log::debug!("Sent batch cancel orders");
1980            Ok(())
1981        } else {
1982            Err(anyhow::anyhow!("No active WebSocket client"))
1983        }
1984    }
1985
1986    async fn handle_batch_place_orders(
1987        &self,
1988        args: Vec<Value>,
1989        request_id: String,
1990    ) -> anyhow::Result<()> {
1991        let request = OKXWsRequest {
1992            id: Some(request_id),
1993            op: OKXWsOperation::BatchOrders,
1994            exp_time: None,
1995            args,
1996        };
1997
1998        let payload = serde_json::to_string(&request)
1999            .map_err(|e| anyhow::anyhow!("Failed to serialize batch place request: {e}"))?;
2000
2001        if let Some(client) = &self.inner {
2002            client
2003                .send_text(payload, Some(OKX_RATE_LIMIT_KEY_ORDER.as_slice()))
2004                .await
2005                .map_err(|e| anyhow::anyhow!("Failed to send batch place: {e}"))?;
2006            log::debug!("Sent batch place orders");
2007            Ok(())
2008        } else {
2009            Err(anyhow::anyhow!("No active WebSocket client"))
2010        }
2011    }
2012
2013    async fn handle_batch_amend_orders(
2014        &self,
2015        args: Vec<Value>,
2016        request_id: String,
2017    ) -> anyhow::Result<()> {
2018        let request = OKXWsRequest {
2019            id: Some(request_id),
2020            op: OKXWsOperation::BatchAmendOrders,
2021            exp_time: None,
2022            args,
2023        };
2024
2025        let payload = serde_json::to_string(&request)
2026            .map_err(|e| anyhow::anyhow!("Failed to serialize batch amend request: {e}"))?;
2027
2028        if let Some(client) = &self.inner {
2029            client
2030                .send_text(payload, Some(OKX_RATE_LIMIT_KEY_AMEND.as_slice()))
2031                .await
2032                .map_err(|e| anyhow::anyhow!("Failed to send batch amend: {e}"))?;
2033            log::debug!("Sent batch amend orders");
2034            Ok(())
2035        } else {
2036            Err(anyhow::anyhow!("No active WebSocket client"))
2037        }
2038    }
2039
2040    async fn handle_subscribe(&self, args: Vec<OKXSubscriptionArg>) -> anyhow::Result<()> {
2041        for arg in &args {
2042            log::debug!(
2043                "Subscribing to channel: channel={:?}, inst_id={:?}",
2044                arg.channel,
2045                arg.inst_id
2046            );
2047        }
2048
2049        let message = OKXSubscription {
2050            op: OKXWsOperation::Subscribe,
2051            args,
2052        };
2053
2054        let json_txt = serde_json::to_string(&message)
2055            .map_err(|e| anyhow::anyhow!("Failed to serialize subscription: {e}"))?;
2056
2057        self.send_with_retry(json_txt, Some(OKX_RATE_LIMIT_KEY_SUBSCRIPTION.as_slice()))
2058            .await
2059            .map_err(|e| anyhow::anyhow!("Failed to send subscription after retries: {e}"))?;
2060        Ok(())
2061    }
2062
2063    async fn handle_unsubscribe(&self, args: Vec<OKXSubscriptionArg>) -> anyhow::Result<()> {
2064        for arg in &args {
2065            log::debug!(
2066                "Unsubscribing from channel: channel={:?}, inst_id={:?}",
2067                arg.channel,
2068                arg.inst_id
2069            );
2070        }
2071
2072        let message = OKXSubscription {
2073            op: OKXWsOperation::Unsubscribe,
2074            args,
2075        };
2076
2077        let json_txt = serde_json::to_string(&message)
2078            .map_err(|e| anyhow::anyhow!("Failed to serialize unsubscription: {e}"))?;
2079
2080        self.send_with_retry(json_txt, Some(OKX_RATE_LIMIT_KEY_SUBSCRIPTION.as_slice()))
2081            .await
2082            .map_err(|e| anyhow::anyhow!("Failed to send unsubscription after retries: {e}"))?;
2083        Ok(())
2084    }
2085
2086    async fn handle_place_order(
2087        &mut self,
2088        params: WsPostOrderParams,
2089        client_order_id: ClientOrderId,
2090        trader_id: TraderId,
2091        strategy_id: StrategyId,
2092        instrument_id: InstrumentId,
2093    ) -> anyhow::Result<()> {
2094        let request_id = self.generate_unique_request_id();
2095
2096        self.pending_place_requests.insert(
2097            request_id.clone(),
2098            (
2099                PendingOrderParams::Regular(params.clone()),
2100                client_order_id,
2101                trader_id,
2102                strategy_id,
2103                instrument_id,
2104            ),
2105        );
2106
2107        let request = OKXWsRequest {
2108            id: Some(request_id.clone()),
2109            op: OKXWsOperation::Order,
2110            exp_time: None,
2111            args: vec![params],
2112        };
2113
2114        let payload = serde_json::to_string(&request)
2115            .map_err(|e| anyhow::anyhow!("Failed to serialize place order request: {e}"))?;
2116
2117        match self
2118            .send_with_retry(payload, Some(OKX_RATE_LIMIT_KEY_ORDER.as_slice()))
2119            .await
2120        {
2121            Ok(()) => {
2122                log::debug!("Sent place order request");
2123                Ok(())
2124            }
2125            Err(e) => {
2126                log::error!("Failed to send place order after retries: error={e}");
2127
2128                self.pending_place_requests.remove(&request_id);
2129
2130                let ts_now = self.clock.get_time_ns();
2131                let rejected = OrderRejected::new(
2132                    trader_id,
2133                    strategy_id,
2134                    instrument_id,
2135                    client_order_id,
2136                    self.account_id,
2137                    Ustr::from(&format!("WebSocket send failed: {e}")),
2138                    UUID4::new(),
2139                    ts_now, // ts_event
2140                    ts_now, // ts_init
2141                    false,  // Not from reconciliation
2142                    false,  // Not due to post-only
2143                );
2144                let _ = self.send(NautilusWsMessage::OrderRejected(rejected));
2145
2146                Err(anyhow::anyhow!("Failed to send place order: {e}"))
2147            }
2148        }
2149    }
2150
2151    async fn handle_place_algo_order(
2152        &mut self,
2153        params: WsPostAlgoOrderParams,
2154        client_order_id: ClientOrderId,
2155        trader_id: TraderId,
2156        strategy_id: StrategyId,
2157        instrument_id: InstrumentId,
2158    ) -> anyhow::Result<()> {
2159        let request_id = self.generate_unique_request_id();
2160
2161        self.pending_place_requests.insert(
2162            request_id.clone(),
2163            (
2164                PendingOrderParams::Algo(params.clone()),
2165                client_order_id,
2166                trader_id,
2167                strategy_id,
2168                instrument_id,
2169            ),
2170        );
2171
2172        let request = OKXWsRequest {
2173            id: Some(request_id.clone()),
2174            op: OKXWsOperation::OrderAlgo,
2175            exp_time: None,
2176            args: vec![params],
2177        };
2178
2179        let payload = serde_json::to_string(&request)
2180            .map_err(|e| anyhow::anyhow!("Failed to serialize place algo order request: {e}"))?;
2181
2182        match self
2183            .send_with_retry(payload, Some(OKX_RATE_LIMIT_KEY_ORDER.as_slice()))
2184            .await
2185        {
2186            Ok(()) => {
2187                log::debug!("Sent place algo order request");
2188                Ok(())
2189            }
2190            Err(e) => {
2191                log::error!("Failed to send place algo order after retries: error={e}");
2192
2193                self.pending_place_requests.remove(&request_id);
2194
2195                let ts_now = self.clock.get_time_ns();
2196                let rejected = OrderRejected::new(
2197                    trader_id,
2198                    strategy_id,
2199                    instrument_id,
2200                    client_order_id,
2201                    self.account_id,
2202                    Ustr::from(&format!("WebSocket send failed: {e}")),
2203                    UUID4::new(),
2204                    ts_now, // ts_event
2205                    ts_now, // ts_init
2206                    false,  // Not from reconciliation
2207                    false,  // Not due to post-only
2208                );
2209                let _ = self.send(NautilusWsMessage::OrderRejected(rejected));
2210
2211                Err(anyhow::anyhow!("Failed to send place algo order: {e}"))
2212            }
2213        }
2214    }
2215
2216    async fn handle_cancel_order(
2217        &mut self,
2218        client_order_id: Option<ClientOrderId>,
2219        venue_order_id: Option<VenueOrderId>,
2220        instrument_id: InstrumentId,
2221        trader_id: TraderId,
2222        strategy_id: StrategyId,
2223    ) -> anyhow::Result<()> {
2224        let mut builder = WsCancelOrderParamsBuilder::default();
2225        builder.inst_id(instrument_id.symbol.as_str());
2226
2227        // Look up instIdCode from cache (required for WebSocket orders per OKX deprecation)
2228        if let Some(inst_id_code) = self.inst_id_code_cache.get(&instrument_id.symbol.inner()) {
2229            builder.inst_id_code(*inst_id_code.value());
2230        }
2231
2232        if let Some(venue_order_id) = venue_order_id {
2233            builder.ord_id(venue_order_id.as_str());
2234        }
2235
2236        if let Some(client_order_id) = client_order_id {
2237            builder.cl_ord_id(client_order_id.as_str());
2238        }
2239
2240        let params = builder
2241            .build()
2242            .map_err(|e| anyhow::anyhow!("Failed to build cancel params: {e}"))?;
2243
2244        let request_id = self.generate_unique_request_id();
2245
2246        // Track pending request if we have a client order ID
2247        if let Some(client_order_id) = client_order_id {
2248            self.pending_cancel_requests.insert(
2249                request_id.clone(),
2250                (
2251                    client_order_id,
2252                    trader_id,
2253                    strategy_id,
2254                    instrument_id,
2255                    venue_order_id,
2256                ),
2257            );
2258        }
2259
2260        let request = OKXWsRequest {
2261            id: Some(request_id.clone()),
2262            op: OKXWsOperation::CancelOrder,
2263            exp_time: None,
2264            args: vec![params],
2265        };
2266
2267        let payload = serde_json::to_string(&request)
2268            .map_err(|e| anyhow::anyhow!("Failed to serialize cancel request: {e}"))?;
2269
2270        match self
2271            .send_with_retry(payload, Some(OKX_RATE_LIMIT_KEY_CANCEL.as_slice()))
2272            .await
2273        {
2274            Ok(()) => {
2275                log::debug!("Sent cancel order request");
2276                Ok(())
2277            }
2278            Err(e) => {
2279                log::error!("Failed to send cancel order after retries: error={e}");
2280
2281                self.pending_cancel_requests.remove(&request_id);
2282
2283                if let Some(client_order_id) = client_order_id {
2284                    let ts_now = self.clock.get_time_ns();
2285                    let rejected = OrderCancelRejected::new(
2286                        trader_id,
2287                        strategy_id,
2288                        instrument_id,
2289                        client_order_id,
2290                        Ustr::from(&format!("WebSocket send failed: {e}")),
2291                        UUID4::new(),
2292                        ts_now, // ts_event
2293                        ts_now, // ts_init
2294                        false,  // Not from reconciliation
2295                        venue_order_id,
2296                        Some(self.account_id),
2297                    );
2298                    let _ = self.send(NautilusWsMessage::OrderCancelRejected(rejected));
2299                }
2300
2301                Err(anyhow::anyhow!("Failed to send cancel order: {e}"))
2302            }
2303        }
2304    }
2305
2306    async fn handle_amend_order(
2307        &mut self,
2308        params: WsAmendOrderParams,
2309        client_order_id: ClientOrderId,
2310        trader_id: TraderId,
2311        strategy_id: StrategyId,
2312        instrument_id: InstrumentId,
2313        venue_order_id: Option<VenueOrderId>,
2314    ) -> anyhow::Result<()> {
2315        let request_id = self.generate_unique_request_id();
2316
2317        self.pending_amend_requests.insert(
2318            request_id.clone(),
2319            (
2320                client_order_id,
2321                trader_id,
2322                strategy_id,
2323                instrument_id,
2324                venue_order_id,
2325            ),
2326        );
2327
2328        let request = OKXWsRequest {
2329            id: Some(request_id.clone()),
2330            op: OKXWsOperation::AmendOrder,
2331            exp_time: None,
2332            args: vec![params],
2333        };
2334
2335        let payload = serde_json::to_string(&request)
2336            .map_err(|e| anyhow::anyhow!("Failed to serialize amend order request: {e}"))?;
2337
2338        match self
2339            .send_with_retry(payload, Some(OKX_RATE_LIMIT_KEY_AMEND.as_slice()))
2340            .await
2341        {
2342            Ok(()) => {
2343                log::debug!("Sent amend order request");
2344                Ok(())
2345            }
2346            Err(e) => {
2347                log::error!("Failed to send amend order after retries: error={e}");
2348
2349                self.pending_amend_requests.remove(&request_id);
2350
2351                let ts_now = self.clock.get_time_ns();
2352                let rejected = OrderModifyRejected::new(
2353                    trader_id,
2354                    strategy_id,
2355                    instrument_id,
2356                    client_order_id,
2357                    Ustr::from(&format!("WebSocket send failed: {e}")),
2358                    UUID4::new(),
2359                    ts_now, // ts_event
2360                    ts_now, // ts_init
2361                    false,  // Not from reconciliation
2362                    venue_order_id,
2363                    Some(self.account_id),
2364                );
2365                let _ = self.send(NautilusWsMessage::OrderModifyRejected(rejected));
2366
2367                Err(anyhow::anyhow!("Failed to send amend order: {e}"))
2368            }
2369        }
2370    }
2371
2372    async fn handle_cancel_algo_order(
2373        &mut self,
2374        client_order_id: Option<ClientOrderId>,
2375        algo_order_id: Option<VenueOrderId>,
2376        instrument_id: InstrumentId,
2377        trader_id: TraderId,
2378        strategy_id: StrategyId,
2379    ) -> anyhow::Result<()> {
2380        let mut builder = WsCancelAlgoOrderParamsBuilder::default();
2381        builder.inst_id(instrument_id.symbol.as_str());
2382
2383        // Look up instIdCode from cache (required for WebSocket orders per OKX deprecation)
2384        if let Some(inst_id_code) = self.inst_id_code_cache.get(&instrument_id.symbol.inner()) {
2385            builder.inst_id_code(*inst_id_code.value());
2386        }
2387
2388        if let Some(client_order_id) = &client_order_id {
2389            builder.algo_cl_ord_id(client_order_id.as_str());
2390        }
2391
2392        if let Some(algo_id) = &algo_order_id {
2393            builder.algo_id(algo_id.as_str());
2394        }
2395
2396        let params = builder
2397            .build()
2398            .map_err(|e| anyhow::anyhow!("Failed to build cancel algo params: {e}"))?;
2399
2400        let request_id = self.generate_unique_request_id();
2401
2402        // Track pending cancellation if we have a client order ID
2403        if let Some(client_order_id) = client_order_id {
2404            self.pending_cancel_requests.insert(
2405                request_id.clone(),
2406                (client_order_id, trader_id, strategy_id, instrument_id, None),
2407            );
2408        }
2409
2410        let request = OKXWsRequest {
2411            id: Some(request_id.clone()),
2412            op: OKXWsOperation::CancelAlgos,
2413            exp_time: None,
2414            args: vec![params],
2415        };
2416
2417        let payload = serde_json::to_string(&request)
2418            .map_err(|e| anyhow::anyhow!("Failed to serialize cancel algo request: {e}"))?;
2419
2420        match self
2421            .send_with_retry(payload, Some(OKX_RATE_LIMIT_KEY_CANCEL.as_slice()))
2422            .await
2423        {
2424            Ok(()) => {
2425                log::debug!("Sent cancel algo order request");
2426                Ok(())
2427            }
2428            Err(e) => {
2429                log::error!("Failed to send cancel algo order after retries: error={e}");
2430
2431                self.pending_cancel_requests.remove(&request_id);
2432
2433                if let Some(client_order_id) = client_order_id {
2434                    let ts_now = self.clock.get_time_ns();
2435                    let rejected = OrderCancelRejected::new(
2436                        trader_id,
2437                        strategy_id,
2438                        instrument_id,
2439                        client_order_id,
2440                        Ustr::from(&format!("WebSocket send failed: {e}")),
2441                        UUID4::new(),
2442                        ts_now, // ts_event
2443                        ts_now, // ts_init
2444                        false,  // Not from reconciliation
2445                        None,
2446                        Some(self.account_id),
2447                    );
2448                    let _ = self.send(NautilusWsMessage::OrderCancelRejected(rejected));
2449                }
2450
2451                Err(anyhow::anyhow!("Failed to send cancel algo order: {e}"))
2452            }
2453        }
2454    }
2455}
2456
2457/// Returns `true` when an OKX error payload represents a post-only rejection.
2458pub fn is_post_only_rejection(code: &str, data: &[Value]) -> bool {
2459    if code == OKX_POST_ONLY_ERROR_CODE {
2460        return true;
2461    }
2462
2463    for entry in data {
2464        if let Some(s_code) = entry.get("sCode").and_then(|value| value.as_str())
2465            && s_code == OKX_POST_ONLY_ERROR_CODE
2466        {
2467            return true;
2468        }
2469
2470        if let Some(inner_code) = entry.get("code").and_then(|value| value.as_str())
2471            && inner_code == OKX_POST_ONLY_ERROR_CODE
2472        {
2473            return true;
2474        }
2475    }
2476
2477    false
2478}
2479
2480/// Case-insensitive substring check.
2481#[inline]
2482fn contains_ignore_ascii_case(haystack: &str, needle: &str) -> bool {
2483    haystack
2484        .as_bytes()
2485        .windows(needle.len())
2486        .any(|window| window.eq_ignore_ascii_case(needle.as_bytes()))
2487}
2488
2489/// Determines if an OKX WebSocket error should trigger a retry.
2490fn should_retry_okx_error(error: &OKXWsError) -> bool {
2491    match error {
2492        OKXWsError::OkxError { error_code, .. } => should_retry_error_code(error_code),
2493        OKXWsError::TungsteniteError(_) => true, // Network errors are retryable
2494        OKXWsError::ClientError(msg) => {
2495            // Retry on timeout and connection errors
2496            contains_ignore_ascii_case(msg, "timeout")
2497                || contains_ignore_ascii_case(msg, "timed out")
2498                || contains_ignore_ascii_case(msg, "connection")
2499                || contains_ignore_ascii_case(msg, "network")
2500        }
2501        OKXWsError::AuthenticationError(_)
2502        | OKXWsError::JsonError(_)
2503        | OKXWsError::ParsingError(_) => {
2504            // Don't retry authentication or parsing errors automatically
2505            false
2506        }
2507    }
2508}
2509
2510/// Creates a timeout error for the retry manager.
2511fn create_okx_timeout_error(msg: String) -> OKXWsError {
2512    OKXWsError::ClientError(msg)
2513}
2514
2515#[cfg(test)]
2516mod tests {
2517    use std::sync::{Arc, atomic::AtomicBool};
2518
2519    use dashmap::DashMap;
2520    use nautilus_model::{
2521        identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
2522        types::{Money, Quantity},
2523    };
2524    use nautilus_network::websocket::{AuthTracker, SubscriptionState};
2525    use rstest::rstest;
2526
2527    use super::{NautilusWsMessage, OKXWsFeedHandler};
2528    use crate::websocket::parse::OrderStateSnapshot;
2529
2530    const OKX_WS_TOPIC_DELIMITER: char = ':';
2531
2532    #[allow(clippy::type_complexity)]
2533    fn create_test_handler() -> (
2534        OKXWsFeedHandler,
2535        tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>,
2536        Arc<DashMap<ClientOrderId, (TraderId, StrategyId, InstrumentId)>>,
2537        Arc<DashMap<ClientOrderId, ClientOrderId>>,
2538    ) {
2539        let account_id = AccountId::new("OKX-001");
2540        let signal = Arc::new(AtomicBool::new(false));
2541        let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
2542        let (_raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();
2543        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel();
2544        let active_client_orders = Arc::new(DashMap::new());
2545        let client_id_aliases = Arc::new(DashMap::new());
2546        let inst_id_code_cache = Arc::new(DashMap::new());
2547        let auth_tracker = AuthTracker::new();
2548        let subscriptions_state = SubscriptionState::new(OKX_WS_TOPIC_DELIMITER);
2549
2550        let handler = OKXWsFeedHandler::new(
2551            account_id,
2552            signal,
2553            cmd_rx,
2554            raw_rx,
2555            out_tx,
2556            active_client_orders.clone(),
2557            client_id_aliases.clone(),
2558            inst_id_code_cache,
2559            auth_tracker,
2560            subscriptions_state,
2561        );
2562
2563        (handler, out_rx, active_client_orders, client_id_aliases)
2564    }
2565
2566    #[rstest]
2567    fn test_is_post_only_rejection_detects_by_code() {
2568        assert!(super::is_post_only_rejection("51019", &[]));
2569    }
2570
2571    #[rstest]
2572    fn test_is_post_only_rejection_detects_by_inner_code() {
2573        let data = vec![serde_json::json!({
2574            "sCode": "51019"
2575        })];
2576        assert!(super::is_post_only_rejection("50000", &data));
2577    }
2578
2579    #[rstest]
2580    fn test_is_post_only_rejection_false_for_unrelated_error() {
2581        let data = vec![serde_json::json!({
2582            "sMsg": "Insufficient balance"
2583        })];
2584        assert!(!super::is_post_only_rejection("50000", &data));
2585    }
2586
2587    #[rstest]
2588    fn test_handler_register_client_order_aliases_with_parent() {
2589        let (handler, _out_rx, _active, client_id_aliases) = create_test_handler();
2590
2591        let child = Some(ClientOrderId::new("CHILD-001"));
2592        let parent = Some(ClientOrderId::new("PARENT-001"));
2593
2594        let result = handler.register_client_order_aliases(&child, &parent);
2595
2596        assert_eq!(result, Some(ClientOrderId::new("PARENT-001")));
2597        assert!(client_id_aliases.contains_key(&ClientOrderId::new("PARENT-001")));
2598        assert!(client_id_aliases.contains_key(&ClientOrderId::new("CHILD-001")));
2599        assert_eq!(
2600            *client_id_aliases
2601                .get(&ClientOrderId::new("CHILD-001"))
2602                .unwrap(),
2603            ClientOrderId::new("PARENT-001")
2604        );
2605    }
2606
2607    #[rstest]
2608    fn test_handler_register_client_order_aliases_without_parent() {
2609        let (handler, _out_rx, _active, client_id_aliases) = create_test_handler();
2610
2611        let child = Some(ClientOrderId::new("ORDER-001"));
2612        let parent: Option<ClientOrderId> = None;
2613
2614        let result = handler.register_client_order_aliases(&child, &parent);
2615
2616        assert_eq!(result, Some(ClientOrderId::new("ORDER-001")));
2617        assert!(client_id_aliases.contains_key(&ClientOrderId::new("ORDER-001")));
2618        assert_eq!(
2619            *client_id_aliases
2620                .get(&ClientOrderId::new("ORDER-001"))
2621                .unwrap(),
2622            ClientOrderId::new("ORDER-001")
2623        );
2624    }
2625
2626    #[rstest]
2627    fn test_handler_cleanup_terminal_order_removes_all_state() {
2628        let (mut handler, _out_rx, active_client_orders, client_id_aliases) = create_test_handler();
2629
2630        let canonical = ClientOrderId::new("PARENT-001");
2631        let child = ClientOrderId::new("CHILD-001");
2632        let venue_id = VenueOrderId::new("VENUE-001");
2633        let trader_id = TraderId::new("TRADER-001");
2634        let strategy_id = StrategyId::new("STRATEGY-001");
2635        let instrument_id = InstrumentId::from("ETH-USDT-PERP.OKX");
2636
2637        active_client_orders.insert(canonical, (trader_id, strategy_id, instrument_id));
2638        client_id_aliases.insert(canonical, canonical);
2639        client_id_aliases.insert(child, canonical);
2640        handler
2641            .fee_cache
2642            .insert(venue_id.inner(), Money::from("0.001 USDT"));
2643        handler
2644            .filled_qty_cache
2645            .insert(venue_id.inner(), Quantity::from("1.0"));
2646        handler.order_state_cache.insert(
2647            canonical,
2648            OrderStateSnapshot {
2649                venue_order_id: venue_id,
2650                quantity: Quantity::from("1.0"),
2651                price: None,
2652            },
2653        );
2654
2655        handler.cleanup_terminal_order(&canonical, &venue_id);
2656
2657        assert!(!active_client_orders.contains_key(&canonical));
2658        assert!(!client_id_aliases.contains_key(&canonical));
2659        assert!(!client_id_aliases.contains_key(&child));
2660        assert!(!handler.fee_cache.contains_key(&venue_id.inner()));
2661        assert!(!handler.filled_qty_cache.contains_key(&venue_id.inner()));
2662        assert!(!handler.order_state_cache.contains_key(&canonical));
2663    }
2664
2665    #[rstest]
2666    fn test_handler_cleanup_terminal_order_removes_multiple_children() {
2667        let (mut handler, _out_rx, _active, client_id_aliases) = create_test_handler();
2668
2669        let canonical = ClientOrderId::new("PARENT-001");
2670        let child1 = ClientOrderId::new("CHILD-001");
2671        let child2 = ClientOrderId::new("CHILD-002");
2672        let child3 = ClientOrderId::new("CHILD-003");
2673        let venue_id = VenueOrderId::new("VENUE-001");
2674
2675        client_id_aliases.insert(canonical, canonical);
2676        client_id_aliases.insert(child1, canonical);
2677        client_id_aliases.insert(child2, canonical);
2678        client_id_aliases.insert(child3, canonical);
2679
2680        handler.cleanup_terminal_order(&canonical, &venue_id);
2681
2682        assert!(!client_id_aliases.contains_key(&canonical));
2683        assert!(!client_id_aliases.contains_key(&child1));
2684        assert!(!client_id_aliases.contains_key(&child2));
2685        assert!(!client_id_aliases.contains_key(&child3));
2686        assert!(client_id_aliases.is_empty());
2687    }
2688
2689    #[rstest]
2690    fn test_handler_cleanup_does_not_affect_other_orders() {
2691        let (mut handler, _out_rx, active_client_orders, client_id_aliases) = create_test_handler();
2692
2693        let canonical1 = ClientOrderId::new("PARENT-001");
2694        let child1 = ClientOrderId::new("CHILD-001");
2695        let venue_id1 = VenueOrderId::new("VENUE-001");
2696
2697        let canonical2 = ClientOrderId::new("PARENT-002");
2698        let child2 = ClientOrderId::new("CHILD-002");
2699        let venue_id2 = VenueOrderId::new("VENUE-002");
2700
2701        let trader_id = TraderId::new("TRADER-001");
2702        let strategy_id = StrategyId::new("STRATEGY-001");
2703        let instrument_id = InstrumentId::from("ETH-USDT-PERP.OKX");
2704
2705        active_client_orders.insert(canonical1, (trader_id, strategy_id, instrument_id));
2706        active_client_orders.insert(canonical2, (trader_id, strategy_id, instrument_id));
2707        client_id_aliases.insert(canonical1, canonical1);
2708        client_id_aliases.insert(child1, canonical1);
2709        client_id_aliases.insert(canonical2, canonical2);
2710        client_id_aliases.insert(child2, canonical2);
2711        handler
2712            .fee_cache
2713            .insert(venue_id1.inner(), Money::from("0.001 USDT"));
2714        handler
2715            .fee_cache
2716            .insert(venue_id2.inner(), Money::from("0.002 USDT"));
2717
2718        handler.cleanup_terminal_order(&canonical1, &venue_id1);
2719
2720        assert!(!active_client_orders.contains_key(&canonical1));
2721        assert!(!client_id_aliases.contains_key(&canonical1));
2722        assert!(!client_id_aliases.contains_key(&child1));
2723        assert!(!handler.fee_cache.contains_key(&venue_id1.inner()));
2724
2725        assert!(active_client_orders.contains_key(&canonical2));
2726        assert!(client_id_aliases.contains_key(&canonical2));
2727        assert!(client_id_aliases.contains_key(&child2));
2728        assert!(handler.fee_cache.contains_key(&venue_id2.inner()));
2729    }
2730
2731    mod channel_routing {
2732        use nautilus_core::nanos::UnixNanos;
2733        use nautilus_model::{
2734            identifiers::{InstrumentId, Symbol},
2735            instruments::{CryptoPerpetual, CurrencyPair, Instrument, InstrumentAny},
2736            types::{Currency, Price, Quantity},
2737        };
2738        use rstest::rstest;
2739        use ustr::Ustr;
2740
2741        use super::*;
2742        use crate::{
2743            common::{enums::OKXBookAction, testing::load_test_json},
2744            websocket::{enums::OKXWsChannel, messages::OKXWsMessage},
2745        };
2746
2747        fn create_spot_instrument() -> InstrumentAny {
2748            let instrument_id = InstrumentId::from("BTC-USDT.OKX");
2749            InstrumentAny::CurrencyPair(CurrencyPair::new(
2750                instrument_id,
2751                Symbol::from("BTC-USDT"),
2752                Currency::BTC(),
2753                Currency::USDT(),
2754                2,
2755                8,
2756                Price::from("0.01"),
2757                Quantity::from("0.00000001"),
2758                None, // multiplier
2759                None, // lot_size
2760                None, // max_quantity
2761                None, // min_quantity
2762                None, // max_notional
2763                None, // min_notional
2764                None, // max_price
2765                None, // min_price
2766                None, // margin_init
2767                None, // margin_maint
2768                None, // maker_fee
2769                None, // taker_fee
2770                UnixNanos::default(),
2771                UnixNanos::default(),
2772            ))
2773        }
2774
2775        fn create_swap_instrument() -> InstrumentAny {
2776            let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
2777            InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
2778                instrument_id,
2779                Symbol::from("BTC-USDT-SWAP"),
2780                Currency::BTC(),
2781                Currency::USDT(),
2782                Currency::USDT(),
2783                false,
2784                2,
2785                8,
2786                Price::from("0.01"),
2787                Quantity::from("0.00000001"),
2788                None,
2789                None,
2790                None,
2791                None,
2792                None,
2793                None,
2794                None,
2795                None,
2796                None,
2797                None,
2798                None,
2799                None,
2800                UnixNanos::default(),
2801                UnixNanos::default(),
2802            ))
2803        }
2804
2805        fn create_handler_with_instruments(instruments: Vec<InstrumentAny>) -> OKXWsFeedHandler {
2806            let (mut handler, _, _, _) = create_test_handler();
2807            for inst in instruments {
2808                handler
2809                    .instruments_cache
2810                    .insert(inst.symbol().inner(), inst);
2811            }
2812            handler
2813        }
2814
2815        #[rstest]
2816        fn test_parse_raw_message_ticker_channel() {
2817            let json = load_test_json("ws_tickers.json");
2818            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
2819
2820            match msg {
2821                OKXWsMessage::Data { arg, data } => {
2822                    assert!(
2823                        matches!(arg.channel, OKXWsChannel::Tickers),
2824                        "Expected Tickers channel"
2825                    );
2826                    assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USDT")));
2827                    assert!(data.is_array());
2828                }
2829                _ => panic!("Expected OKXWsMessage::Data variant"),
2830            }
2831        }
2832
2833        #[rstest]
2834        fn test_parse_raw_message_trades_channel() {
2835            let json = load_test_json("ws_trades.json");
2836            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
2837
2838            match msg {
2839                OKXWsMessage::Data { arg, data } => {
2840                    assert!(
2841                        matches!(arg.channel, OKXWsChannel::Trades),
2842                        "Expected Trades channel"
2843                    );
2844                    assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USD")));
2845                    assert!(data.is_array());
2846                }
2847                _ => panic!("Expected OKXWsMessage::Data variant"),
2848            }
2849        }
2850
2851        #[rstest]
2852        fn test_parse_raw_message_books_channel() {
2853            let json = load_test_json("ws_books_snapshot.json");
2854            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
2855
2856            match msg {
2857                OKXWsMessage::BookData { arg, action, data } => {
2858                    assert!(
2859                        matches!(arg.channel, OKXWsChannel::Books),
2860                        "Expected Books channel"
2861                    );
2862                    assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USDT")));
2863                    assert!(
2864                        matches!(action, OKXBookAction::Snapshot),
2865                        "Expected snapshot action"
2866                    );
2867                    assert!(!data.is_empty());
2868                }
2869                _ => panic!("Expected OKXWsMessage::BookData variant"),
2870            }
2871        }
2872
2873        #[rstest]
2874        fn test_parse_raw_message_candle_channel() {
2875            let json = load_test_json("ws_candle.json");
2876            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
2877
2878            match msg {
2879                OKXWsMessage::Data { arg, data } => {
2880                    // Candle channel variant is Candle1Day for "candle1D"
2881                    assert!(
2882                        matches!(arg.channel, OKXWsChannel::Candle1Day),
2883                        "Expected Candle1Day channel, was {:?}",
2884                        arg.channel
2885                    );
2886                    assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USDT")));
2887                    assert!(data.is_array());
2888                }
2889                _ => panic!("Expected OKXWsMessage::Data variant"),
2890            }
2891        }
2892
2893        #[rstest]
2894        fn test_parse_raw_message_funding_rate_channel() {
2895            let json = load_test_json("ws_funding_rate.json");
2896            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
2897
2898            match msg {
2899                OKXWsMessage::Data { arg, data } => {
2900                    assert!(
2901                        matches!(arg.channel, OKXWsChannel::FundingRate),
2902                        "Expected FundingRate channel"
2903                    );
2904                    assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USDT-SWAP")));
2905                    assert!(data.is_array());
2906                }
2907                _ => panic!("Expected OKXWsMessage::Data variant"),
2908            }
2909        }
2910
2911        #[rstest]
2912        fn test_parse_raw_message_bbo_tbt_channel() {
2913            let json = load_test_json("ws_bbo_tbt.json");
2914            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
2915
2916            match msg {
2917                OKXWsMessage::Data { arg, data } => {
2918                    assert!(
2919                        matches!(arg.channel, OKXWsChannel::BboTbt),
2920                        "Expected BboTbt channel"
2921                    );
2922                    assert!(data.is_array());
2923                }
2924                _ => panic!("Expected OKXWsMessage::Data variant"),
2925            }
2926        }
2927
2928        #[rstest]
2929        fn test_handle_other_channel_data_tickers() {
2930            let mut handler = create_handler_with_instruments(vec![create_spot_instrument()]);
2931            let json = load_test_json("ws_tickers.json");
2932            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
2933
2934            let OKXWsMessage::Data { arg, data } = msg else {
2935                panic!("Expected OKXWsMessage::Data");
2936            };
2937
2938            let ts_init = UnixNanos::from(1_000_000_000u64);
2939            let result = handler.handle_other_channel_data(arg.channel, arg.inst_id, data, ts_init);
2940
2941            assert!(result.is_some());
2942            match result.unwrap() {
2943                NautilusWsMessage::Data(payloads) => {
2944                    assert!(!payloads.is_empty(), "Should produce data payloads");
2945                }
2946                other => panic!("Expected NautilusWsMessage::Data, was {other:?}"),
2947            }
2948        }
2949
2950        #[rstest]
2951        fn test_handle_other_channel_data_trades() {
2952            // Create instrument with BTC-USD symbol (matches test data)
2953            let instrument_id = InstrumentId::from("BTC-USD.OKX");
2954            let instrument = InstrumentAny::CurrencyPair(CurrencyPair::new(
2955                instrument_id,
2956                Symbol::from("BTC-USD"),
2957                Currency::BTC(),
2958                Currency::USD(),
2959                1,
2960                8,
2961                Price::from("0.1"),
2962                Quantity::from("0.00000001"),
2963                None, // multiplier
2964                None, // lot_size
2965                None, // max_quantity
2966                None, // min_quantity
2967                None, // max_notional
2968                None, // min_notional
2969                None, // max_price
2970                None, // min_price
2971                None, // margin_init
2972                None, // margin_maint
2973                None, // maker_fee
2974                None, // taker_fee
2975                UnixNanos::default(),
2976                UnixNanos::default(),
2977            ));
2978
2979            let mut handler = create_handler_with_instruments(vec![instrument]);
2980            let json = load_test_json("ws_trades.json");
2981            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
2982
2983            let OKXWsMessage::Data { arg, data } = msg else {
2984                panic!("Expected OKXWsMessage::Data");
2985            };
2986
2987            let ts_init = UnixNanos::from(1_000_000_000u64);
2988            let result = handler.handle_other_channel_data(arg.channel, arg.inst_id, data, ts_init);
2989
2990            assert!(result.is_some());
2991            match result.unwrap() {
2992                NautilusWsMessage::Data(payloads) => {
2993                    assert!(!payloads.is_empty(), "Should produce trade data payloads");
2994                }
2995                other => panic!("Expected NautilusWsMessage::Data, was {other:?}"),
2996            }
2997        }
2998
2999        #[rstest]
3000        fn test_handle_book_data_snapshot() {
3001            let handler = create_handler_with_instruments(vec![create_spot_instrument()]);
3002            let json = load_test_json("ws_books_snapshot.json");
3003            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3004
3005            let OKXWsMessage::BookData { arg, action, data } = msg else {
3006                panic!("Expected OKXWsMessage::BookData");
3007            };
3008
3009            let ts_init = UnixNanos::from(1_000_000_000u64);
3010            let result = handler.handle_book_data(arg, action, data, ts_init);
3011
3012            assert!(result.is_some());
3013            match result.unwrap() {
3014                NautilusWsMessage::Data(payloads) => {
3015                    assert!(!payloads.is_empty(), "Should produce order book payloads");
3016                }
3017                other => panic!("Expected NautilusWsMessage::Data, was {other:?}"),
3018            }
3019        }
3020
3021        #[rstest]
3022        fn test_handle_book_data_update() {
3023            let handler = create_handler_with_instruments(vec![create_spot_instrument()]);
3024            let json = load_test_json("ws_books_update.json");
3025            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3026
3027            let OKXWsMessage::BookData { arg, action, data } = msg else {
3028                panic!("Expected OKXWsMessage::BookData");
3029            };
3030
3031            let ts_init = UnixNanos::from(1_000_000_000u64);
3032            let result = handler.handle_book_data(arg, action, data, ts_init);
3033
3034            assert!(result.is_some());
3035            match result.unwrap() {
3036                NautilusWsMessage::Data(payloads) => {
3037                    assert!(
3038                        !payloads.is_empty(),
3039                        "Should produce order book delta payloads"
3040                    );
3041                }
3042                other => panic!("Expected NautilusWsMessage::Data, was {other:?}"),
3043            }
3044        }
3045
3046        #[rstest]
3047        fn test_handle_other_channel_data_candles() {
3048            let mut handler = create_handler_with_instruments(vec![create_spot_instrument()]);
3049            let json = load_test_json("ws_candle.json");
3050            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3051
3052            let OKXWsMessage::Data { arg, data } = msg else {
3053                panic!("Expected OKXWsMessage::Data");
3054            };
3055
3056            let ts_init = UnixNanos::from(1_000_000_000u64);
3057            let result = handler.handle_other_channel_data(arg.channel, arg.inst_id, data, ts_init);
3058
3059            assert!(result.is_some());
3060            match result.unwrap() {
3061                NautilusWsMessage::Data(payloads) => {
3062                    assert!(!payloads.is_empty(), "Should produce bar data payloads");
3063                }
3064                other => panic!("Expected NautilusWsMessage::Data, was {other:?}"),
3065            }
3066        }
3067
3068        #[rstest]
3069        fn test_handle_other_channel_data_funding_rate() {
3070            let mut handler = create_handler_with_instruments(vec![create_swap_instrument()]);
3071            let json = load_test_json("ws_funding_rate.json");
3072            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3073
3074            let OKXWsMessage::Data { arg, data } = msg else {
3075                panic!("Expected OKXWsMessage::Data");
3076            };
3077
3078            let ts_init = UnixNanos::from(1_000_000_000u64);
3079            let result = handler.handle_other_channel_data(arg.channel, arg.inst_id, data, ts_init);
3080
3081            // Funding rate returns FundingRates variant when rate changes
3082            assert!(result.is_none() || matches!(result, Some(NautilusWsMessage::FundingRates(_))));
3083        }
3084
3085        #[rstest]
3086        fn test_handle_account_data_parses_successfully() {
3087            let mut handler = create_handler_with_instruments(vec![]);
3088            let json = load_test_json("ws_account.json");
3089            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3090
3091            let OKXWsMessage::Data { data, .. } = msg else {
3092                panic!("Expected OKXWsMessage::Data");
3093            };
3094
3095            let ts_init = UnixNanos::from(1_000_000_000u64);
3096            let result = handler.handle_account_data(data, ts_init);
3097
3098            assert!(result.is_some());
3099            match result.unwrap() {
3100                NautilusWsMessage::AccountUpdate(account_state) => {
3101                    assert!(
3102                        !account_state.balances.is_empty(),
3103                        "Should have balance data"
3104                    );
3105                }
3106                other => panic!("Expected NautilusWsMessage::AccountUpdate, was {other:?}"),
3107            }
3108        }
3109
3110        #[rstest]
3111        fn test_handle_other_channel_data_missing_instrument() {
3112            let mut handler = create_handler_with_instruments(vec![]);
3113            let json = load_test_json("ws_tickers.json");
3114            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3115
3116            let OKXWsMessage::Data { arg, data } = msg else {
3117                panic!("Expected OKXWsMessage::Data");
3118            };
3119
3120            let ts_init = UnixNanos::from(1_000_000_000u64);
3121            let result = handler.handle_other_channel_data(arg.channel, arg.inst_id, data, ts_init);
3122
3123            // Should return None when instrument is not in cache
3124            assert!(result.is_none());
3125        }
3126
3127        #[rstest]
3128        fn test_handle_book_data_missing_instrument() {
3129            let handler = create_handler_with_instruments(vec![]);
3130            let json = load_test_json("ws_books_snapshot.json");
3131            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3132
3133            let OKXWsMessage::BookData { arg, action, data } = msg else {
3134                panic!("Expected OKXWsMessage::BookData");
3135            };
3136
3137            let ts_init = UnixNanos::from(1_000_000_000u64);
3138            let result = handler.handle_book_data(arg, action, data, ts_init);
3139
3140            // Should return None when instrument is not in cache
3141            assert!(result.is_none());
3142        }
3143    }
3144}