nautilus_okx/websocket/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message handler for OKX.
17//!
18//! The handler runs in a dedicated Tokio task as the I/O boundary between the client
19//! orchestrator and the network layer. It exclusively owns the `WebSocketClient` and
20//! processes commands from the client via an unbounded channel, serializing them to JSON
21//! and sending via the WebSocket. Raw messages are received from the network, deserialized,
22//! and transformed into `NautilusWsMessage` events which are emitted back to the client.
23//!
24//! Key responsibilities:
25//! - Command processing: Receives `HandlerCommand` from client, executes WebSocket operations.
26//! - Message transformation: Parses raw venue messages into Nautilus domain events.
27//! - Pending state tracking: Owns `AHashMap` for matching requests/responses (single-threaded).
28//! - Retry logic: Retries transient WebSocket send failures using `RetryManager`.
29//! - Error event emission: Emits `OrderRejected`, `OrderCancelRejected` when retries exhausted.
30
31use std::{
32    collections::VecDeque,
33    sync::{
34        Arc,
35        atomic::{AtomicBool, AtomicU64, Ordering},
36    },
37};
38
39use ahash::AHashMap;
40use dashmap::DashMap;
41use nautilus_core::{AtomicTime, UUID4, nanos::UnixNanos, time::get_atomic_clock_realtime};
42use nautilus_model::{
43    enums::{OrderStatus, OrderType},
44    events::{
45        AccountState, OrderAccepted, OrderCancelRejected, OrderModifyRejected, OrderRejected,
46    },
47    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
48    instruments::{Instrument, InstrumentAny},
49    types::{Money, Quantity},
50};
51use nautilus_network::{
52    RECONNECTED,
53    retry::{RetryManager, create_websocket_retry_manager},
54    websocket::{AuthTracker, SubscriptionState, TEXT_PING, TEXT_PONG, WebSocketClient},
55};
56use serde_json::Value;
57use tokio_tungstenite::tungstenite::Message;
58use ustr::Ustr;
59
60use super::{
61    enums::{OKXSubscriptionEvent, OKXWsChannel, OKXWsOperation},
62    error::OKXWsError,
63    messages::{
64        ExecutionReport, NautilusWsMessage, OKXAlgoOrderMsg, OKXBookMsg, OKXOrderMsg,
65        OKXSubscription, OKXSubscriptionArg, OKXWebSocketArg, OKXWebSocketError, OKXWsMessage,
66        OKXWsRequest, WsAmendOrderParams, WsCancelAlgoOrderParamsBuilder,
67        WsCancelOrderParamsBuilder, WsMassCancelParams, WsPostAlgoOrderParams, WsPostOrderParams,
68    },
69    parse::{
70        OrderStateSnapshot, ParsedOrderEvent, parse_algo_order_msg, parse_book_msg_vec,
71        parse_order_event, parse_order_msg, parse_ws_message_data,
72    },
73    subscription::topic_from_websocket_arg,
74};
75use crate::{
76    common::{
77        consts::{
78            OKX_POST_ONLY_CANCEL_REASON, OKX_POST_ONLY_CANCEL_SOURCE, OKX_POST_ONLY_ERROR_CODE,
79            should_retry_error_code,
80        },
81        enums::{
82            OKXBookAction, OKXInstrumentType, OKXOrderStatus, OKXSide, OKXTargetCurrency,
83            OKXTradeMode,
84        },
85        parse::{
86            determine_order_type, is_market_price, okx_instrument_type, parse_account_state,
87            parse_client_order_id, parse_millisecond_timestamp, parse_position_status_report,
88            parse_price, parse_quantity,
89        },
90    },
91    http::models::{OKXAccount, OKXPosition},
92    websocket::client::{
93        OKX_RATE_LIMIT_KEY_AMEND, OKX_RATE_LIMIT_KEY_CANCEL, OKX_RATE_LIMIT_KEY_ORDER,
94        OKX_RATE_LIMIT_KEY_SUBSCRIPTION,
95    },
96};
97
98/// Data cached for pending place requests to correlate with responses.
99type PlaceRequestData = (
100    PendingOrderParams,
101    ClientOrderId,
102    TraderId,
103    StrategyId,
104    InstrumentId,
105);
106
107/// Data cached for pending cancel requests to correlate with responses.
108type CancelRequestData = (
109    ClientOrderId,
110    TraderId,
111    StrategyId,
112    InstrumentId,
113    Option<VenueOrderId>,
114);
115
116/// Data cached for pending amend requests to correlate with responses.
117type AmendRequestData = (
118    ClientOrderId,
119    TraderId,
120    StrategyId,
121    InstrumentId,
122    Option<VenueOrderId>,
123);
124
125#[derive(Debug)]
126pub enum PendingOrderParams {
127    Regular(WsPostOrderParams),
128    Algo(WsPostAlgoOrderParams),
129}
130
131/// Commands sent from the outer client to the inner message handler.
132#[allow(
133    clippy::large_enum_variant,
134    reason = "Commands are ephemeral and immediately consumed"
135)]
136#[allow(missing_debug_implementations)]
137pub enum HandlerCommand {
138    SetClient(WebSocketClient),
139    Disconnect,
140    Authenticate {
141        payload: String,
142    },
143    InitializeInstruments(Vec<InstrumentAny>),
144    UpdateInstrument(InstrumentAny),
145    Subscribe {
146        args: Vec<OKXSubscriptionArg>,
147    },
148    Unsubscribe {
149        args: Vec<OKXSubscriptionArg>,
150    },
151    PlaceOrder {
152        params: WsPostOrderParams,
153        client_order_id: ClientOrderId,
154        trader_id: TraderId,
155        strategy_id: StrategyId,
156        instrument_id: InstrumentId,
157    },
158    PlaceAlgoOrder {
159        params: WsPostAlgoOrderParams,
160        client_order_id: ClientOrderId,
161        trader_id: TraderId,
162        strategy_id: StrategyId,
163        instrument_id: InstrumentId,
164    },
165    AmendOrder {
166        params: WsAmendOrderParams,
167        client_order_id: ClientOrderId,
168        trader_id: TraderId,
169        strategy_id: StrategyId,
170        instrument_id: InstrumentId,
171        venue_order_id: Option<VenueOrderId>,
172    },
173    CancelOrder {
174        client_order_id: Option<ClientOrderId>,
175        venue_order_id: Option<VenueOrderId>,
176        instrument_id: InstrumentId,
177        trader_id: TraderId,
178        strategy_id: StrategyId,
179    },
180    CancelAlgoOrder {
181        client_order_id: Option<ClientOrderId>,
182        algo_order_id: Option<VenueOrderId>,
183        instrument_id: InstrumentId,
184        trader_id: TraderId,
185        strategy_id: StrategyId,
186    },
187    MassCancel {
188        instrument_id: InstrumentId,
189    },
190    BatchPlaceOrders {
191        args: Vec<Value>,
192        request_id: String,
193    },
194    BatchAmendOrders {
195        args: Vec<Value>,
196        request_id: String,
197    },
198    BatchCancelOrders {
199        args: Vec<Value>,
200        request_id: String,
201    },
202}
203
204pub(super) struct OKXWsFeedHandler {
205    clock: &'static AtomicTime,
206    account_id: AccountId,
207    signal: Arc<AtomicBool>,
208    inner: Option<WebSocketClient>,
209    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
210    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
211    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
212    auth_tracker: AuthTracker,
213    subscriptions_state: SubscriptionState,
214    retry_manager: RetryManager<OKXWsError>,
215    pending_place_requests: AHashMap<String, PlaceRequestData>,
216    pending_cancel_requests: AHashMap<String, CancelRequestData>,
217    pending_amend_requests: AHashMap<String, AmendRequestData>,
218    pending_mass_cancel_requests: AHashMap<String, InstrumentId>,
219    pending_messages: VecDeque<NautilusWsMessage>,
220    active_client_orders: Arc<DashMap<ClientOrderId, (TraderId, StrategyId, InstrumentId)>>,
221    client_id_aliases: Arc<DashMap<ClientOrderId, ClientOrderId>>,
222    emitted_order_accepted: AHashMap<VenueOrderId, ()>,
223    instruments_cache: AHashMap<Ustr, InstrumentAny>,
224    fee_cache: AHashMap<Ustr, Money>,           // Key is order ID
225    filled_qty_cache: AHashMap<Ustr, Quantity>, // Key is order ID
226    order_state_cache: AHashMap<ClientOrderId, OrderStateSnapshot>,
227    funding_rate_cache: AHashMap<Ustr, (Ustr, u64)>, // Cache (funding_rate, funding_time) by inst_id
228    last_account_state: Option<AccountState>,
229    request_id_counter: AtomicU64,
230}
231
232impl OKXWsFeedHandler {
233    /// Creates a new [`OKXWsFeedHandler`] instance.
234    #[allow(clippy::too_many_arguments)]
235    pub fn new(
236        account_id: AccountId,
237        signal: Arc<AtomicBool>,
238        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
239        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
240        out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
241        active_client_orders: Arc<DashMap<ClientOrderId, (TraderId, StrategyId, InstrumentId)>>,
242        client_id_aliases: Arc<DashMap<ClientOrderId, ClientOrderId>>,
243        auth_tracker: AuthTracker,
244        subscriptions_state: SubscriptionState,
245    ) -> Self {
246        Self {
247            clock: get_atomic_clock_realtime(),
248            account_id,
249            signal,
250            inner: None,
251            cmd_rx,
252            raw_rx,
253            out_tx,
254            auth_tracker,
255            subscriptions_state,
256            retry_manager: create_websocket_retry_manager(),
257            pending_place_requests: AHashMap::new(),
258            pending_cancel_requests: AHashMap::new(),
259            pending_amend_requests: AHashMap::new(),
260            pending_mass_cancel_requests: AHashMap::new(),
261            pending_messages: VecDeque::new(),
262            active_client_orders,
263            client_id_aliases,
264            emitted_order_accepted: AHashMap::new(),
265            instruments_cache: AHashMap::new(),
266            fee_cache: AHashMap::new(),
267            filled_qty_cache: AHashMap::new(),
268            order_state_cache: AHashMap::new(),
269            funding_rate_cache: AHashMap::new(),
270            last_account_state: None,
271            request_id_counter: AtomicU64::new(0),
272        }
273    }
274
275    pub(super) fn is_stopped(&self) -> bool {
276        self.signal.load(std::sync::atomic::Ordering::Acquire)
277    }
278
279    pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), ()> {
280        self.out_tx.send(msg).map_err(|_| ())
281    }
282
283    async fn send_with_retry(
284        &self,
285        payload: String,
286        rate_limit_keys: Option<Vec<String>>,
287    ) -> Result<(), OKXWsError> {
288        if let Some(client) = &self.inner {
289            self.retry_manager
290                .execute_with_retry(
291                    "websocket_send",
292                    || {
293                        let payload = payload.clone();
294                        let keys = rate_limit_keys.clone();
295                        async move {
296                            client
297                                .send_text(payload, keys)
298                                .await
299                                .map_err(|e| OKXWsError::ClientError(format!("Send failed: {e}")))
300                        }
301                    },
302                    should_retry_okx_error,
303                    create_okx_timeout_error,
304                )
305                .await
306        } else {
307            Err(OKXWsError::ClientError(
308                "No active WebSocket client".to_string(),
309            ))
310        }
311    }
312
313    pub(super) async fn send_pong(&self) -> anyhow::Result<()> {
314        match self.send_with_retry(TEXT_PONG.to_string(), None).await {
315            Ok(()) => {
316                tracing::trace!("Sent pong response to OKX text ping");
317                Ok(())
318            }
319            Err(e) => {
320                tracing::warn!(error = %e, "Failed to send pong after retries");
321                Err(anyhow::anyhow!("Failed to send pong: {e}"))
322            }
323        }
324    }
325
326    pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
327        if let Some(message) = self.pending_messages.pop_front() {
328            return Some(message);
329        }
330
331        loop {
332            tokio::select! {
333                Some(cmd) = self.cmd_rx.recv() => {
334                    match cmd {
335                        HandlerCommand::SetClient(client) => {
336                            tracing::debug!("Handler received WebSocket client");
337                            self.inner = Some(client);
338                        }
339                        HandlerCommand::Disconnect => {
340                            tracing::debug!("Handler disconnecting WebSocket client");
341                            self.inner = None;
342                            return None;
343                        }
344                        HandlerCommand::Authenticate { payload } => {
345                            if let Err(e) = self.send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_SUBSCRIPTION.to_string()])).await {
346                                tracing::error!(error = %e, "Failed to send authentication message after retries");
347                            }
348                        }
349                        HandlerCommand::InitializeInstruments(instruments) => {
350                            for inst in instruments {
351                                self.instruments_cache.insert(inst.symbol().inner(), inst);
352                            }
353                        }
354                        HandlerCommand::UpdateInstrument(inst) => {
355                            self.instruments_cache.insert(inst.symbol().inner(), inst);
356                        }
357                        HandlerCommand::Subscribe { args } => {
358                            if let Err(e) = self.handle_subscribe(args).await {
359                                tracing::error!(error = %e, "Failed to handle subscribe command");
360                            }
361                        }
362                        HandlerCommand::Unsubscribe { args } => {
363                            if let Err(e) = self.handle_unsubscribe(args).await {
364                                tracing::error!(error = %e, "Failed to handle unsubscribe command");
365                            }
366                        }
367                        HandlerCommand::CancelOrder {
368                            client_order_id,
369                            venue_order_id,
370                            instrument_id,
371                            trader_id,
372                            strategy_id,
373                        } => {
374                            if let Err(e) = self
375                                .handle_cancel_order(
376                                    client_order_id,
377                                    venue_order_id,
378                                    instrument_id,
379                                    trader_id,
380                                    strategy_id,
381                                )
382                                .await
383                            {
384                                tracing::error!(error = %e, "Failed to handle cancel order command");
385                            }
386                        }
387                        HandlerCommand::CancelAlgoOrder {
388                            client_order_id,
389                            algo_order_id,
390                            instrument_id,
391                            trader_id,
392                            strategy_id,
393                        } => {
394                            if let Err(e) = self
395                                .handle_cancel_algo_order(
396                                    client_order_id,
397                                    algo_order_id,
398                                    instrument_id,
399                                    trader_id,
400                                    strategy_id,
401                                )
402                                .await
403                            {
404                                tracing::error!(error = %e, "Failed to handle cancel algo order command");
405                            }
406                        }
407                        HandlerCommand::PlaceOrder {
408                            params,
409                            client_order_id,
410                            trader_id,
411                            strategy_id,
412                            instrument_id,
413                        } => {
414                            if let Err(e) = self
415                                .handle_place_order(
416                                    params,
417                                    client_order_id,
418                                    trader_id,
419                                    strategy_id,
420                                    instrument_id,
421                                )
422                                .await
423                            {
424                                tracing::error!(error = %e, "Failed to handle place order command");
425                            }
426                        }
427                        HandlerCommand::PlaceAlgoOrder {
428                            params,
429                            client_order_id,
430                            trader_id,
431                            strategy_id,
432                            instrument_id,
433                        } => {
434                            if let Err(e) = self
435                                .handle_place_algo_order(
436                                    params,
437                                    client_order_id,
438                                    trader_id,
439                                    strategy_id,
440                                    instrument_id,
441                                )
442                                .await
443                            {
444                                tracing::error!(error = %e, "Failed to handle place algo order command");
445                            }
446                        }
447                        HandlerCommand::AmendOrder {
448                            params,
449                            client_order_id,
450                            trader_id,
451                            strategy_id,
452                            instrument_id,
453                            venue_order_id,
454                        } => {
455                            if let Err(e) = self
456                                .handle_amend_order(
457                                    params,
458                                    client_order_id,
459                                    trader_id,
460                                    strategy_id,
461                                    instrument_id,
462                                    venue_order_id,
463                                )
464                                .await
465                            {
466                                tracing::error!(error = %e, "Failed to handle amend order command");
467                            }
468                        }
469                        HandlerCommand::MassCancel { instrument_id } => {
470                            if let Err(e) = self.handle_mass_cancel(instrument_id).await {
471                                tracing::error!(error = %e, "Failed to handle mass cancel command");
472                            }
473                        }
474                        HandlerCommand::BatchCancelOrders { args, request_id } => {
475                            if let Err(e) = self.handle_batch_cancel_orders(args, request_id).await {
476                                tracing::error!(error = %e, "Failed to handle batch cancel orders command");
477                            }
478                        }
479                        HandlerCommand::BatchPlaceOrders { args, request_id } => {
480                            if let Err(e) = self.handle_batch_place_orders(args, request_id).await {
481                                tracing::error!(error = %e, "Failed to handle batch place orders command");
482                            }
483                        }
484                        HandlerCommand::BatchAmendOrders { args, request_id } => {
485                            if let Err(e) = self.handle_batch_amend_orders(args, request_id).await {
486                                tracing::error!(error = %e, "Failed to handle batch amend orders command");
487                            }
488                        }
489                    }
490                    // Continue processing following command
491                    continue;
492                }
493
494                _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
495                    if self.signal.load(std::sync::atomic::Ordering::Acquire) {
496                        tracing::debug!("Stop signal received during idle period");
497                        return None;
498                    }
499                    continue;
500                }
501
502                msg = self.raw_rx.recv() => {
503                    let event = match msg {
504                        Some(msg) => match Self::parse_raw_message(msg) {
505                            Some(event) => event,
506                            None => continue,
507                        },
508                        None => {
509                            tracing::debug!("WebSocket stream closed");
510                            return None;
511                        }
512                    };
513
514                    let ts_init = self.clock.get_time_ns();
515
516            match event {
517                OKXWsMessage::Ping => {
518                    if let Err(e) = self.send_pong().await {
519                        tracing::warn!(error = %e, "Failed to send pong response");
520                    }
521                    continue;
522                }
523                OKXWsMessage::Login {
524                    code, msg, conn_id, ..
525                } => {
526                    if code == "0" {
527                        self.auth_tracker.succeed();
528
529                        // Must return immediately to deliver Authenticated message.
530                        // Using push_back() + continue blocks the select! loop and prevents
531                        // the spawn block from receiving this event, breaking reconnection flow.
532                        return Some(NautilusWsMessage::Authenticated);
533                    }
534
535                    tracing::error!(error = %msg, "WebSocket authentication failed");
536                    self.auth_tracker.fail(msg.clone());
537
538                    let error = OKXWebSocketError {
539                        code,
540                        message: msg,
541                        conn_id: Some(conn_id),
542                        timestamp: self.clock.get_time_ns().as_u64(),
543                    };
544                    self.pending_messages
545                        .push_back(NautilusWsMessage::Error(error));
546                    continue;
547                }
548                OKXWsMessage::BookData { arg, action, data } => {
549                    if let Some(msg) = self.handle_book_data(arg, action, data, ts_init) {
550                        return Some(msg);
551                    }
552                    continue;
553                }
554                OKXWsMessage::OrderResponse {
555                    id,
556                    op,
557                    code,
558                    msg,
559                    data,
560                } => {
561                    if let Some(msg) = self.handle_order_response(id, op, code, msg, data, ts_init) {
562                        return Some(msg);
563                    }
564                    continue;
565                }
566                OKXWsMessage::Data { arg, data } => {
567                    let OKXWebSocketArg {
568                        channel, inst_id, ..
569                    } = arg;
570
571                    match channel {
572                        OKXWsChannel::Account => {
573                            if let Some(msg) = self.handle_account_data(data, ts_init) {
574                                return Some(msg);
575                            }
576                            continue;
577                        }
578                        OKXWsChannel::Positions => {
579                            self.handle_positions_data(data, ts_init);
580                            continue;
581                        }
582                        OKXWsChannel::Orders => {
583                            if let Some(msg) = self.handle_orders_data(data, ts_init) {
584                                return Some(msg);
585                            }
586                            continue;
587                        }
588                        OKXWsChannel::OrdersAlgo => {
589                            if let Some(msg) = self.handle_algo_orders_data(data, ts_init) {
590                                return Some(msg);
591                            }
592                            continue;
593                        }
594                        _ => {
595                            if let Some(msg) =
596                                self.handle_other_channel_data(channel, inst_id, data, ts_init)
597                            {
598                                return Some(msg);
599                            }
600                            continue;
601                        }
602                    }
603                }
604                OKXWsMessage::Error { code, msg } => {
605                    let error = OKXWebSocketError {
606                        code,
607                        message: msg,
608                        conn_id: None,
609                        timestamp: self.clock.get_time_ns().as_u64(),
610                    };
611                    return Some(NautilusWsMessage::Error(error));
612                }
613                OKXWsMessage::Reconnected => {
614                    return Some(NautilusWsMessage::Reconnected);
615                }
616                OKXWsMessage::Subscription {
617                    event,
618                    arg,
619                    code,
620                    msg,
621                    ..
622                } => {
623                    let topic = topic_from_websocket_arg(&arg);
624                    let success = code.as_deref().is_none_or(|c| c == "0");
625
626                    match event {
627                        OKXSubscriptionEvent::Subscribe => {
628                            if success {
629                                self.subscriptions_state.confirm_subscribe(&topic);
630                            } else {
631                                tracing::warn!(?topic, error = ?msg, code = ?code, "Subscription failed");
632                                self.subscriptions_state.mark_failure(&topic);
633                            }
634                        }
635                        OKXSubscriptionEvent::Unsubscribe => {
636                            if success {
637                                self.subscriptions_state.confirm_unsubscribe(&topic);
638                            } else {
639                                tracing::warn!(?topic, error = ?msg, code = ?code, "Unsubscription failed - restoring subscription");
640                                // Venue rejected unsubscribe, so we're still subscribed. Restore state:
641                                self.subscriptions_state.confirm_unsubscribe(&topic); // Clear pending_unsubscribe
642                                self.subscriptions_state.mark_subscribe(&topic);      // Mark as subscribing
643                                self.subscriptions_state.confirm_subscribe(&topic);   // Confirm subscription
644                            }
645                        }
646                    }
647
648                    continue;
649                }
650                OKXWsMessage::ChannelConnCount { .. } => continue,
651            }
652                }
653
654                // Handle shutdown - either channel closed or stream ended
655                else => {
656                    tracing::debug!("Handler shutting down: stream ended or command channel closed");
657                    return None;
658                }
659            }
660        }
661    }
662
663    pub(super) fn is_post_only_auto_cancel(msg: &OKXOrderMsg) -> bool {
664        if msg.state != OKXOrderStatus::Canceled {
665            return false;
666        }
667
668        let cancel_source_matches = matches!(
669            msg.cancel_source.as_deref(),
670            Some(source) if source == OKX_POST_ONLY_CANCEL_SOURCE
671        );
672
673        let reason_matches = matches!(
674            msg.cancel_source_reason.as_deref(),
675            Some(reason) if reason.contains("POST_ONLY")
676        );
677
678        if !(cancel_source_matches || reason_matches) {
679            return false;
680        }
681
682        msg.acc_fill_sz
683            .as_ref()
684            .is_none_or(|filled| filled == "0" || filled.is_empty())
685    }
686
687    fn try_handle_post_only_auto_cancel(
688        &mut self,
689        msg: &OKXOrderMsg,
690        ts_init: UnixNanos,
691        exec_reports: &mut Vec<ExecutionReport>,
692    ) -> bool {
693        if !Self::is_post_only_auto_cancel(msg) {
694            return false;
695        }
696
697        let Some(client_order_id) = parse_client_order_id(&msg.cl_ord_id) else {
698            return false;
699        };
700
701        let Some((_, (trader_id, strategy_id, instrument_id))) =
702            self.active_client_orders.remove(&client_order_id)
703        else {
704            return false;
705        };
706
707        self.client_id_aliases.remove(&client_order_id);
708
709        if !exec_reports.is_empty() {
710            let reports = std::mem::take(exec_reports);
711            self.pending_messages
712                .push_back(NautilusWsMessage::ExecutionReports(reports));
713        }
714
715        let reason = msg
716            .cancel_source_reason
717            .as_ref()
718            .filter(|reason| !reason.is_empty())
719            .map_or_else(
720                || Ustr::from(OKX_POST_ONLY_CANCEL_REASON),
721                |reason| Ustr::from(reason.as_str()),
722            );
723
724        let ts_event = parse_millisecond_timestamp(msg.u_time);
725        let rejected = OrderRejected::new(
726            trader_id,
727            strategy_id,
728            instrument_id,
729            client_order_id,
730            self.account_id,
731            reason,
732            UUID4::new(),
733            ts_event,
734            ts_init,
735            false,
736            true,
737        );
738
739        self.pending_messages
740            .push_back(NautilusWsMessage::OrderRejected(rejected));
741
742        true
743    }
744
745    fn register_client_order_aliases(
746        &self,
747        raw_child: &Option<ClientOrderId>,
748        parent_from_msg: &Option<ClientOrderId>,
749    ) -> Option<ClientOrderId> {
750        if let Some(parent) = parent_from_msg {
751            self.client_id_aliases.insert(*parent, *parent);
752            if let Some(child) = raw_child.as_ref().filter(|child| **child != *parent) {
753                self.client_id_aliases.insert(*child, *parent);
754            }
755            Some(*parent)
756        } else if let Some(child) = raw_child.as_ref() {
757            if let Some(mapped) = self.client_id_aliases.get(child) {
758                Some(*mapped.value())
759            } else {
760                self.client_id_aliases.insert(*child, *child);
761                Some(*child)
762            }
763        } else {
764            None
765        }
766    }
767
768    fn adjust_execution_report(
769        &self,
770        report: ExecutionReport,
771        effective_client_id: &Option<ClientOrderId>,
772        raw_child: &Option<ClientOrderId>,
773    ) -> ExecutionReport {
774        match report {
775            ExecutionReport::Order(status_report) => {
776                let mut adjusted = status_report;
777                let mut final_id = *effective_client_id;
778
779                if final_id.is_none() {
780                    final_id = adjusted.client_order_id;
781                }
782
783                if final_id.is_none()
784                    && let Some(child) = raw_child.as_ref()
785                    && let Some(mapped) = self.client_id_aliases.get(child)
786                {
787                    final_id = Some(*mapped.value());
788                }
789
790                if let Some(final_id_value) = final_id {
791                    if adjusted.client_order_id != Some(final_id_value) {
792                        adjusted = adjusted.with_client_order_id(final_id_value);
793                    }
794                    self.client_id_aliases
795                        .insert(final_id_value, final_id_value);
796
797                    if let Some(child) =
798                        raw_child.as_ref().filter(|child| **child != final_id_value)
799                    {
800                        adjusted = adjusted.with_linked_order_ids(vec![*child]);
801                    }
802                }
803
804                ExecutionReport::Order(adjusted)
805            }
806            ExecutionReport::Fill(mut fill_report) => {
807                let mut final_id = *effective_client_id;
808                if final_id.is_none() {
809                    final_id = fill_report.client_order_id;
810                }
811                if final_id.is_none()
812                    && let Some(child) = raw_child.as_ref()
813                    && let Some(mapped) = self.client_id_aliases.get(child)
814                {
815                    final_id = Some(*mapped.value());
816                }
817
818                if let Some(final_id_value) = final_id {
819                    fill_report.client_order_id = Some(final_id_value);
820                    self.client_id_aliases
821                        .insert(final_id_value, final_id_value);
822                }
823
824                ExecutionReport::Fill(fill_report)
825            }
826        }
827    }
828
829    fn update_caches_with_report(&mut self, report: &ExecutionReport) {
830        match report {
831            ExecutionReport::Fill(fill_report) => {
832                let order_id = fill_report.venue_order_id.inner();
833                let current_fee = self
834                    .fee_cache
835                    .get(&order_id)
836                    .copied()
837                    .unwrap_or_else(|| Money::new(0.0, fill_report.commission.currency));
838                let total_fee = current_fee + fill_report.commission;
839                self.fee_cache.insert(order_id, total_fee);
840
841                let current_filled_qty = self
842                    .filled_qty_cache
843                    .get(&order_id)
844                    .copied()
845                    .unwrap_or_else(|| Quantity::zero(fill_report.last_qty.precision));
846                let total_filled_qty = current_filled_qty + fill_report.last_qty;
847                self.filled_qty_cache.insert(order_id, total_filled_qty);
848            }
849            ExecutionReport::Order(status_report) => {
850                if matches!(status_report.order_status, OrderStatus::Filled) {
851                    self.fee_cache.remove(&status_report.venue_order_id.inner());
852                    self.filled_qty_cache
853                        .remove(&status_report.venue_order_id.inner());
854                }
855
856                if matches!(
857                    status_report.order_status,
858                    OrderStatus::Canceled
859                        | OrderStatus::Expired
860                        | OrderStatus::Filled
861                        | OrderStatus::Rejected,
862                ) {
863                    self.emitted_order_accepted
864                        .remove(&status_report.venue_order_id);
865                    if let Some(client_order_id) = status_report.client_order_id {
866                        self.order_state_cache.remove(&client_order_id);
867                        self.active_client_orders.remove(&client_order_id);
868                        self.client_id_aliases.remove(&client_order_id);
869                    }
870                    if let Some(linked) = &status_report.linked_order_ids {
871                        for child in linked {
872                            self.client_id_aliases.remove(child);
873                        }
874                    }
875                }
876            }
877        }
878    }
879
880    #[allow(clippy::too_many_lines)]
881    fn handle_order_response(
882        &mut self,
883        id: Option<String>,
884        op: OKXWsOperation,
885        code: String,
886        msg: String,
887        data: Vec<Value>,
888        ts_init: UnixNanos,
889    ) -> Option<NautilusWsMessage> {
890        if code == "0" {
891            tracing::debug!("Order operation successful: id={id:?} op={op} code={code}");
892
893            if op == OKXWsOperation::BatchCancelOrders {
894                tracing::debug!(
895                    "Batch cancel operation successful: id={id:?} cancel_count={}",
896                    data.len()
897                );
898
899                // Check for per-order errors even when top-level code is "0"
900                for (idx, entry) in data.iter().enumerate() {
901                    if let Some(entry_code) = entry.get("sCode").and_then(|v| v.as_str())
902                        && entry_code != "0"
903                    {
904                        let entry_msg = entry
905                            .get("sMsg")
906                            .and_then(|v| v.as_str())
907                            .unwrap_or("Unknown error");
908
909                        if let Some(cl_ord_id_str) = entry
910                            .get("clOrdId")
911                            .and_then(|v| v.as_str())
912                            .filter(|s| !s.is_empty())
913                        {
914                            tracing::error!(
915                                "Batch cancel partial failure for order {}: sCode={} sMsg={}",
916                                cl_ord_id_str,
917                                entry_code,
918                                entry_msg
919                            );
920                            // TODO: Emit OrderCancelRejected for this specific order
921                        } else {
922                            tracing::error!(
923                                "Batch cancel entry[{}] failed: sCode={} sMsg={} data={:?}",
924                                idx,
925                                entry_code,
926                                entry_msg,
927                                entry
928                            );
929                        }
930                    }
931                }
932
933                return None;
934            } else if op == OKXWsOperation::MassCancel
935                && let Some(request_id) = &id
936                && let Some(instrument_id) = self.pending_mass_cancel_requests.remove(request_id)
937            {
938                tracing::debug!(
939                    "Mass cancel operation successful for instrument: {}",
940                    instrument_id
941                );
942            } else if op == OKXWsOperation::Order
943                && let Some(request_id) = &id
944                && let Some((params, client_order_id, trader_id, strategy_id, instrument_id)) =
945                    self.pending_place_requests.remove(request_id)
946            {
947                let (venue_order_id, ts_accepted) = if let Some(first) = data.first() {
948                    let ord_id = first
949                        .get("ordId")
950                        .and_then(|v| v.as_str())
951                        .filter(|s| !s.is_empty())
952                        .map(VenueOrderId::new);
953
954                    let ts = first
955                        .get("ts")
956                        .and_then(|v| v.as_str())
957                        .and_then(|s| s.parse::<u64>().ok())
958                        .map_or_else(
959                            || self.clock.get_time_ns(),
960                            |ms| UnixNanos::from(ms * 1_000_000),
961                        );
962
963                    (ord_id, ts)
964                } else {
965                    (None, self.clock.get_time_ns())
966                };
967
968                if let Some(instrument) = self.instruments_cache.get(&instrument_id.symbol.inner())
969                {
970                    match params {
971                        PendingOrderParams::Regular(order_params) => {
972                            let order_type = determine_order_type(
973                                order_params.ord_type,
974                                order_params.px.as_deref().unwrap_or(""),
975                            );
976
977                            let is_explicit_quote_sized = order_params
978                                .tgt_ccy
979                                .is_some_and(|tgt| tgt == OKXTargetCurrency::QuoteCcy);
980
981                            // SPOT market BUY in cash mode with no tgt_ccy defaults to quote-sizing
982                            let is_implicit_quote_sized = order_params.tgt_ccy.is_none()
983                                && order_params.side == OKXSide::Buy
984                                && order_type == OrderType::Market
985                                && order_params.td_mode == OKXTradeMode::Cash
986                                && instrument.instrument_class().as_ref() == "SPOT";
987
988                            if is_explicit_quote_sized || is_implicit_quote_sized {
989                                // For quote-sized orders, sz is in quote currency (USDT),
990                                // not base currency (ETH). We can't accurately parse the
991                                // base quantity without the fill price, so we skip the
992                                // synthetic OrderAccepted and rely on the orders channel
993                                tracing::debug!(
994                                    "Skipping synthetic OrderAccepted for {} quote-sized order: client_order_id={client_order_id}, venue_order_id={venue_order_id:?}",
995                                    if is_explicit_quote_sized {
996                                        "explicit"
997                                    } else {
998                                        "implicit"
999                                    },
1000                                );
1001                                return None;
1002                            }
1003
1004                            let Some(v_order_id) = venue_order_id else {
1005                                tracing::error!(
1006                                    "No venue_order_id for accepted order: client_order_id={client_order_id}"
1007                                );
1008                                return None;
1009                            };
1010
1011                            // Check if already emitted from orders channel push
1012                            if self.emitted_order_accepted.contains_key(&v_order_id) {
1013                                tracing::debug!(
1014                                    "Skipping duplicate OrderAccepted from operation response for venue_order_id={v_order_id}"
1015                                );
1016                                return None;
1017                            }
1018                            self.emitted_order_accepted.insert(v_order_id, ());
1019
1020                            let accepted = OrderAccepted::new(
1021                                trader_id,
1022                                strategy_id,
1023                                instrument_id,
1024                                client_order_id,
1025                                v_order_id,
1026                                self.account_id,
1027                                UUID4::new(),
1028                                ts_accepted,
1029                                ts_init,
1030                                false, // Not from reconciliation
1031                            );
1032
1033                            tracing::debug!(
1034                                "Order accepted: client_order_id={client_order_id}, venue_order_id={v_order_id}"
1035                            );
1036
1037                            return Some(NautilusWsMessage::OrderAccepted(accepted));
1038                        }
1039                        PendingOrderParams::Algo(_) => {
1040                            tracing::debug!(
1041                                "Algo order placement confirmed: client_order_id={client_order_id}, venue_order_id={:?}",
1042                                venue_order_id
1043                            );
1044                        }
1045                    }
1046                } else {
1047                    tracing::error!("Instrument not found for accepted order: {instrument_id}");
1048                }
1049            }
1050
1051            if let Some(first) = data.first()
1052                && let Some(success_msg) = first.get("sMsg").and_then(|value| value.as_str())
1053            {
1054                tracing::debug!("Order details: {success_msg}");
1055            }
1056
1057            return None;
1058        }
1059
1060        let error_msg = data
1061            .first()
1062            .and_then(|d| d.get("sMsg"))
1063            .and_then(|s| s.as_str())
1064            .unwrap_or(&msg)
1065            .to_string();
1066
1067        if let Some(first) = data.first() {
1068            tracing::debug!(
1069                "Error data fields: {}",
1070                serde_json::to_string_pretty(first)
1071                    .unwrap_or_else(|_| "unable to serialize".to_string())
1072            );
1073        }
1074
1075        tracing::warn!("Order operation failed: id={id:?} op={op} code={code} msg={error_msg}");
1076
1077        let ts_event = self.clock.get_time_ns();
1078
1079        if let Some(request_id) = &id {
1080            match op {
1081                OKXWsOperation::Order => {
1082                    if let Some((_params, client_order_id, trader_id, strategy_id, instrument_id)) =
1083                        self.pending_place_requests.remove(request_id)
1084                    {
1085                        let due_post_only = is_post_only_rejection(code.as_str(), &data);
1086                        let rejected = OrderRejected::new(
1087                            trader_id,
1088                            strategy_id,
1089                            instrument_id,
1090                            client_order_id,
1091                            self.account_id,
1092                            Ustr::from(error_msg.as_str()),
1093                            UUID4::new(),
1094                            ts_event,
1095                            ts_init,
1096                            false, // Not from reconciliation
1097                            due_post_only,
1098                        );
1099
1100                        return Some(NautilusWsMessage::OrderRejected(rejected));
1101                    }
1102                }
1103                OKXWsOperation::CancelOrder => {
1104                    if let Some((
1105                        client_order_id,
1106                        trader_id,
1107                        strategy_id,
1108                        instrument_id,
1109                        venue_order_id,
1110                    )) = self.pending_cancel_requests.remove(request_id)
1111                    {
1112                        let rejected = OrderCancelRejected::new(
1113                            trader_id,
1114                            strategy_id,
1115                            instrument_id,
1116                            client_order_id,
1117                            Ustr::from(error_msg.as_str()),
1118                            UUID4::new(),
1119                            ts_event,
1120                            ts_init,
1121                            false, // Not from reconciliation
1122                            venue_order_id,
1123                            Some(self.account_id),
1124                        );
1125
1126                        return Some(NautilusWsMessage::OrderCancelRejected(rejected));
1127                    }
1128                }
1129                OKXWsOperation::AmendOrder => {
1130                    if let Some((
1131                        client_order_id,
1132                        trader_id,
1133                        strategy_id,
1134                        instrument_id,
1135                        venue_order_id,
1136                    )) = self.pending_amend_requests.remove(request_id)
1137                    {
1138                        let rejected = OrderModifyRejected::new(
1139                            trader_id,
1140                            strategy_id,
1141                            instrument_id,
1142                            client_order_id,
1143                            Ustr::from(error_msg.as_str()),
1144                            UUID4::new(),
1145                            ts_event,
1146                            ts_init,
1147                            false, // Not from reconciliation
1148                            venue_order_id,
1149                            Some(self.account_id),
1150                        );
1151
1152                        return Some(NautilusWsMessage::OrderModifyRejected(rejected));
1153                    }
1154                }
1155                OKXWsOperation::OrderAlgo => {
1156                    if let Some((_params, client_order_id, trader_id, strategy_id, instrument_id)) =
1157                        self.pending_place_requests.remove(request_id)
1158                    {
1159                        let due_post_only = is_post_only_rejection(code.as_str(), &data);
1160                        let rejected = OrderRejected::new(
1161                            trader_id,
1162                            strategy_id,
1163                            instrument_id,
1164                            client_order_id,
1165                            self.account_id,
1166                            Ustr::from(error_msg.as_str()),
1167                            UUID4::new(),
1168                            ts_event,
1169                            ts_init,
1170                            false, // Not from reconciliation
1171                            due_post_only,
1172                        );
1173
1174                        return Some(NautilusWsMessage::OrderRejected(rejected));
1175                    }
1176                }
1177                OKXWsOperation::CancelAlgos => {
1178                    if let Some((
1179                        client_order_id,
1180                        trader_id,
1181                        strategy_id,
1182                        instrument_id,
1183                        venue_order_id,
1184                    )) = self.pending_cancel_requests.remove(request_id)
1185                    {
1186                        let rejected = OrderCancelRejected::new(
1187                            trader_id,
1188                            strategy_id,
1189                            instrument_id,
1190                            client_order_id,
1191                            Ustr::from(error_msg.as_str()),
1192                            UUID4::new(),
1193                            ts_event,
1194                            ts_init,
1195                            false, // Not from reconciliation
1196                            venue_order_id,
1197                            Some(self.account_id),
1198                        );
1199
1200                        return Some(NautilusWsMessage::OrderCancelRejected(rejected));
1201                    }
1202                }
1203                OKXWsOperation::MassCancel => {
1204                    if let Some(instrument_id) =
1205                        self.pending_mass_cancel_requests.remove(request_id)
1206                    {
1207                        tracing::error!(
1208                            "Mass cancel operation failed for {}: code={code} msg={error_msg}",
1209                            instrument_id
1210                        );
1211                        let error = OKXWebSocketError {
1212                            code,
1213                            message: format!("Mass cancel failed for {instrument_id}: {error_msg}"),
1214                            conn_id: None,
1215                            timestamp: ts_event.as_u64(),
1216                        };
1217                        return Some(NautilusWsMessage::Error(error));
1218                    } else {
1219                        tracing::error!(
1220                            "Mass cancel operation failed: code={code} msg={error_msg}"
1221                        );
1222                    }
1223                }
1224                OKXWsOperation::BatchCancelOrders => {
1225                    tracing::warn!(
1226                        "Batch cancel operation failed: id={id:?} code={code} msg={error_msg} data_count={}",
1227                        data.len()
1228                    );
1229
1230                    // Iterate through data array to check per-order errors
1231                    for (idx, entry) in data.iter().enumerate() {
1232                        let entry_code =
1233                            entry.get("sCode").and_then(|v| v.as_str()).unwrap_or(&code);
1234                        let entry_msg = entry
1235                            .get("sMsg")
1236                            .and_then(|v| v.as_str())
1237                            .unwrap_or(&error_msg);
1238
1239                        if entry_code != "0" {
1240                            // Try to extract client order ID for targeted error events
1241                            if let Some(cl_ord_id_str) = entry
1242                                .get("clOrdId")
1243                                .and_then(|v| v.as_str())
1244                                .filter(|s| !s.is_empty())
1245                            {
1246                                tracing::error!(
1247                                    "Batch cancel failed for order {}: sCode={} sMsg={}",
1248                                    cl_ord_id_str,
1249                                    entry_code,
1250                                    entry_msg
1251                                );
1252                                // TODO: Emit OrderCancelRejected event once we track
1253                                // batch cancel metadata (client_order_id, trader_id, etc.)
1254                            } else {
1255                                tracing::error!(
1256                                    "Batch cancel entry[{}] failed: sCode={} sMsg={} data={:?}",
1257                                    idx,
1258                                    entry_code,
1259                                    entry_msg,
1260                                    entry
1261                                );
1262                            }
1263                        }
1264                    }
1265
1266                    // Emit generic error for the batch operation
1267                    let error = OKXWebSocketError {
1268                        code,
1269                        message: format!("Batch cancel failed: {error_msg}"),
1270                        conn_id: None,
1271                        timestamp: ts_event.as_u64(),
1272                    };
1273                    return Some(NautilusWsMessage::Error(error));
1274                }
1275                _ => tracing::warn!("Unhandled operation type for rejection: {op}"),
1276            }
1277        }
1278
1279        let error = OKXWebSocketError {
1280            code,
1281            message: error_msg,
1282            conn_id: None,
1283            timestamp: ts_event.as_u64(),
1284        };
1285        Some(NautilusWsMessage::Error(error))
1286    }
1287
1288    fn handle_book_data(
1289        &self,
1290        arg: OKXWebSocketArg,
1291        action: OKXBookAction,
1292        data: Vec<OKXBookMsg>,
1293        ts_init: UnixNanos,
1294    ) -> Option<NautilusWsMessage> {
1295        let Some(inst_id) = arg.inst_id else {
1296            tracing::error!("Instrument ID missing for book data event");
1297            return None;
1298        };
1299
1300        let inst = self.instruments_cache.get(&inst_id)?;
1301
1302        let instrument_id = inst.id();
1303        let price_precision = inst.price_precision();
1304        let size_precision = inst.size_precision();
1305
1306        match parse_book_msg_vec(
1307            data,
1308            &instrument_id,
1309            price_precision,
1310            size_precision,
1311            action,
1312            ts_init,
1313        ) {
1314            Ok(payloads) => Some(NautilusWsMessage::Data(payloads)),
1315            Err(e) => {
1316                tracing::error!("Failed to parse book message: {e}");
1317                None
1318            }
1319        }
1320    }
1321
1322    fn handle_account_data(
1323        &mut self,
1324        data: Value,
1325        ts_init: UnixNanos,
1326    ) -> Option<NautilusWsMessage> {
1327        let Value::Array(arr) = data else {
1328            tracing::error!("Account data is not an array");
1329            return None;
1330        };
1331
1332        let first = arr.into_iter().next()?;
1333
1334        let account: OKXAccount = match serde_json::from_value(first) {
1335            Ok(acc) => acc,
1336            Err(e) => {
1337                tracing::error!("Failed to parse account data: {e}");
1338                return None;
1339            }
1340        };
1341
1342        match parse_account_state(&account, self.account_id, ts_init) {
1343            Ok(account_state) => {
1344                if let Some(last_account_state) = &self.last_account_state
1345                    && account_state.has_same_balances_and_margins(last_account_state)
1346                {
1347                    return None;
1348                }
1349                self.last_account_state = Some(account_state.clone());
1350                Some(NautilusWsMessage::AccountUpdate(account_state))
1351            }
1352            Err(e) => {
1353                tracing::error!("Failed to parse account state: {e}");
1354                None
1355            }
1356        }
1357    }
1358
1359    fn handle_positions_data(&mut self, data: Value, ts_init: UnixNanos) {
1360        match serde_json::from_value::<Vec<OKXPosition>>(data) {
1361            Ok(positions) => {
1362                tracing::debug!("Received {} position update(s)", positions.len());
1363
1364                for position in positions {
1365                    let instrument = match self.instruments_cache.get(&position.inst_id) {
1366                        Some(inst) => inst,
1367                        None => {
1368                            tracing::warn!(
1369                                "Received position update for unknown instrument {}, skipping",
1370                                position.inst_id
1371                            );
1372                            continue;
1373                        }
1374                    };
1375
1376                    let instrument_id = instrument.id();
1377                    let size_precision = instrument.size_precision();
1378
1379                    match parse_position_status_report(
1380                        position,
1381                        self.account_id,
1382                        instrument_id,
1383                        size_precision,
1384                        ts_init,
1385                    ) {
1386                        Ok(position_report) => {
1387                            self.pending_messages
1388                                .push_back(NautilusWsMessage::PositionUpdate(position_report));
1389                        }
1390                        Err(e) => {
1391                            tracing::error!(
1392                                "Failed to parse position status report for {}: {e}",
1393                                instrument_id
1394                            );
1395                        }
1396                    }
1397                }
1398            }
1399            Err(e) => {
1400                tracing::error!("Failed to parse positions data: {e}");
1401            }
1402        }
1403    }
1404
1405    fn handle_orders_data(&mut self, data: Value, ts_init: UnixNanos) -> Option<NautilusWsMessage> {
1406        let orders: Vec<OKXOrderMsg> = match serde_json::from_value(data) {
1407            Ok(orders) => orders,
1408            Err(e) => {
1409                tracing::error!("Failed to deserialize orders channel payload: {e}");
1410                return None;
1411            }
1412        };
1413
1414        tracing::debug!(
1415            "Received {} order message(s) from orders channel",
1416            orders.len()
1417        );
1418
1419        let mut exec_reports: Vec<ExecutionReport> = Vec::with_capacity(orders.len());
1420
1421        for msg in orders {
1422            tracing::debug!(
1423                "Processing order message: inst_id={}, cl_ord_id={}, state={:?}, exec_type={:?}",
1424                msg.inst_id,
1425                msg.cl_ord_id,
1426                msg.state,
1427                msg.exec_type
1428            );
1429
1430            if self.try_handle_post_only_auto_cancel(&msg, ts_init, &mut exec_reports) {
1431                continue;
1432            }
1433
1434            let raw_child = parse_client_order_id(&msg.cl_ord_id);
1435            let parent_from_msg = msg
1436                .algo_cl_ord_id
1437                .as_ref()
1438                .filter(|value| !value.is_empty())
1439                .map(ClientOrderId::new);
1440            let effective_client_id =
1441                self.register_client_order_aliases(&raw_child, &parent_from_msg);
1442
1443            let Some(instrument) = self.instruments_cache.get(&msg.inst_id) else {
1444                tracing::error!(
1445                    "No instrument found for inst_id: {inst_id}",
1446                    inst_id = msg.inst_id
1447                );
1448                continue;
1449            };
1450            let price_precision = instrument.price_precision();
1451            let size_precision = instrument.size_precision();
1452
1453            let order_metadata = effective_client_id
1454                .and_then(|cid| self.active_client_orders.get(&cid).map(|e| *e.value()));
1455
1456            let previous_fee = self.fee_cache.get(&msg.ord_id).copied();
1457            let previous_filled_qty = self.filled_qty_cache.get(&msg.ord_id).copied();
1458            let previous_state =
1459                effective_client_id.and_then(|cid| self.order_state_cache.get(&cid).cloned());
1460
1461            // SAFETY: order_metadata being Some implies effective_client_id is Some
1462            if let (Some((trader_id, strategy_id, _instrument_id)), Some(canonical_client_id)) =
1463                (order_metadata, effective_client_id)
1464            {
1465                match parse_order_event(
1466                    &msg,
1467                    canonical_client_id,
1468                    self.account_id,
1469                    trader_id,
1470                    strategy_id,
1471                    instrument,
1472                    previous_fee,
1473                    previous_filled_qty,
1474                    previous_state.as_ref(),
1475                    ts_init,
1476                ) {
1477                    Ok(event) => {
1478                        self.process_parsed_order_event(
1479                            event,
1480                            &msg,
1481                            price_precision,
1482                            size_precision,
1483                            canonical_client_id,
1484                            &raw_child,
1485                            &mut exec_reports,
1486                        );
1487                    }
1488                    Err(e) => tracing::error!("Failed to parse order event: {e}"),
1489                }
1490            } else {
1491                // External order or not tracked - use old parse_order_msg for backward compatibility
1492                match parse_order_msg(
1493                    &msg,
1494                    self.account_id,
1495                    &self.instruments_cache,
1496                    &self.fee_cache,
1497                    &self.filled_qty_cache,
1498                    ts_init,
1499                ) {
1500                    Ok(report) => {
1501                        tracing::debug!("Parsed external order as execution report: {report:?}");
1502                        let adjusted =
1503                            self.adjust_execution_report(report, &effective_client_id, &raw_child);
1504                        self.update_caches_with_report(&adjusted);
1505                        exec_reports.push(adjusted);
1506                    }
1507                    Err(e) => tracing::error!("Failed to parse order message: {e}"),
1508                }
1509            }
1510        }
1511
1512        if !exec_reports.is_empty() {
1513            tracing::debug!(
1514                "Pushing {count} execution report(s) to message queue",
1515                count = exec_reports.len()
1516            );
1517            self.pending_messages
1518                .push_back(NautilusWsMessage::ExecutionReports(exec_reports));
1519        }
1520
1521        self.pending_messages.pop_front()
1522    }
1523
1524    /// Processes a parsed order event and emits the appropriate message.
1525    #[allow(clippy::too_many_arguments)]
1526    fn process_parsed_order_event(
1527        &mut self,
1528        event: ParsedOrderEvent,
1529        msg: &OKXOrderMsg,
1530        price_precision: u8,
1531        size_precision: u8,
1532        canonical_client_id: ClientOrderId,
1533        raw_child: &Option<ClientOrderId>,
1534        exec_reports: &mut Vec<ExecutionReport>,
1535    ) {
1536        let venue_order_id = VenueOrderId::new(msg.ord_id);
1537
1538        match event {
1539            ParsedOrderEvent::Accepted(accepted) => {
1540                if self.emitted_order_accepted.contains_key(&venue_order_id) {
1541                    tracing::debug!(
1542                        "Skipping duplicate OrderAccepted for venue_order_id={venue_order_id}"
1543                    );
1544                    return;
1545                }
1546                self.emitted_order_accepted.insert(venue_order_id, ());
1547                self.update_order_state_cache(
1548                    &canonical_client_id,
1549                    msg,
1550                    price_precision,
1551                    size_precision,
1552                );
1553
1554                self.pending_messages
1555                    .push_back(NautilusWsMessage::OrderAccepted(accepted));
1556            }
1557            ParsedOrderEvent::Canceled(canceled) => {
1558                self.cleanup_terminal_order(&canonical_client_id, &venue_order_id);
1559                self.pending_messages
1560                    .push_back(NautilusWsMessage::OrderCanceled(canceled));
1561            }
1562            ParsedOrderEvent::Expired(expired) => {
1563                self.cleanup_terminal_order(&canonical_client_id, &venue_order_id);
1564                self.pending_messages
1565                    .push_back(NautilusWsMessage::OrderExpired(expired));
1566            }
1567            ParsedOrderEvent::Triggered(triggered) => {
1568                self.update_order_state_cache(
1569                    &canonical_client_id,
1570                    msg,
1571                    price_precision,
1572                    size_precision,
1573                );
1574                self.pending_messages
1575                    .push_back(NautilusWsMessage::OrderTriggered(triggered));
1576            }
1577            ParsedOrderEvent::Updated(updated) => {
1578                self.update_order_state_cache(
1579                    &canonical_client_id,
1580                    msg,
1581                    price_precision,
1582                    size_precision,
1583                );
1584                self.pending_messages
1585                    .push_back(NautilusWsMessage::OrderUpdated(updated));
1586            }
1587            ParsedOrderEvent::Fill(fill_report) => {
1588                let effective_client_id = Some(canonical_client_id);
1589                let adjusted = self.adjust_execution_report(
1590                    ExecutionReport::Fill(fill_report),
1591                    &effective_client_id,
1592                    raw_child,
1593                );
1594                self.update_caches_with_report(&adjusted);
1595
1596                if msg.state == OKXOrderStatus::Filled {
1597                    self.cleanup_terminal_order(&canonical_client_id, &venue_order_id);
1598                }
1599
1600                exec_reports.push(adjusted);
1601            }
1602            ParsedOrderEvent::StatusOnly(status_report) => {
1603                let effective_client_id = Some(canonical_client_id);
1604                let adjusted = self.adjust_execution_report(
1605                    ExecutionReport::Order(*status_report),
1606                    &effective_client_id,
1607                    raw_child,
1608                );
1609                self.update_caches_with_report(&adjusted);
1610                exec_reports.push(adjusted);
1611            }
1612        }
1613    }
1614
1615    /// Updates the order state cache for detecting future updates.
1616    fn update_order_state_cache(
1617        &mut self,
1618        client_order_id: &ClientOrderId,
1619        msg: &OKXOrderMsg,
1620        price_precision: u8,
1621        size_precision: u8,
1622    ) {
1623        let venue_order_id = VenueOrderId::new(msg.ord_id);
1624        let quantity = parse_quantity(&msg.sz, size_precision).ok();
1625        let price = if !is_market_price(&msg.px) {
1626            parse_price(&msg.px, price_precision).ok()
1627        } else {
1628            None
1629        };
1630
1631        if let Some(qty) = quantity {
1632            self.order_state_cache.insert(
1633                *client_order_id,
1634                OrderStateSnapshot {
1635                    venue_order_id,
1636                    quantity: qty,
1637                    price,
1638                },
1639            );
1640        }
1641    }
1642
1643    /// Cleans up tracking state for terminal orders.
1644    fn cleanup_terminal_order(
1645        &mut self,
1646        client_order_id: &ClientOrderId,
1647        venue_order_id: &VenueOrderId,
1648    ) {
1649        self.emitted_order_accepted.remove(venue_order_id);
1650        self.order_state_cache.remove(client_order_id);
1651        self.active_client_orders.remove(client_order_id);
1652        self.client_id_aliases.remove(client_order_id);
1653        self.client_id_aliases.retain(|_, v| *v != *client_order_id);
1654
1655        self.fee_cache.remove(&venue_order_id.inner());
1656        self.filled_qty_cache.remove(&venue_order_id.inner());
1657    }
1658
1659    fn handle_algo_orders_data(
1660        &mut self,
1661        data: Value,
1662        ts_init: UnixNanos,
1663    ) -> Option<NautilusWsMessage> {
1664        let orders: Vec<OKXAlgoOrderMsg> = match serde_json::from_value(data) {
1665            Ok(orders) => orders,
1666            Err(e) => {
1667                tracing::error!("Failed to deserialize algo orders payload: {e}");
1668                return None;
1669            }
1670        };
1671
1672        let mut exec_reports: Vec<ExecutionReport> = Vec::with_capacity(orders.len());
1673
1674        for msg in orders {
1675            let raw_child = parse_client_order_id(&msg.cl_ord_id);
1676            let parent_from_msg = parse_client_order_id(&msg.algo_cl_ord_id);
1677            let effective_client_id =
1678                self.register_client_order_aliases(&raw_child, &parent_from_msg);
1679
1680            match parse_algo_order_msg(msg, self.account_id, &self.instruments_cache, ts_init) {
1681                Ok(report) => {
1682                    let adjusted =
1683                        self.adjust_execution_report(report, &effective_client_id, &raw_child);
1684                    self.update_caches_with_report(&adjusted);
1685                    exec_reports.push(adjusted);
1686                }
1687                Err(e) => {
1688                    tracing::error!("Failed to parse algo order message: {e}");
1689                }
1690            }
1691        }
1692
1693        if !exec_reports.is_empty() {
1694            Some(NautilusWsMessage::ExecutionReports(exec_reports))
1695        } else {
1696            None
1697        }
1698    }
1699
1700    fn handle_other_channel_data(
1701        &mut self,
1702        channel: OKXWsChannel,
1703        inst_id: Option<Ustr>,
1704        data: Value,
1705        ts_init: UnixNanos,
1706    ) -> Option<NautilusWsMessage> {
1707        let Some(inst_id) = inst_id else {
1708            tracing::error!("No instrument for channel {:?}", channel);
1709            return None;
1710        };
1711
1712        let Some(instrument) = self.instruments_cache.get(&inst_id) else {
1713            tracing::error!(
1714                "No instrument for channel {:?}, inst_id {:?}",
1715                channel,
1716                inst_id
1717            );
1718            return None;
1719        };
1720
1721        let instrument_id = instrument.id();
1722        let price_precision = instrument.price_precision();
1723        let size_precision = instrument.size_precision();
1724
1725        match parse_ws_message_data(
1726            &channel,
1727            data,
1728            &instrument_id,
1729            price_precision,
1730            size_precision,
1731            ts_init,
1732            &mut self.funding_rate_cache,
1733            &self.instruments_cache,
1734        ) {
1735            Ok(Some(msg)) => {
1736                if let NautilusWsMessage::Instrument(ref inst) = msg {
1737                    self.instruments_cache
1738                        .insert(inst.symbol().inner(), inst.as_ref().clone());
1739                }
1740                Some(msg)
1741            }
1742            Ok(None) => None,
1743            Err(e) => {
1744                tracing::error!("Error parsing message for channel {:?}: {e}", channel);
1745                None
1746            }
1747        }
1748    }
1749
1750    pub(crate) fn parse_raw_message(
1751        msg: tokio_tungstenite::tungstenite::Message,
1752    ) -> Option<OKXWsMessage> {
1753        match msg {
1754            tokio_tungstenite::tungstenite::Message::Text(text) => {
1755                if text == TEXT_PONG {
1756                    tracing::trace!("Received pong from OKX");
1757                    return None;
1758                }
1759                if text == TEXT_PING {
1760                    tracing::trace!("Received ping from OKX (text)");
1761                    return Some(OKXWsMessage::Ping);
1762                }
1763
1764                if text == RECONNECTED {
1765                    tracing::debug!("Received WebSocket reconnection signal");
1766                    return Some(OKXWsMessage::Reconnected);
1767                }
1768                tracing::trace!("Received WebSocket message: {text}");
1769
1770                match serde_json::from_str(&text) {
1771                    Ok(ws_event) => match &ws_event {
1772                        OKXWsMessage::Error { code, msg } => {
1773                            tracing::error!("WebSocket error: {code} - {msg}");
1774                            Some(ws_event)
1775                        }
1776                        OKXWsMessage::Login {
1777                            event,
1778                            code,
1779                            msg,
1780                            conn_id,
1781                        } => {
1782                            if code == "0" {
1783                                tracing::info!(conn_id = %conn_id, "WebSocket authenticated");
1784                            } else {
1785                                tracing::error!(event = %event, code = %code, error = %msg, "WebSocket authentication failed");
1786                            }
1787                            Some(ws_event)
1788                        }
1789                        OKXWsMessage::Subscription {
1790                            event,
1791                            arg,
1792                            conn_id,
1793                            ..
1794                        } => {
1795                            let channel_str = serde_json::to_string(&arg.channel)
1796                                .expect("Invalid OKX websocket channel")
1797                                .trim_matches('"')
1798                                .to_string();
1799                            tracing::debug!("{event}d: channel={channel_str}, conn_id={conn_id}");
1800                            Some(ws_event)
1801                        }
1802                        OKXWsMessage::ChannelConnCount {
1803                            event: _,
1804                            channel,
1805                            conn_count,
1806                            conn_id,
1807                        } => {
1808                            let channel_str = serde_json::to_string(&channel)
1809                                .expect("Invalid OKX websocket channel")
1810                                .trim_matches('"')
1811                                .to_string();
1812                            tracing::debug!(
1813                                "Channel connection status: channel={channel_str}, connections={conn_count}, conn_id={conn_id}",
1814                            );
1815                            None
1816                        }
1817                        OKXWsMessage::Ping => {
1818                            tracing::trace!("Ignoring ping event parsed from text payload");
1819                            None
1820                        }
1821                        OKXWsMessage::Data { .. } => Some(ws_event),
1822                        OKXWsMessage::BookData { .. } => Some(ws_event),
1823                        OKXWsMessage::OrderResponse {
1824                            id,
1825                            op,
1826                            code,
1827                            msg: _,
1828                            data,
1829                        } => {
1830                            if code == "0" {
1831                                tracing::debug!(
1832                                    "Order operation successful: id={:?}, op={op}, code={code}",
1833                                    id
1834                                );
1835
1836                                if let Some(order_data) = data.first() {
1837                                    let success_msg = order_data
1838                                        .get("sMsg")
1839                                        .and_then(|s| s.as_str())
1840                                        .unwrap_or("Order operation successful");
1841                                    tracing::debug!("Order success details: {success_msg}");
1842                                }
1843                            }
1844                            Some(ws_event)
1845                        }
1846                        OKXWsMessage::Reconnected => {
1847                            // This shouldn't happen as we handle RECONNECTED string directly
1848                            tracing::warn!("Unexpected Reconnected event from deserialization");
1849                            None
1850                        }
1851                    },
1852                    Err(e) => {
1853                        tracing::error!("Failed to parse message: {e}: {text}");
1854                        None
1855                    }
1856                }
1857            }
1858            Message::Ping(_payload) => {
1859                tracing::trace!("Received binary ping frame from OKX");
1860                Some(OKXWsMessage::Ping)
1861            }
1862            Message::Pong(payload) => {
1863                tracing::trace!("Received pong frame from OKX ({} bytes)", payload.len());
1864                None
1865            }
1866            Message::Binary(msg) => {
1867                tracing::debug!("Raw binary: {msg:?}");
1868                None
1869            }
1870            Message::Close(_) => {
1871                tracing::debug!("Received close message");
1872                None
1873            }
1874            msg => {
1875                tracing::warn!("Unexpected message: {msg}");
1876                None
1877            }
1878        }
1879    }
1880
1881    fn generate_unique_request_id(&self) -> String {
1882        self.request_id_counter
1883            .fetch_add(1, Ordering::SeqCst)
1884            .to_string()
1885    }
1886
1887    fn get_instrument_type_and_family_from_instrument(
1888        instrument: &InstrumentAny,
1889    ) -> anyhow::Result<(OKXInstrumentType, String)> {
1890        let inst_type = okx_instrument_type(instrument)?;
1891        let symbol = instrument.symbol().inner();
1892
1893        // Determine instrument family based on instrument type
1894        let inst_family = match instrument {
1895            InstrumentAny::CurrencyPair(_) => symbol.as_str().to_string(),
1896            InstrumentAny::CryptoPerpetual(_) => {
1897                // For SWAP: "BTC-USDT-SWAP" -> "BTC-USDT"
1898                symbol
1899                    .as_str()
1900                    .strip_suffix("-SWAP")
1901                    .unwrap_or(symbol.as_str())
1902                    .to_string()
1903            }
1904            InstrumentAny::CryptoFuture(_) => {
1905                // For FUTURES: "BTC-USDT-250328" -> "BTC-USDT"
1906                // Extract the base pair by removing date suffix
1907                let s = symbol.as_str();
1908                if let Some(idx) = s.rfind('-') {
1909                    s[..idx].to_string()
1910                } else {
1911                    s.to_string()
1912                }
1913            }
1914            _ => {
1915                anyhow::bail!("Unsupported instrument type for OKX");
1916            }
1917        };
1918
1919        Ok((inst_type, inst_family))
1920    }
1921
1922    async fn handle_mass_cancel(&mut self, instrument_id: InstrumentId) -> anyhow::Result<()> {
1923        let instrument = self
1924            .instruments_cache
1925            .get(&instrument_id.symbol.inner())
1926            .ok_or_else(|| anyhow::anyhow!("Unknown instrument {instrument_id}"))?;
1927
1928        let (inst_type, inst_family) =
1929            Self::get_instrument_type_and_family_from_instrument(instrument)?;
1930
1931        let params = WsMassCancelParams {
1932            inst_type,
1933            inst_family: Ustr::from(&inst_family),
1934        };
1935
1936        let args =
1937            vec![serde_json::to_value(params).map_err(|e| anyhow::anyhow!("JSON error: {e}"))?];
1938
1939        let request_id = self.generate_unique_request_id();
1940
1941        self.pending_mass_cancel_requests
1942            .insert(request_id.clone(), instrument_id);
1943
1944        let request = OKXWsRequest {
1945            id: Some(request_id.clone()),
1946            op: OKXWsOperation::MassCancel,
1947            exp_time: None,
1948            args,
1949        };
1950
1951        let payload = serde_json::to_string(&request)
1952            .map_err(|e| anyhow::anyhow!("Failed to serialize mass cancel request: {e}"))?;
1953
1954        match self
1955            .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_CANCEL.to_string()]))
1956            .await
1957        {
1958            Ok(()) => {
1959                tracing::debug!("Sent mass cancel for {instrument_id}");
1960                Ok(())
1961            }
1962            Err(e) => {
1963                tracing::error!(error = %e, "Failed to send mass cancel after retries");
1964
1965                self.pending_mass_cancel_requests.remove(&request_id);
1966
1967                let error = OKXWebSocketError {
1968                    code: "CLIENT_ERROR".to_string(),
1969                    message: format!("Mass cancel failed for {instrument_id}: {e}"),
1970                    conn_id: None,
1971                    timestamp: self.clock.get_time_ns().as_u64(),
1972                };
1973                let _ = self.send(NautilusWsMessage::Error(error));
1974
1975                Err(anyhow::anyhow!("Failed to send mass cancel: {e}"))
1976            }
1977        }
1978    }
1979
1980    async fn handle_batch_cancel_orders(
1981        &self,
1982        args: Vec<Value>,
1983        request_id: String,
1984    ) -> anyhow::Result<()> {
1985        let request = OKXWsRequest {
1986            id: Some(request_id),
1987            op: OKXWsOperation::BatchCancelOrders,
1988            exp_time: None,
1989            args,
1990        };
1991
1992        let payload = serde_json::to_string(&request)
1993            .map_err(|e| anyhow::anyhow!("Failed to serialize batch cancel request: {e}"))?;
1994
1995        if let Some(client) = &self.inner {
1996            client
1997                .send_text(payload, Some(vec![OKX_RATE_LIMIT_KEY_CANCEL.to_string()]))
1998                .await
1999                .map_err(|e| anyhow::anyhow!("Failed to send batch cancel: {e}"))?;
2000            tracing::debug!("Sent batch cancel orders");
2001            Ok(())
2002        } else {
2003            Err(anyhow::anyhow!("No active WebSocket client"))
2004        }
2005    }
2006
2007    async fn handle_batch_place_orders(
2008        &self,
2009        args: Vec<Value>,
2010        request_id: String,
2011    ) -> anyhow::Result<()> {
2012        let request = OKXWsRequest {
2013            id: Some(request_id),
2014            op: OKXWsOperation::BatchOrders,
2015            exp_time: None,
2016            args,
2017        };
2018
2019        let payload = serde_json::to_string(&request)
2020            .map_err(|e| anyhow::anyhow!("Failed to serialize batch place request: {e}"))?;
2021
2022        if let Some(client) = &self.inner {
2023            client
2024                .send_text(payload, Some(vec![OKX_RATE_LIMIT_KEY_ORDER.to_string()]))
2025                .await
2026                .map_err(|e| anyhow::anyhow!("Failed to send batch place: {e}"))?;
2027            tracing::debug!("Sent batch place orders");
2028            Ok(())
2029        } else {
2030            Err(anyhow::anyhow!("No active WebSocket client"))
2031        }
2032    }
2033
2034    async fn handle_batch_amend_orders(
2035        &self,
2036        args: Vec<Value>,
2037        request_id: String,
2038    ) -> anyhow::Result<()> {
2039        let request = OKXWsRequest {
2040            id: Some(request_id),
2041            op: OKXWsOperation::BatchAmendOrders,
2042            exp_time: None,
2043            args,
2044        };
2045
2046        let payload = serde_json::to_string(&request)
2047            .map_err(|e| anyhow::anyhow!("Failed to serialize batch amend request: {e}"))?;
2048
2049        if let Some(client) = &self.inner {
2050            client
2051                .send_text(payload, Some(vec![OKX_RATE_LIMIT_KEY_AMEND.to_string()]))
2052                .await
2053                .map_err(|e| anyhow::anyhow!("Failed to send batch amend: {e}"))?;
2054            tracing::debug!("Sent batch amend orders");
2055            Ok(())
2056        } else {
2057            Err(anyhow::anyhow!("No active WebSocket client"))
2058        }
2059    }
2060
2061    async fn handle_subscribe(&self, args: Vec<OKXSubscriptionArg>) -> anyhow::Result<()> {
2062        for arg in &args {
2063            tracing::debug!(channel = ?arg.channel, inst_id = ?arg.inst_id, "Subscribing to channel");
2064        }
2065
2066        let message = OKXSubscription {
2067            op: OKXWsOperation::Subscribe,
2068            args,
2069        };
2070
2071        let json_txt = serde_json::to_string(&message)
2072            .map_err(|e| anyhow::anyhow!("Failed to serialize subscription: {e}"))?;
2073
2074        self.send_with_retry(
2075            json_txt,
2076            Some(vec![OKX_RATE_LIMIT_KEY_SUBSCRIPTION.to_string()]),
2077        )
2078        .await
2079        .map_err(|e| anyhow::anyhow!("Failed to send subscription after retries: {e}"))?;
2080        Ok(())
2081    }
2082
2083    async fn handle_unsubscribe(&self, args: Vec<OKXSubscriptionArg>) -> anyhow::Result<()> {
2084        for arg in &args {
2085            tracing::debug!(channel = ?arg.channel, inst_id = ?arg.inst_id, "Unsubscribing from channel");
2086        }
2087
2088        let message = OKXSubscription {
2089            op: OKXWsOperation::Unsubscribe,
2090            args,
2091        };
2092
2093        let json_txt = serde_json::to_string(&message)
2094            .map_err(|e| anyhow::anyhow!("Failed to serialize unsubscription: {e}"))?;
2095
2096        self.send_with_retry(
2097            json_txt,
2098            Some(vec![OKX_RATE_LIMIT_KEY_SUBSCRIPTION.to_string()]),
2099        )
2100        .await
2101        .map_err(|e| anyhow::anyhow!("Failed to send unsubscription after retries: {e}"))?;
2102        Ok(())
2103    }
2104
2105    async fn handle_place_order(
2106        &mut self,
2107        params: WsPostOrderParams,
2108        client_order_id: ClientOrderId,
2109        trader_id: TraderId,
2110        strategy_id: StrategyId,
2111        instrument_id: InstrumentId,
2112    ) -> anyhow::Result<()> {
2113        let request_id = self.generate_unique_request_id();
2114
2115        self.pending_place_requests.insert(
2116            request_id.clone(),
2117            (
2118                PendingOrderParams::Regular(params.clone()),
2119                client_order_id,
2120                trader_id,
2121                strategy_id,
2122                instrument_id,
2123            ),
2124        );
2125
2126        let request = OKXWsRequest {
2127            id: Some(request_id.clone()),
2128            op: OKXWsOperation::Order,
2129            exp_time: None,
2130            args: vec![params],
2131        };
2132
2133        let payload = serde_json::to_string(&request)
2134            .map_err(|e| anyhow::anyhow!("Failed to serialize place order request: {e}"))?;
2135
2136        match self
2137            .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_ORDER.to_string()]))
2138            .await
2139        {
2140            Ok(()) => {
2141                tracing::debug!("Sent place order request");
2142                Ok(())
2143            }
2144            Err(e) => {
2145                tracing::error!(error = %e, "Failed to send place order after retries");
2146
2147                self.pending_place_requests.remove(&request_id);
2148
2149                let ts_now = self.clock.get_time_ns();
2150                let rejected = OrderRejected::new(
2151                    trader_id,
2152                    strategy_id,
2153                    instrument_id,
2154                    client_order_id,
2155                    self.account_id,
2156                    Ustr::from(&format!("WebSocket send failed: {e}")),
2157                    UUID4::new(),
2158                    ts_now, // ts_event
2159                    ts_now, // ts_init
2160                    false,  // Not from reconciliation
2161                    false,  // Not due to post-only
2162                );
2163                let _ = self.send(NautilusWsMessage::OrderRejected(rejected));
2164
2165                Err(anyhow::anyhow!("Failed to send place order: {e}"))
2166            }
2167        }
2168    }
2169
2170    async fn handle_place_algo_order(
2171        &mut self,
2172        params: WsPostAlgoOrderParams,
2173        client_order_id: ClientOrderId,
2174        trader_id: TraderId,
2175        strategy_id: StrategyId,
2176        instrument_id: InstrumentId,
2177    ) -> anyhow::Result<()> {
2178        let request_id = self.generate_unique_request_id();
2179
2180        self.pending_place_requests.insert(
2181            request_id.clone(),
2182            (
2183                PendingOrderParams::Algo(params.clone()),
2184                client_order_id,
2185                trader_id,
2186                strategy_id,
2187                instrument_id,
2188            ),
2189        );
2190
2191        let request = OKXWsRequest {
2192            id: Some(request_id.clone()),
2193            op: OKXWsOperation::OrderAlgo,
2194            exp_time: None,
2195            args: vec![params],
2196        };
2197
2198        let payload = serde_json::to_string(&request)
2199            .map_err(|e| anyhow::anyhow!("Failed to serialize place algo order request: {e}"))?;
2200
2201        match self
2202            .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_ORDER.to_string()]))
2203            .await
2204        {
2205            Ok(()) => {
2206                tracing::debug!("Sent place algo order request");
2207                Ok(())
2208            }
2209            Err(e) => {
2210                tracing::error!(error = %e, "Failed to send place algo order after retries");
2211
2212                self.pending_place_requests.remove(&request_id);
2213
2214                let ts_now = self.clock.get_time_ns();
2215                let rejected = OrderRejected::new(
2216                    trader_id,
2217                    strategy_id,
2218                    instrument_id,
2219                    client_order_id,
2220                    self.account_id,
2221                    Ustr::from(&format!("WebSocket send failed: {e}")),
2222                    UUID4::new(),
2223                    ts_now, // ts_event
2224                    ts_now, // ts_init
2225                    false,  // Not from reconciliation
2226                    false,  // Not due to post-only
2227                );
2228                let _ = self.send(NautilusWsMessage::OrderRejected(rejected));
2229
2230                Err(anyhow::anyhow!("Failed to send place algo order: {e}"))
2231            }
2232        }
2233    }
2234
2235    async fn handle_cancel_order(
2236        &mut self,
2237        client_order_id: Option<ClientOrderId>,
2238        venue_order_id: Option<VenueOrderId>,
2239        instrument_id: InstrumentId,
2240        trader_id: TraderId,
2241        strategy_id: StrategyId,
2242    ) -> anyhow::Result<()> {
2243        let mut builder = WsCancelOrderParamsBuilder::default();
2244        builder.inst_id(instrument_id.symbol.as_str());
2245
2246        if let Some(venue_order_id) = venue_order_id {
2247            builder.ord_id(venue_order_id.as_str());
2248        }
2249
2250        if let Some(client_order_id) = client_order_id {
2251            builder.cl_ord_id(client_order_id.as_str());
2252        }
2253
2254        let params = builder
2255            .build()
2256            .map_err(|e| anyhow::anyhow!("Failed to build cancel params: {e}"))?;
2257
2258        let request_id = self.generate_unique_request_id();
2259
2260        // Track pending request if we have a client order ID
2261        if let Some(client_order_id) = client_order_id {
2262            self.pending_cancel_requests.insert(
2263                request_id.clone(),
2264                (
2265                    client_order_id,
2266                    trader_id,
2267                    strategy_id,
2268                    instrument_id,
2269                    venue_order_id,
2270                ),
2271            );
2272        }
2273
2274        let request = OKXWsRequest {
2275            id: Some(request_id.clone()),
2276            op: OKXWsOperation::CancelOrder,
2277            exp_time: None,
2278            args: vec![params],
2279        };
2280
2281        let payload = serde_json::to_string(&request)
2282            .map_err(|e| anyhow::anyhow!("Failed to serialize cancel request: {e}"))?;
2283
2284        match self
2285            .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_CANCEL.to_string()]))
2286            .await
2287        {
2288            Ok(()) => {
2289                tracing::debug!("Sent cancel order request");
2290                Ok(())
2291            }
2292            Err(e) => {
2293                tracing::error!(error = %e, "Failed to send cancel order after retries");
2294
2295                self.pending_cancel_requests.remove(&request_id);
2296
2297                if let Some(client_order_id) = client_order_id {
2298                    let ts_now = self.clock.get_time_ns();
2299                    let rejected = OrderCancelRejected::new(
2300                        trader_id,
2301                        strategy_id,
2302                        instrument_id,
2303                        client_order_id,
2304                        Ustr::from(&format!("WebSocket send failed: {e}")),
2305                        UUID4::new(),
2306                        ts_now, // ts_event
2307                        ts_now, // ts_init
2308                        false,  // Not from reconciliation
2309                        venue_order_id,
2310                        Some(self.account_id),
2311                    );
2312                    let _ = self.send(NautilusWsMessage::OrderCancelRejected(rejected));
2313                }
2314
2315                Err(anyhow::anyhow!("Failed to send cancel order: {e}"))
2316            }
2317        }
2318    }
2319
2320    async fn handle_amend_order(
2321        &mut self,
2322        params: WsAmendOrderParams,
2323        client_order_id: ClientOrderId,
2324        trader_id: TraderId,
2325        strategy_id: StrategyId,
2326        instrument_id: InstrumentId,
2327        venue_order_id: Option<VenueOrderId>,
2328    ) -> anyhow::Result<()> {
2329        let request_id = self.generate_unique_request_id();
2330
2331        self.pending_amend_requests.insert(
2332            request_id.clone(),
2333            (
2334                client_order_id,
2335                trader_id,
2336                strategy_id,
2337                instrument_id,
2338                venue_order_id,
2339            ),
2340        );
2341
2342        let request = OKXWsRequest {
2343            id: Some(request_id.clone()),
2344            op: OKXWsOperation::AmendOrder,
2345            exp_time: None,
2346            args: vec![params],
2347        };
2348
2349        let payload = serde_json::to_string(&request)
2350            .map_err(|e| anyhow::anyhow!("Failed to serialize amend order request: {e}"))?;
2351
2352        match self
2353            .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_AMEND.to_string()]))
2354            .await
2355        {
2356            Ok(()) => {
2357                tracing::debug!("Sent amend order request");
2358                Ok(())
2359            }
2360            Err(e) => {
2361                tracing::error!(error = %e, "Failed to send amend order after retries");
2362
2363                self.pending_amend_requests.remove(&request_id);
2364
2365                let ts_now = self.clock.get_time_ns();
2366                let rejected = OrderModifyRejected::new(
2367                    trader_id,
2368                    strategy_id,
2369                    instrument_id,
2370                    client_order_id,
2371                    Ustr::from(&format!("WebSocket send failed: {e}")),
2372                    UUID4::new(),
2373                    ts_now, // ts_event
2374                    ts_now, // ts_init
2375                    false,  // Not from reconciliation
2376                    venue_order_id,
2377                    Some(self.account_id),
2378                );
2379                let _ = self.send(NautilusWsMessage::OrderModifyRejected(rejected));
2380
2381                Err(anyhow::anyhow!("Failed to send amend order: {e}"))
2382            }
2383        }
2384    }
2385
2386    async fn handle_cancel_algo_order(
2387        &mut self,
2388        client_order_id: Option<ClientOrderId>,
2389        algo_order_id: Option<VenueOrderId>,
2390        instrument_id: InstrumentId,
2391        trader_id: TraderId,
2392        strategy_id: StrategyId,
2393    ) -> anyhow::Result<()> {
2394        let mut builder = WsCancelAlgoOrderParamsBuilder::default();
2395        builder.inst_id(instrument_id.symbol.as_str());
2396
2397        if let Some(client_order_id) = &client_order_id {
2398            builder.algo_cl_ord_id(client_order_id.as_str());
2399        }
2400
2401        if let Some(algo_id) = &algo_order_id {
2402            builder.algo_id(algo_id.as_str());
2403        }
2404
2405        let params = builder
2406            .build()
2407            .map_err(|e| anyhow::anyhow!("Failed to build cancel algo params: {e}"))?;
2408
2409        let request_id = self.generate_unique_request_id();
2410
2411        // Track pending cancellation if we have a client order ID
2412        if let Some(client_order_id) = client_order_id {
2413            self.pending_cancel_requests.insert(
2414                request_id.clone(),
2415                (client_order_id, trader_id, strategy_id, instrument_id, None),
2416            );
2417        }
2418
2419        let request = OKXWsRequest {
2420            id: Some(request_id.clone()),
2421            op: OKXWsOperation::CancelAlgos,
2422            exp_time: None,
2423            args: vec![params],
2424        };
2425
2426        let payload = serde_json::to_string(&request)
2427            .map_err(|e| anyhow::anyhow!("Failed to serialize cancel algo request: {e}"))?;
2428
2429        match self
2430            .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_CANCEL.to_string()]))
2431            .await
2432        {
2433            Ok(()) => {
2434                tracing::debug!("Sent cancel algo order request");
2435                Ok(())
2436            }
2437            Err(e) => {
2438                tracing::error!(error = %e, "Failed to send cancel algo order after retries");
2439
2440                self.pending_cancel_requests.remove(&request_id);
2441
2442                if let Some(client_order_id) = client_order_id {
2443                    let ts_now = self.clock.get_time_ns();
2444                    let rejected = OrderCancelRejected::new(
2445                        trader_id,
2446                        strategy_id,
2447                        instrument_id,
2448                        client_order_id,
2449                        Ustr::from(&format!("WebSocket send failed: {e}")),
2450                        UUID4::new(),
2451                        ts_now, // ts_event
2452                        ts_now, // ts_init
2453                        false,  // Not from reconciliation
2454                        None,
2455                        Some(self.account_id),
2456                    );
2457                    let _ = self.send(NautilusWsMessage::OrderCancelRejected(rejected));
2458                }
2459
2460                Err(anyhow::anyhow!("Failed to send cancel algo order: {e}"))
2461            }
2462        }
2463    }
2464}
2465
2466/// Returns `true` when an OKX error payload represents a post-only rejection.
2467pub fn is_post_only_rejection(code: &str, data: &[Value]) -> bool {
2468    if code == OKX_POST_ONLY_ERROR_CODE {
2469        return true;
2470    }
2471
2472    for entry in data {
2473        if let Some(s_code) = entry.get("sCode").and_then(|value| value.as_str())
2474            && s_code == OKX_POST_ONLY_ERROR_CODE
2475        {
2476            return true;
2477        }
2478
2479        if let Some(inner_code) = entry.get("code").and_then(|value| value.as_str())
2480            && inner_code == OKX_POST_ONLY_ERROR_CODE
2481        {
2482            return true;
2483        }
2484    }
2485
2486    false
2487}
2488
2489/// Case-insensitive substring check.
2490#[inline]
2491fn contains_ignore_ascii_case(haystack: &str, needle: &str) -> bool {
2492    haystack
2493        .as_bytes()
2494        .windows(needle.len())
2495        .any(|window| window.eq_ignore_ascii_case(needle.as_bytes()))
2496}
2497
2498/// Determines if an OKX WebSocket error should trigger a retry.
2499fn should_retry_okx_error(error: &OKXWsError) -> bool {
2500    match error {
2501        OKXWsError::OkxError { error_code, .. } => should_retry_error_code(error_code),
2502        OKXWsError::TungsteniteError(_) => true, // Network errors are retryable
2503        OKXWsError::ClientError(msg) => {
2504            // Retry on timeout and connection errors
2505            contains_ignore_ascii_case(msg, "timeout")
2506                || contains_ignore_ascii_case(msg, "timed out")
2507                || contains_ignore_ascii_case(msg, "connection")
2508                || contains_ignore_ascii_case(msg, "network")
2509        }
2510        OKXWsError::AuthenticationError(_)
2511        | OKXWsError::JsonError(_)
2512        | OKXWsError::ParsingError(_) => {
2513            // Don't retry authentication or parsing errors automatically
2514            false
2515        }
2516    }
2517}
2518
2519/// Creates a timeout error for the retry manager.
2520fn create_okx_timeout_error(msg: String) -> OKXWsError {
2521    OKXWsError::ClientError(msg)
2522}
2523
2524#[cfg(test)]
2525mod tests {
2526    use std::sync::{Arc, atomic::AtomicBool};
2527
2528    use ahash::AHashMap;
2529    use dashmap::DashMap;
2530    use nautilus_model::{
2531        identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
2532        types::{Money, Quantity},
2533    };
2534    use nautilus_network::websocket::{AuthTracker, SubscriptionState};
2535    use rstest::rstest;
2536
2537    use super::{NautilusWsMessage, OKXWsFeedHandler};
2538    use crate::websocket::parse::OrderStateSnapshot;
2539
2540    const OKX_WS_TOPIC_DELIMITER: char = ':';
2541
2542    #[allow(clippy::type_complexity)]
2543    fn create_test_handler() -> (
2544        OKXWsFeedHandler,
2545        tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>,
2546        Arc<DashMap<ClientOrderId, (TraderId, StrategyId, InstrumentId)>>,
2547        Arc<DashMap<ClientOrderId, ClientOrderId>>,
2548    ) {
2549        let account_id = AccountId::new("OKX-001");
2550        let signal = Arc::new(AtomicBool::new(false));
2551        let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
2552        let (_raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();
2553        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel();
2554        let active_client_orders = Arc::new(DashMap::new());
2555        let client_id_aliases = Arc::new(DashMap::new());
2556        let auth_tracker = AuthTracker::new();
2557        let subscriptions_state = SubscriptionState::new(OKX_WS_TOPIC_DELIMITER);
2558
2559        let handler = OKXWsFeedHandler::new(
2560            account_id,
2561            signal,
2562            cmd_rx,
2563            raw_rx,
2564            out_tx,
2565            active_client_orders.clone(),
2566            client_id_aliases.clone(),
2567            auth_tracker,
2568            subscriptions_state,
2569        );
2570
2571        (handler, out_rx, active_client_orders, client_id_aliases)
2572    }
2573
2574    #[rstest]
2575    fn test_is_post_only_rejection_detects_by_code() {
2576        assert!(super::is_post_only_rejection("51019", &[]));
2577    }
2578
2579    #[rstest]
2580    fn test_is_post_only_rejection_detects_by_inner_code() {
2581        let data = vec![serde_json::json!({
2582            "sCode": "51019"
2583        })];
2584        assert!(super::is_post_only_rejection("50000", &data));
2585    }
2586
2587    #[rstest]
2588    fn test_is_post_only_rejection_false_for_unrelated_error() {
2589        let data = vec![serde_json::json!({
2590            "sMsg": "Insufficient balance"
2591        })];
2592        assert!(!super::is_post_only_rejection("50000", &data));
2593    }
2594
2595    #[rstest]
2596    fn test_cleanup_alias_removes_canonical_entry() {
2597        let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2598        let canonical = ClientOrderId::new("PARENT-001");
2599        aliases.insert(canonical, canonical);
2600
2601        aliases.remove(&canonical);
2602        aliases.retain(|_, v| *v != canonical);
2603
2604        assert!(!aliases.contains_key(&canonical));
2605        assert!(aliases.is_empty());
2606    }
2607
2608    #[rstest]
2609    fn test_cleanup_alias_removes_child_alias_pointing_to_canonical() {
2610        let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2611        let canonical = ClientOrderId::new("PARENT-001");
2612        let child = ClientOrderId::new("CHILD-001");
2613        aliases.insert(canonical, canonical);
2614        aliases.insert(child, canonical);
2615
2616        aliases.remove(&canonical);
2617        aliases.retain(|_, v| *v != canonical);
2618
2619        assert!(!aliases.contains_key(&canonical));
2620        assert!(!aliases.contains_key(&child));
2621        assert!(aliases.is_empty());
2622    }
2623
2624    #[rstest]
2625    fn test_cleanup_alias_does_not_affect_unrelated_entries() {
2626        let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2627        let canonical1 = ClientOrderId::new("PARENT-001");
2628        let child1 = ClientOrderId::new("CHILD-001");
2629        let canonical2 = ClientOrderId::new("PARENT-002");
2630        let child2 = ClientOrderId::new("CHILD-002");
2631        aliases.insert(canonical1, canonical1);
2632        aliases.insert(child1, canonical1);
2633        aliases.insert(canonical2, canonical2);
2634        aliases.insert(child2, canonical2);
2635
2636        aliases.remove(&canonical1);
2637        aliases.retain(|_, v| *v != canonical1);
2638
2639        assert!(!aliases.contains_key(&canonical1));
2640        assert!(!aliases.contains_key(&child1));
2641        assert!(aliases.contains_key(&canonical2));
2642        assert!(aliases.contains_key(&child2));
2643        assert_eq!(aliases.len(), 2);
2644    }
2645
2646    #[rstest]
2647    fn test_cleanup_alias_handles_multiple_children() {
2648        let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2649        let canonical = ClientOrderId::new("PARENT-001");
2650        let child1 = ClientOrderId::new("CHILD-001");
2651        let child2 = ClientOrderId::new("CHILD-002");
2652        let child3 = ClientOrderId::new("CHILD-003");
2653        aliases.insert(canonical, canonical);
2654        aliases.insert(child1, canonical);
2655        aliases.insert(child2, canonical);
2656        aliases.insert(child3, canonical);
2657
2658        aliases.remove(&canonical);
2659        aliases.retain(|_, v| *v != canonical);
2660
2661        assert!(aliases.is_empty());
2662    }
2663
2664    #[rstest]
2665    fn test_cleanup_removes_from_all_caches() {
2666        let emitted_accepted: DashMap<VenueOrderId, ()> = DashMap::new();
2667        let order_state_cache: AHashMap<ClientOrderId, u32> = AHashMap::new();
2668        let active_orders: DashMap<ClientOrderId, ()> = DashMap::new();
2669        let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2670        let fee_cache: AHashMap<ustr::Ustr, f64> = AHashMap::new();
2671        let filled_qty_cache: AHashMap<ustr::Ustr, f64> = AHashMap::new();
2672        let canonical = ClientOrderId::new("PARENT-001");
2673        let child = ClientOrderId::new("CHILD-001");
2674        let venue_id = VenueOrderId::new("VENUE-001");
2675
2676        emitted_accepted.insert(venue_id, ());
2677        let mut order_state = order_state_cache;
2678        order_state.insert(canonical, 1);
2679        active_orders.insert(canonical, ());
2680        aliases.insert(canonical, canonical);
2681        aliases.insert(child, canonical);
2682        let mut fees = fee_cache;
2683        fees.insert(venue_id.inner(), 0.001);
2684        let mut filled = filled_qty_cache;
2685        filled.insert(venue_id.inner(), 1.0);
2686
2687        emitted_accepted.remove(&venue_id);
2688        order_state.remove(&canonical);
2689        active_orders.remove(&canonical);
2690        aliases.remove(&canonical);
2691        aliases.retain(|_, v| *v != canonical);
2692        fees.remove(&venue_id.inner());
2693        filled.remove(&venue_id.inner());
2694
2695        assert!(emitted_accepted.is_empty());
2696        assert!(order_state.is_empty());
2697        assert!(active_orders.is_empty());
2698        assert!(aliases.is_empty());
2699        assert!(fees.is_empty());
2700        assert!(filled.is_empty());
2701    }
2702
2703    #[rstest]
2704    fn test_alias_registration_parent_with_child() {
2705        let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2706        let parent = ClientOrderId::new("PARENT-001");
2707        let child = ClientOrderId::new("CHILD-001");
2708        aliases.insert(parent, parent);
2709        aliases.insert(child, parent);
2710
2711        assert_eq!(*aliases.get(&parent).unwrap(), parent);
2712        assert_eq!(*aliases.get(&child).unwrap(), parent);
2713    }
2714
2715    #[rstest]
2716    fn test_alias_registration_standalone_order() {
2717        let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2718        let order_id = ClientOrderId::new("ORDER-001");
2719        aliases.insert(order_id, order_id);
2720
2721        assert_eq!(*aliases.get(&order_id).unwrap(), order_id);
2722    }
2723
2724    #[rstest]
2725    fn test_alias_lookup_returns_canonical() {
2726        let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2727        let canonical = ClientOrderId::new("PARENT-001");
2728        let child = ClientOrderId::new("CHILD-001");
2729
2730        aliases.insert(canonical, canonical);
2731        aliases.insert(child, canonical);
2732
2733        let resolved = aliases.get(&child).map(|v| *v);
2734        assert_eq!(resolved, Some(canonical));
2735    }
2736
2737    #[rstest]
2738    fn test_handler_register_client_order_aliases_with_parent() {
2739        let (handler, _out_rx, _active, client_id_aliases) = create_test_handler();
2740
2741        let child = Some(ClientOrderId::new("CHILD-001"));
2742        let parent = Some(ClientOrderId::new("PARENT-001"));
2743
2744        let result = handler.register_client_order_aliases(&child, &parent);
2745
2746        assert_eq!(result, Some(ClientOrderId::new("PARENT-001")));
2747        assert!(client_id_aliases.contains_key(&ClientOrderId::new("PARENT-001")));
2748        assert!(client_id_aliases.contains_key(&ClientOrderId::new("CHILD-001")));
2749        assert_eq!(
2750            *client_id_aliases
2751                .get(&ClientOrderId::new("CHILD-001"))
2752                .unwrap(),
2753            ClientOrderId::new("PARENT-001")
2754        );
2755    }
2756
2757    #[rstest]
2758    fn test_handler_register_client_order_aliases_without_parent() {
2759        let (handler, _out_rx, _active, client_id_aliases) = create_test_handler();
2760
2761        let child = Some(ClientOrderId::new("ORDER-001"));
2762        let parent: Option<ClientOrderId> = None;
2763
2764        let result = handler.register_client_order_aliases(&child, &parent);
2765
2766        assert_eq!(result, Some(ClientOrderId::new("ORDER-001")));
2767        assert!(client_id_aliases.contains_key(&ClientOrderId::new("ORDER-001")));
2768        assert_eq!(
2769            *client_id_aliases
2770                .get(&ClientOrderId::new("ORDER-001"))
2771                .unwrap(),
2772            ClientOrderId::new("ORDER-001")
2773        );
2774    }
2775
2776    #[rstest]
2777    fn test_handler_cleanup_terminal_order_removes_all_state() {
2778        let (mut handler, _out_rx, active_client_orders, client_id_aliases) = create_test_handler();
2779
2780        let canonical = ClientOrderId::new("PARENT-001");
2781        let child = ClientOrderId::new("CHILD-001");
2782        let venue_id = VenueOrderId::new("VENUE-001");
2783        let trader_id = TraderId::new("TRADER-001");
2784        let strategy_id = StrategyId::new("STRATEGY-001");
2785        let instrument_id = InstrumentId::from("ETH-USDT-PERP.OKX");
2786
2787        active_client_orders.insert(canonical, (trader_id, strategy_id, instrument_id));
2788        client_id_aliases.insert(canonical, canonical);
2789        client_id_aliases.insert(child, canonical);
2790        handler
2791            .fee_cache
2792            .insert(venue_id.inner(), Money::from("0.001 USDT"));
2793        handler
2794            .filled_qty_cache
2795            .insert(venue_id.inner(), Quantity::from("1.0"));
2796        handler.order_state_cache.insert(
2797            canonical,
2798            OrderStateSnapshot {
2799                venue_order_id: venue_id,
2800                quantity: Quantity::from("1.0"),
2801                price: None,
2802            },
2803        );
2804
2805        handler.cleanup_terminal_order(&canonical, &venue_id);
2806
2807        assert!(!active_client_orders.contains_key(&canonical));
2808        assert!(!client_id_aliases.contains_key(&canonical));
2809        assert!(!client_id_aliases.contains_key(&child));
2810        assert!(!handler.fee_cache.contains_key(&venue_id.inner()));
2811        assert!(!handler.filled_qty_cache.contains_key(&venue_id.inner()));
2812        assert!(!handler.order_state_cache.contains_key(&canonical));
2813    }
2814
2815    #[rstest]
2816    fn test_handler_cleanup_terminal_order_removes_multiple_children() {
2817        let (mut handler, _out_rx, _active, client_id_aliases) = create_test_handler();
2818
2819        let canonical = ClientOrderId::new("PARENT-001");
2820        let child1 = ClientOrderId::new("CHILD-001");
2821        let child2 = ClientOrderId::new("CHILD-002");
2822        let child3 = ClientOrderId::new("CHILD-003");
2823        let venue_id = VenueOrderId::new("VENUE-001");
2824
2825        client_id_aliases.insert(canonical, canonical);
2826        client_id_aliases.insert(child1, canonical);
2827        client_id_aliases.insert(child2, canonical);
2828        client_id_aliases.insert(child3, canonical);
2829
2830        handler.cleanup_terminal_order(&canonical, &venue_id);
2831
2832        assert!(!client_id_aliases.contains_key(&canonical));
2833        assert!(!client_id_aliases.contains_key(&child1));
2834        assert!(!client_id_aliases.contains_key(&child2));
2835        assert!(!client_id_aliases.contains_key(&child3));
2836        assert!(client_id_aliases.is_empty());
2837    }
2838
2839    #[rstest]
2840    fn test_handler_cleanup_does_not_affect_other_orders() {
2841        let (mut handler, _out_rx, active_client_orders, client_id_aliases) = create_test_handler();
2842
2843        let canonical1 = ClientOrderId::new("PARENT-001");
2844        let child1 = ClientOrderId::new("CHILD-001");
2845        let venue_id1 = VenueOrderId::new("VENUE-001");
2846
2847        let canonical2 = ClientOrderId::new("PARENT-002");
2848        let child2 = ClientOrderId::new("CHILD-002");
2849        let venue_id2 = VenueOrderId::new("VENUE-002");
2850
2851        let trader_id = TraderId::new("TRADER-001");
2852        let strategy_id = StrategyId::new("STRATEGY-001");
2853        let instrument_id = InstrumentId::from("ETH-USDT-PERP.OKX");
2854
2855        active_client_orders.insert(canonical1, (trader_id, strategy_id, instrument_id));
2856        active_client_orders.insert(canonical2, (trader_id, strategy_id, instrument_id));
2857        client_id_aliases.insert(canonical1, canonical1);
2858        client_id_aliases.insert(child1, canonical1);
2859        client_id_aliases.insert(canonical2, canonical2);
2860        client_id_aliases.insert(child2, canonical2);
2861        handler
2862            .fee_cache
2863            .insert(venue_id1.inner(), Money::from("0.001 USDT"));
2864        handler
2865            .fee_cache
2866            .insert(venue_id2.inner(), Money::from("0.002 USDT"));
2867
2868        handler.cleanup_terminal_order(&canonical1, &venue_id1);
2869
2870        assert!(!active_client_orders.contains_key(&canonical1));
2871        assert!(!client_id_aliases.contains_key(&canonical1));
2872        assert!(!client_id_aliases.contains_key(&child1));
2873        assert!(!handler.fee_cache.contains_key(&venue_id1.inner()));
2874
2875        assert!(active_client_orders.contains_key(&canonical2));
2876        assert!(client_id_aliases.contains_key(&canonical2));
2877        assert!(client_id_aliases.contains_key(&child2));
2878        assert!(handler.fee_cache.contains_key(&venue_id2.inner()));
2879    }
2880
2881    // ==================================================================================
2882    // Channel routing integration tests
2883    // ==================================================================================
2884
2885    mod channel_routing {
2886        use nautilus_core::nanos::UnixNanos;
2887        use nautilus_model::{
2888            identifiers::{InstrumentId, Symbol},
2889            instruments::{CryptoPerpetual, CurrencyPair, Instrument, InstrumentAny},
2890            types::{Currency, Price, Quantity},
2891        };
2892        use rstest::rstest;
2893        use ustr::Ustr;
2894
2895        use super::*;
2896        use crate::{
2897            common::{enums::OKXBookAction, testing::load_test_json},
2898            websocket::{enums::OKXWsChannel, messages::OKXWsMessage},
2899        };
2900
2901        fn create_spot_instrument() -> InstrumentAny {
2902            let instrument_id = InstrumentId::from("BTC-USDT.OKX");
2903            InstrumentAny::CurrencyPair(CurrencyPair::new(
2904                instrument_id,
2905                Symbol::from("BTC-USDT"),
2906                Currency::BTC(),
2907                Currency::USDT(),
2908                2,
2909                8,
2910                Price::from("0.01"),
2911                Quantity::from("0.00000001"),
2912                None, // multiplier
2913                None, // lot_size
2914                None, // max_quantity
2915                None, // min_quantity
2916                None, // max_notional
2917                None, // min_notional
2918                None, // max_price
2919                None, // min_price
2920                None, // margin_init
2921                None, // margin_maint
2922                None, // maker_fee
2923                None, // taker_fee
2924                UnixNanos::default(),
2925                UnixNanos::default(),
2926            ))
2927        }
2928
2929        fn create_swap_instrument() -> InstrumentAny {
2930            let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
2931            InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
2932                instrument_id,
2933                Symbol::from("BTC-USDT-SWAP"),
2934                Currency::BTC(),
2935                Currency::USDT(),
2936                Currency::USDT(),
2937                false,
2938                2,
2939                8,
2940                Price::from("0.01"),
2941                Quantity::from("0.00000001"),
2942                None,
2943                None,
2944                None,
2945                None,
2946                None,
2947                None,
2948                None,
2949                None,
2950                None,
2951                None,
2952                None,
2953                None,
2954                UnixNanos::default(),
2955                UnixNanos::default(),
2956            ))
2957        }
2958
2959        fn create_handler_with_instruments(instruments: Vec<InstrumentAny>) -> OKXWsFeedHandler {
2960            let (mut handler, _, _, _) = create_test_handler();
2961            for inst in instruments {
2962                handler
2963                    .instruments_cache
2964                    .insert(inst.symbol().inner(), inst);
2965            }
2966            handler
2967        }
2968
2969        #[rstest]
2970        fn test_parse_raw_message_ticker_channel() {
2971            let json = load_test_json("ws_tickers.json");
2972            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
2973
2974            match msg {
2975                OKXWsMessage::Data { arg, data } => {
2976                    assert!(
2977                        matches!(arg.channel, OKXWsChannel::Tickers),
2978                        "Expected Tickers channel"
2979                    );
2980                    assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USDT")));
2981                    assert!(data.is_array());
2982                }
2983                _ => panic!("Expected OKXWsMessage::Data variant"),
2984            }
2985        }
2986
2987        #[rstest]
2988        fn test_parse_raw_message_trades_channel() {
2989            let json = load_test_json("ws_trades.json");
2990            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
2991
2992            match msg {
2993                OKXWsMessage::Data { arg, data } => {
2994                    assert!(
2995                        matches!(arg.channel, OKXWsChannel::Trades),
2996                        "Expected Trades channel"
2997                    );
2998                    assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USD")));
2999                    assert!(data.is_array());
3000                }
3001                _ => panic!("Expected OKXWsMessage::Data variant"),
3002            }
3003        }
3004
3005        #[rstest]
3006        fn test_parse_raw_message_books_channel() {
3007            let json = load_test_json("ws_books_snapshot.json");
3008            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3009
3010            match msg {
3011                OKXWsMessage::BookData { arg, action, data } => {
3012                    assert!(
3013                        matches!(arg.channel, OKXWsChannel::Books),
3014                        "Expected Books channel"
3015                    );
3016                    assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USDT")));
3017                    assert!(
3018                        matches!(action, OKXBookAction::Snapshot),
3019                        "Expected snapshot action"
3020                    );
3021                    assert!(!data.is_empty());
3022                }
3023                _ => panic!("Expected OKXWsMessage::BookData variant"),
3024            }
3025        }
3026
3027        #[rstest]
3028        fn test_parse_raw_message_candle_channel() {
3029            let json = load_test_json("ws_candle.json");
3030            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3031
3032            match msg {
3033                OKXWsMessage::Data { arg, data } => {
3034                    // Candle channel variant is Candle1Day for "candle1D"
3035                    assert!(
3036                        matches!(arg.channel, OKXWsChannel::Candle1Day),
3037                        "Expected Candle1Day channel, got {:?}",
3038                        arg.channel
3039                    );
3040                    assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USDT")));
3041                    assert!(data.is_array());
3042                }
3043                _ => panic!("Expected OKXWsMessage::Data variant"),
3044            }
3045        }
3046
3047        #[rstest]
3048        fn test_parse_raw_message_funding_rate_channel() {
3049            let json = load_test_json("ws_funding_rate.json");
3050            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3051
3052            match msg {
3053                OKXWsMessage::Data { arg, data } => {
3054                    assert!(
3055                        matches!(arg.channel, OKXWsChannel::FundingRate),
3056                        "Expected FundingRate channel"
3057                    );
3058                    assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USDT-SWAP")));
3059                    assert!(data.is_array());
3060                }
3061                _ => panic!("Expected OKXWsMessage::Data variant"),
3062            }
3063        }
3064
3065        #[rstest]
3066        fn test_parse_raw_message_bbo_tbt_channel() {
3067            let json = load_test_json("ws_bbo_tbt.json");
3068            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3069
3070            match msg {
3071                OKXWsMessage::Data { arg, data } => {
3072                    assert!(
3073                        matches!(arg.channel, OKXWsChannel::BboTbt),
3074                        "Expected BboTbt channel"
3075                    );
3076                    assert!(data.is_array());
3077                }
3078                _ => panic!("Expected OKXWsMessage::Data variant"),
3079            }
3080        }
3081
3082        #[rstest]
3083        fn test_handle_other_channel_data_tickers() {
3084            let mut handler = create_handler_with_instruments(vec![create_spot_instrument()]);
3085            let json = load_test_json("ws_tickers.json");
3086            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3087
3088            let OKXWsMessage::Data { arg, data } = msg else {
3089                panic!("Expected OKXWsMessage::Data");
3090            };
3091
3092            let ts_init = UnixNanos::from(1_000_000_000u64);
3093            let result = handler.handle_other_channel_data(arg.channel, arg.inst_id, data, ts_init);
3094
3095            assert!(result.is_some());
3096            match result.unwrap() {
3097                NautilusWsMessage::Data(payloads) => {
3098                    assert!(!payloads.is_empty(), "Should produce data payloads");
3099                }
3100                other => panic!("Expected NautilusWsMessage::Data, got {other:?}"),
3101            }
3102        }
3103
3104        #[rstest]
3105        fn test_handle_other_channel_data_trades() {
3106            // Create instrument with BTC-USD symbol (matches test data)
3107            let instrument_id = InstrumentId::from("BTC-USD.OKX");
3108            let instrument = InstrumentAny::CurrencyPair(CurrencyPair::new(
3109                instrument_id,
3110                Symbol::from("BTC-USD"),
3111                Currency::BTC(),
3112                Currency::USD(),
3113                1,
3114                8,
3115                Price::from("0.1"),
3116                Quantity::from("0.00000001"),
3117                None, // multiplier
3118                None, // lot_size
3119                None, // max_quantity
3120                None, // min_quantity
3121                None, // max_notional
3122                None, // min_notional
3123                None, // max_price
3124                None, // min_price
3125                None, // margin_init
3126                None, // margin_maint
3127                None, // maker_fee
3128                None, // taker_fee
3129                UnixNanos::default(),
3130                UnixNanos::default(),
3131            ));
3132
3133            let mut handler = create_handler_with_instruments(vec![instrument]);
3134            let json = load_test_json("ws_trades.json");
3135            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3136
3137            let OKXWsMessage::Data { arg, data } = msg else {
3138                panic!("Expected OKXWsMessage::Data");
3139            };
3140
3141            let ts_init = UnixNanos::from(1_000_000_000u64);
3142            let result = handler.handle_other_channel_data(arg.channel, arg.inst_id, data, ts_init);
3143
3144            assert!(result.is_some());
3145            match result.unwrap() {
3146                NautilusWsMessage::Data(payloads) => {
3147                    assert!(!payloads.is_empty(), "Should produce trade data payloads");
3148                }
3149                other => panic!("Expected NautilusWsMessage::Data, got {other:?}"),
3150            }
3151        }
3152
3153        #[rstest]
3154        fn test_handle_book_data_snapshot() {
3155            let handler = create_handler_with_instruments(vec![create_spot_instrument()]);
3156            let json = load_test_json("ws_books_snapshot.json");
3157            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3158
3159            let OKXWsMessage::BookData { arg, action, data } = msg else {
3160                panic!("Expected OKXWsMessage::BookData");
3161            };
3162
3163            let ts_init = UnixNanos::from(1_000_000_000u64);
3164            let result = handler.handle_book_data(arg, action, data, ts_init);
3165
3166            assert!(result.is_some());
3167            match result.unwrap() {
3168                NautilusWsMessage::Data(payloads) => {
3169                    assert!(!payloads.is_empty(), "Should produce order book payloads");
3170                }
3171                other => panic!("Expected NautilusWsMessage::Data, got {other:?}"),
3172            }
3173        }
3174
3175        #[rstest]
3176        fn test_handle_book_data_update() {
3177            let handler = create_handler_with_instruments(vec![create_spot_instrument()]);
3178            let json = load_test_json("ws_books_update.json");
3179            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3180
3181            let OKXWsMessage::BookData { arg, action, data } = msg else {
3182                panic!("Expected OKXWsMessage::BookData");
3183            };
3184
3185            let ts_init = UnixNanos::from(1_000_000_000u64);
3186            let result = handler.handle_book_data(arg, action, data, ts_init);
3187
3188            assert!(result.is_some());
3189            match result.unwrap() {
3190                NautilusWsMessage::Data(payloads) => {
3191                    assert!(
3192                        !payloads.is_empty(),
3193                        "Should produce order book delta payloads"
3194                    );
3195                }
3196                other => panic!("Expected NautilusWsMessage::Data, got {other:?}"),
3197            }
3198        }
3199
3200        #[rstest]
3201        fn test_handle_other_channel_data_candles() {
3202            let mut handler = create_handler_with_instruments(vec![create_spot_instrument()]);
3203            let json = load_test_json("ws_candle.json");
3204            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3205
3206            let OKXWsMessage::Data { arg, data } = msg else {
3207                panic!("Expected OKXWsMessage::Data");
3208            };
3209
3210            let ts_init = UnixNanos::from(1_000_000_000u64);
3211            let result = handler.handle_other_channel_data(arg.channel, arg.inst_id, data, ts_init);
3212
3213            assert!(result.is_some());
3214            match result.unwrap() {
3215                NautilusWsMessage::Data(payloads) => {
3216                    assert!(!payloads.is_empty(), "Should produce bar data payloads");
3217                }
3218                other => panic!("Expected NautilusWsMessage::Data, got {other:?}"),
3219            }
3220        }
3221
3222        #[rstest]
3223        fn test_handle_other_channel_data_funding_rate() {
3224            let mut handler = create_handler_with_instruments(vec![create_swap_instrument()]);
3225            let json = load_test_json("ws_funding_rate.json");
3226            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3227
3228            let OKXWsMessage::Data { arg, data } = msg else {
3229                panic!("Expected OKXWsMessage::Data");
3230            };
3231
3232            let ts_init = UnixNanos::from(1_000_000_000u64);
3233            let result = handler.handle_other_channel_data(arg.channel, arg.inst_id, data, ts_init);
3234
3235            // Funding rate returns FundingRates variant when rate changes
3236            assert!(result.is_none() || matches!(result, Some(NautilusWsMessage::FundingRates(_))));
3237        }
3238
3239        #[rstest]
3240        fn test_handle_account_data_parses_successfully() {
3241            let mut handler = create_handler_with_instruments(vec![]);
3242            let json = load_test_json("ws_account.json");
3243            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3244
3245            let OKXWsMessage::Data { data, .. } = msg else {
3246                panic!("Expected OKXWsMessage::Data");
3247            };
3248
3249            let ts_init = UnixNanos::from(1_000_000_000u64);
3250            let result = handler.handle_account_data(data, ts_init);
3251
3252            assert!(result.is_some());
3253            match result.unwrap() {
3254                NautilusWsMessage::AccountUpdate(account_state) => {
3255                    assert!(
3256                        !account_state.balances.is_empty(),
3257                        "Should have balance data"
3258                    );
3259                }
3260                other => panic!("Expected NautilusWsMessage::AccountUpdate, got {other:?}"),
3261            }
3262        }
3263
3264        #[rstest]
3265        fn test_handle_other_channel_data_missing_instrument() {
3266            let mut handler = create_handler_with_instruments(vec![]);
3267            let json = load_test_json("ws_tickers.json");
3268            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3269
3270            let OKXWsMessage::Data { arg, data } = msg else {
3271                panic!("Expected OKXWsMessage::Data");
3272            };
3273
3274            let ts_init = UnixNanos::from(1_000_000_000u64);
3275            let result = handler.handle_other_channel_data(arg.channel, arg.inst_id, data, ts_init);
3276
3277            // Should return None when instrument is not in cache
3278            assert!(result.is_none());
3279        }
3280
3281        #[rstest]
3282        fn test_handle_book_data_missing_instrument() {
3283            let handler = create_handler_with_instruments(vec![]);
3284            let json = load_test_json("ws_books_snapshot.json");
3285            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3286
3287            let OKXWsMessage::BookData { arg, action, data } = msg else {
3288                panic!("Expected OKXWsMessage::BookData");
3289            };
3290
3291            let ts_init = UnixNanos::from(1_000_000_000u64);
3292            let result = handler.handle_book_data(arg, action, data, ts_init);
3293
3294            // Should return None when instrument is not in cache
3295            assert!(result.is_none());
3296        }
3297    }
3298}