nautilus_okx/websocket/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message handler for OKX.
17//!
18//! The handler runs in a dedicated Tokio task as the I/O boundary between the client
19//! orchestrator and the network layer. It exclusively owns the `WebSocketClient` and
20//! processes commands from the client via an unbounded channel, serializing them to JSON
21//! and sending via the WebSocket. Raw messages are received from the network, deserialized,
22//! and transformed into `NautilusWsMessage` events which are emitted back to the client.
23//!
24//! Key responsibilities:
25//! - Command processing: Receives `HandlerCommand` from client, executes WebSocket operations.
26//! - Message transformation: Parses raw venue messages into Nautilus domain events.
27//! - Pending state tracking: Owns `AHashMap` for matching requests/responses (single-threaded).
28//! - Retry logic: Retries transient WebSocket send failures using `RetryManager`.
29//! - Error event emission: Emits `OrderRejected`, `OrderCancelRejected` when retries exhausted.
30
31use std::{
32    collections::VecDeque,
33    sync::{
34        Arc,
35        atomic::{AtomicBool, AtomicU64, Ordering},
36    },
37};
38
39use ahash::AHashMap;
40use dashmap::DashMap;
41use nautilus_core::{AtomicTime, UUID4, nanos::UnixNanos, time::get_atomic_clock_realtime};
42use nautilus_model::{
43    enums::{OrderStatus, OrderType},
44    events::{
45        AccountState, OrderAccepted, OrderCancelRejected, OrderModifyRejected, OrderRejected,
46    },
47    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
48    instruments::{Instrument, InstrumentAny},
49    types::{Money, Quantity},
50};
51use nautilus_network::{
52    RECONNECTED,
53    retry::{RetryManager, create_websocket_retry_manager},
54    websocket::{AuthTracker, SubscriptionState, TEXT_PING, TEXT_PONG, WebSocketClient},
55};
56use serde_json::Value;
57use tokio_tungstenite::tungstenite::Message;
58use ustr::Ustr;
59
60use super::{
61    enums::{OKXSubscriptionEvent, OKXWsChannel, OKXWsOperation},
62    error::OKXWsError,
63    messages::{
64        ExecutionReport, NautilusWsMessage, OKXAlgoOrderMsg, OKXBookMsg, OKXOrderMsg,
65        OKXSubscription, OKXSubscriptionArg, OKXWebSocketArg, OKXWebSocketError, OKXWsMessage,
66        OKXWsRequest, WsAmendOrderParams, WsCancelAlgoOrderParamsBuilder,
67        WsCancelOrderParamsBuilder, WsMassCancelParams, WsPostAlgoOrderParams, WsPostOrderParams,
68    },
69    parse::{
70        OrderStateSnapshot, ParsedOrderEvent, parse_algo_order_msg, parse_book_msg_vec,
71        parse_order_event, parse_order_msg, parse_ws_message_data,
72    },
73    subscription::topic_from_websocket_arg,
74};
75use crate::{
76    common::{
77        consts::{
78            OKX_POST_ONLY_CANCEL_REASON, OKX_POST_ONLY_CANCEL_SOURCE, OKX_POST_ONLY_ERROR_CODE,
79            should_retry_error_code,
80        },
81        enums::{
82            OKXBookAction, OKXInstrumentType, OKXOrderStatus, OKXSide, OKXTargetCurrency,
83            OKXTradeMode,
84        },
85        parse::{
86            determine_order_type, is_market_price, okx_instrument_type, parse_account_state,
87            parse_client_order_id, parse_millisecond_timestamp, parse_position_status_report,
88            parse_price, parse_quantity,
89        },
90    },
91    http::models::{OKXAccount, OKXPosition},
92    websocket::client::{
93        OKX_RATE_LIMIT_KEY_AMEND, OKX_RATE_LIMIT_KEY_CANCEL, OKX_RATE_LIMIT_KEY_ORDER,
94        OKX_RATE_LIMIT_KEY_SUBSCRIPTION,
95    },
96};
97
/// Data cached for pending place requests to correlate with responses.
///
/// Tuple layout, in order: the order parameters that were submitted (regular or algo),
/// the client order ID, the trader ID, the strategy ID and the instrument ID.
type PlaceRequestData = (
    PendingOrderParams,
    ClientOrderId,
    TraderId,
    StrategyId,
    InstrumentId,
);
106
/// Data cached for pending cancel requests to correlate with responses.
///
/// Tuple layout, in order: the client order ID, the trader ID, the strategy ID,
/// the instrument ID and, when known, the venue order ID.
type CancelRequestData = (
    ClientOrderId,
    TraderId,
    StrategyId,
    InstrumentId,
    Option<VenueOrderId>,
);
115
/// Data cached for pending amend requests to correlate with responses.
///
/// Tuple layout, in order: the client order ID, the trader ID, the strategy ID,
/// the instrument ID and, when known, the venue order ID. The shape is identical
/// to [`CancelRequestData`].
type AmendRequestData = (
    ClientOrderId,
    TraderId,
    StrategyId,
    InstrumentId,
    Option<VenueOrderId>,
);
124
/// Parameters of a place-order request that is awaiting a venue response.
///
/// Stored as the first element of [`PlaceRequestData`].
#[derive(Debug)]
pub enum PendingOrderParams {
    /// Parameters of a regular order placement.
    Regular(WsPostOrderParams),
    /// Parameters of an algo order placement.
    Algo(WsPostAlgoOrderParams),
}
130
/// Commands sent from the outer client to the inner message handler.
///
/// Commands arrive on the handler's command channel and are dispatched inside
/// `OKXWsFeedHandler::next`. A failure while handling a command is logged there
/// and does not stop the handler.
#[allow(
    clippy::large_enum_variant,
    reason = "Commands are ephemeral and immediately consumed"
)]
// NOTE(review): `Debug` is not derived; presumably a payload type (e.g. `WebSocketClient`)
// does not implement it - confirm before removing this allow.
#[allow(missing_debug_implementations)]
pub enum HandlerCommand {
    /// Hands the handler the WebSocket client it should use for all sends.
    SetClient(WebSocketClient),
    /// Drops the handler's WebSocket client and ends the handler's event stream.
    Disconnect,
    /// Sends a pre-built authentication payload over the WebSocket.
    Authenticate {
        /// Serialized login message, sent as-is.
        payload: String,
    },
    /// Inserts each instrument into the handler's instrument cache, keyed by symbol.
    InitializeInstruments(Vec<InstrumentAny>),
    /// Inserts (or replaces) a single instrument in the handler's instrument cache.
    UpdateInstrument(InstrumentAny),
    /// Requests subscription to the given channel arguments.
    Subscribe {
        args: Vec<OKXSubscriptionArg>,
    },
    /// Requests unsubscription from the given channel arguments.
    Unsubscribe {
        args: Vec<OKXSubscriptionArg>,
    },
    /// Requests placement of a regular order.
    PlaceOrder {
        params: WsPostOrderParams,
        client_order_id: ClientOrderId,
        trader_id: TraderId,
        strategy_id: StrategyId,
        instrument_id: InstrumentId,
    },
    /// Requests placement of an algo order.
    PlaceAlgoOrder {
        params: WsPostAlgoOrderParams,
        client_order_id: ClientOrderId,
        trader_id: TraderId,
        strategy_id: StrategyId,
        instrument_id: InstrumentId,
    },
    /// Requests an amendment of an existing order.
    AmendOrder {
        params: WsAmendOrderParams,
        client_order_id: ClientOrderId,
        trader_id: TraderId,
        strategy_id: StrategyId,
        instrument_id: InstrumentId,
        /// Venue order ID, when known.
        venue_order_id: Option<VenueOrderId>,
    },
    /// Requests cancellation of an order; either identifier may be absent.
    CancelOrder {
        client_order_id: Option<ClientOrderId>,
        venue_order_id: Option<VenueOrderId>,
        instrument_id: InstrumentId,
        trader_id: TraderId,
        strategy_id: StrategyId,
    },
    /// Requests cancellation of an algo order; either identifier may be absent.
    CancelAlgoOrder {
        client_order_id: Option<ClientOrderId>,
        algo_order_id: Option<VenueOrderId>,
        instrument_id: InstrumentId,
        trader_id: TraderId,
        strategy_id: StrategyId,
    },
    /// Requests a mass cancel for the given instrument.
    MassCancel {
        instrument_id: InstrumentId,
    },
    /// Requests a batch order placement from pre-built JSON arguments.
    BatchPlaceOrders {
        /// One JSON value per order in the batch.
        args: Vec<Value>,
        /// Caller-supplied request identifier.
        request_id: String,
    },
    /// Requests a batch order amendment from pre-built JSON arguments.
    BatchAmendOrders {
        /// One JSON value per order in the batch.
        args: Vec<Value>,
        /// Caller-supplied request identifier.
        request_id: String,
    },
    /// Requests a batch order cancellation from pre-built JSON arguments.
    BatchCancelOrders {
        /// One JSON value per order in the batch.
        args: Vec<Value>,
        /// Caller-supplied request identifier.
        request_id: String,
    },
}
203
/// Inner WebSocket message handler for OKX.
///
/// Owns the WebSocket client, consumes [`HandlerCommand`]s and raw WebSocket messages,
/// and produces [`NautilusWsMessage`] events through `next`.
pub(super) struct OKXWsFeedHandler {
    /// Clock used to stamp `ts_init` on events and timestamps on error messages.
    clock: &'static AtomicTime,
    /// Account ID attached to order events built by the handler.
    account_id: AccountId,
    /// Shared stop flag; when set, the handler stops on its next idle tick.
    signal: Arc<AtomicBool>,
    /// Active WebSocket client; `None` until `SetClient` arrives and after `Disconnect`.
    inner: Option<WebSocketClient>,
    /// Commands from the outer client.
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
    /// Raw WebSocket messages from the network layer.
    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
    /// Channel used by `send` to push events to the outer client.
    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
    /// Records the outcome of login responses.
    auth_tracker: AuthTracker,
    /// Tracks subscription state per topic as acknowledgements arrive.
    subscriptions_state: SubscriptionState,
    /// Retry policy applied to WebSocket sends.
    retry_manager: RetryManager<OKXWsError>,
    /// Pending place requests (key presumably the request ID - verify against the handlers).
    pending_place_requests: AHashMap<String, PlaceRequestData>,
    /// Pending cancel requests (key presumably the request ID - verify against the handlers).
    pending_cancel_requests: AHashMap<String, CancelRequestData>,
    /// Pending amend requests (key presumably the request ID - verify against the handlers).
    pending_amend_requests: AHashMap<String, AmendRequestData>,
    /// Pending mass cancel requests (key presumably the request ID - verify against the handlers).
    pending_mass_cancel_requests: AHashMap<String, InstrumentId>,
    /// Events queued for delivery by later calls to `next`, in FIFO order.
    pending_messages: VecDeque<NautilusWsMessage>,
    /// Shared map from client order ID to its (trader, strategy, instrument) identifiers.
    active_client_orders: Arc<DashMap<ClientOrderId, (TraderId, StrategyId, InstrumentId)>>,
    /// Shared map from a client order ID to the ID it should be reported under
    /// (a child ID maps to its parent; a standalone ID maps to itself).
    client_id_aliases: Arc<DashMap<ClientOrderId, ClientOrderId>>,
    /// Venue order IDs used as a set (the value is `()`).
    // NOTE(review): presumably guards against emitting `OrderAccepted` twice - confirm at use sites.
    emitted_order_accepted: AHashMap<VenueOrderId, ()>,
    /// Instruments keyed by symbol.
    instruments_cache: AHashMap<Ustr, InstrumentAny>,
    fee_cache: AHashMap<Ustr, Money>,           // Key is order ID
    filled_qty_cache: AHashMap<Ustr, Quantity>, // Key is order ID
    /// Order state snapshots keyed by client order ID.
    order_state_cache: AHashMap<ClientOrderId, OrderStateSnapshot>,
    funding_rate_cache: AHashMap<Ustr, (Ustr, u64)>, // Cache (funding_rate, funding_time) by inst_id
    /// Most recent account state, if any has been stored.
    last_account_state: Option<AccountState>,
    /// Counter for request IDs; starts at zero.
    request_id_counter: AtomicU64,
}
231
232impl OKXWsFeedHandler {
233    /// Creates a new [`OKXWsFeedHandler`] instance.
234    #[allow(clippy::too_many_arguments)]
235    pub fn new(
236        account_id: AccountId,
237        signal: Arc<AtomicBool>,
238        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
239        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
240        out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
241        active_client_orders: Arc<DashMap<ClientOrderId, (TraderId, StrategyId, InstrumentId)>>,
242        client_id_aliases: Arc<DashMap<ClientOrderId, ClientOrderId>>,
243        auth_tracker: AuthTracker,
244        subscriptions_state: SubscriptionState,
245    ) -> Self {
246        Self {
247            clock: get_atomic_clock_realtime(),
248            account_id,
249            signal,
250            inner: None,
251            cmd_rx,
252            raw_rx,
253            out_tx,
254            auth_tracker,
255            subscriptions_state,
256            retry_manager: create_websocket_retry_manager(),
257            pending_place_requests: AHashMap::new(),
258            pending_cancel_requests: AHashMap::new(),
259            pending_amend_requests: AHashMap::new(),
260            pending_mass_cancel_requests: AHashMap::new(),
261            pending_messages: VecDeque::new(),
262            active_client_orders,
263            client_id_aliases,
264            emitted_order_accepted: AHashMap::new(),
265            instruments_cache: AHashMap::new(),
266            fee_cache: AHashMap::new(),
267            filled_qty_cache: AHashMap::new(),
268            order_state_cache: AHashMap::new(),
269            funding_rate_cache: AHashMap::new(),
270            last_account_state: None,
271            request_id_counter: AtomicU64::new(0),
272        }
273    }
274
275    pub(super) fn is_stopped(&self) -> bool {
276        self.signal.load(std::sync::atomic::Ordering::Acquire)
277    }
278
279    pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), ()> {
280        self.out_tx.send(msg).map_err(|_| ())
281    }
282
283    async fn send_with_retry(
284        &self,
285        payload: String,
286        rate_limit_keys: Option<Vec<String>>,
287    ) -> Result<(), OKXWsError> {
288        if let Some(client) = &self.inner {
289            self.retry_manager
290                .execute_with_retry(
291                    "websocket_send",
292                    || {
293                        let payload = payload.clone();
294                        let keys = rate_limit_keys.clone();
295                        async move {
296                            client
297                                .send_text(payload, keys)
298                                .await
299                                .map_err(|e| OKXWsError::ClientError(format!("Send failed: {e}")))
300                        }
301                    },
302                    should_retry_okx_error,
303                    create_okx_timeout_error,
304                )
305                .await
306        } else {
307            Err(OKXWsError::ClientError(
308                "No active WebSocket client".to_string(),
309            ))
310        }
311    }
312
313    pub(super) async fn send_pong(&self) -> anyhow::Result<()> {
314        match self.send_with_retry(TEXT_PONG.to_string(), None).await {
315            Ok(()) => {
316                log::trace!("Sent pong response to OKX text ping");
317                Ok(())
318            }
319            Err(e) => {
320                log::warn!("Failed to send pong after retries: error={e}");
321                Err(anyhow::anyhow!("Failed to send pong: {e}"))
322            }
323        }
324    }
325
326    pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
327        if let Some(message) = self.pending_messages.pop_front() {
328            return Some(message);
329        }
330
331        loop {
332            tokio::select! {
333                Some(cmd) = self.cmd_rx.recv() => {
334                    match cmd {
335                        HandlerCommand::SetClient(client) => {
336                            log::debug!("Handler received WebSocket client");
337                            self.inner = Some(client);
338                        }
339                        HandlerCommand::Disconnect => {
340                            log::debug!("Handler disconnecting WebSocket client");
341                            self.inner = None;
342                            return None;
343                        }
344                        HandlerCommand::Authenticate { payload } => {
345                            if let Err(e) = self.send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_SUBSCRIPTION.to_string()])).await {
346                                log::error!("Failed to send authentication message after retries: error={e}");
347                            }
348                        }
349                        HandlerCommand::InitializeInstruments(instruments) => {
350                            for inst in instruments {
351                                self.instruments_cache.insert(inst.symbol().inner(), inst);
352                            }
353                        }
354                        HandlerCommand::UpdateInstrument(inst) => {
355                            self.instruments_cache.insert(inst.symbol().inner(), inst);
356                        }
357                        HandlerCommand::Subscribe { args } => {
358                            if let Err(e) = self.handle_subscribe(args).await {
359                                log::error!("Failed to handle subscribe command: error={e}");
360                            }
361                        }
362                        HandlerCommand::Unsubscribe { args } => {
363                            if let Err(e) = self.handle_unsubscribe(args).await {
364                                log::error!("Failed to handle unsubscribe command: error={e}");
365                            }
366                        }
367                        HandlerCommand::CancelOrder {
368                            client_order_id,
369                            venue_order_id,
370                            instrument_id,
371                            trader_id,
372                            strategy_id,
373                        } => {
374                            if let Err(e) = self
375                                .handle_cancel_order(
376                                    client_order_id,
377                                    venue_order_id,
378                                    instrument_id,
379                                    trader_id,
380                                    strategy_id,
381                                )
382                                .await
383                            {
384                                log::error!("Failed to handle cancel order command: error={e}");
385                            }
386                        }
387                        HandlerCommand::CancelAlgoOrder {
388                            client_order_id,
389                            algo_order_id,
390                            instrument_id,
391                            trader_id,
392                            strategy_id,
393                        } => {
394                            if let Err(e) = self
395                                .handle_cancel_algo_order(
396                                    client_order_id,
397                                    algo_order_id,
398                                    instrument_id,
399                                    trader_id,
400                                    strategy_id,
401                                )
402                                .await
403                            {
404                                log::error!("Failed to handle cancel algo order command: error={e}");
405                            }
406                        }
407                        HandlerCommand::PlaceOrder {
408                            params,
409                            client_order_id,
410                            trader_id,
411                            strategy_id,
412                            instrument_id,
413                        } => {
414                            if let Err(e) = self
415                                .handle_place_order(
416                                    params,
417                                    client_order_id,
418                                    trader_id,
419                                    strategy_id,
420                                    instrument_id,
421                                )
422                                .await
423                            {
424                                log::error!("Failed to handle place order command: error={e}");
425                            }
426                        }
427                        HandlerCommand::PlaceAlgoOrder {
428                            params,
429                            client_order_id,
430                            trader_id,
431                            strategy_id,
432                            instrument_id,
433                        } => {
434                            if let Err(e) = self
435                                .handle_place_algo_order(
436                                    params,
437                                    client_order_id,
438                                    trader_id,
439                                    strategy_id,
440                                    instrument_id,
441                                )
442                                .await
443                            {
444                                log::error!("Failed to handle place algo order command: error={e}");
445                            }
446                        }
447                        HandlerCommand::AmendOrder {
448                            params,
449                            client_order_id,
450                            trader_id,
451                            strategy_id,
452                            instrument_id,
453                            venue_order_id,
454                        } => {
455                            if let Err(e) = self
456                                .handle_amend_order(
457                                    params,
458                                    client_order_id,
459                                    trader_id,
460                                    strategy_id,
461                                    instrument_id,
462                                    venue_order_id,
463                                )
464                                .await
465                            {
466                                log::error!("Failed to handle amend order command: error={e}");
467                            }
468                        }
469                        HandlerCommand::MassCancel { instrument_id } => {
470                            if let Err(e) = self.handle_mass_cancel(instrument_id).await {
471                                log::error!("Failed to handle mass cancel command: error={e}");
472                            }
473                        }
474                        HandlerCommand::BatchCancelOrders { args, request_id } => {
475                            if let Err(e) = self.handle_batch_cancel_orders(args, request_id).await {
476                                log::error!("Failed to handle batch cancel orders command: error={e}");
477                            }
478                        }
479                        HandlerCommand::BatchPlaceOrders { args, request_id } => {
480                            if let Err(e) = self.handle_batch_place_orders(args, request_id).await {
481                                log::error!("Failed to handle batch place orders command: error={e}");
482                            }
483                        }
484                        HandlerCommand::BatchAmendOrders { args, request_id } => {
485                            if let Err(e) = self.handle_batch_amend_orders(args, request_id).await {
486                                log::error!("Failed to handle batch amend orders command: error={e}");
487                            }
488                        }
489                    }
490                    // Continue processing following command
491                    continue;
492                }
493
494                () = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
495                    if self.signal.load(std::sync::atomic::Ordering::Acquire) {
496                        log::debug!("Stop signal received during idle period");
497                        return None;
498                    }
499                    continue;
500                }
501
502                msg = self.raw_rx.recv() => {
503                    let event = match msg {
504                        Some(msg) => match Self::parse_raw_message(msg) {
505                            Some(event) => event,
506                            None => continue,
507                        },
508                        None => {
509                            log::debug!("WebSocket stream closed");
510                            return None;
511                        }
512                    };
513
514                    let ts_init = self.clock.get_time_ns();
515
516            match event {
517                OKXWsMessage::Ping => {
518                    if let Err(e) = self.send_pong().await {
519                        log::warn!("Failed to send pong response: error={e}");
520                    }
521                    continue;
522                }
523                OKXWsMessage::Login {
524                    code, msg, conn_id, ..
525                } => {
526                    if code == "0" {
527                        self.auth_tracker.succeed();
528
529                        // Must return immediately to deliver Authenticated message.
530                        // Using push_back() + continue blocks the select! loop and prevents
531                        // the spawn block from receiving this event, breaking reconnection flow.
532                        return Some(NautilusWsMessage::Authenticated);
533                    }
534
535                    log::error!("WebSocket authentication failed: error={msg}");
536                    self.auth_tracker.fail(msg.clone());
537
538                    let error = OKXWebSocketError {
539                        code,
540                        message: msg,
541                        conn_id: Some(conn_id),
542                        timestamp: self.clock.get_time_ns().as_u64(),
543                    };
544                    self.pending_messages
545                        .push_back(NautilusWsMessage::Error(error));
546                    continue;
547                }
548                OKXWsMessage::BookData { arg, action, data } => {
549                    if let Some(msg) = self.handle_book_data(arg, action, data, ts_init) {
550                        return Some(msg);
551                    }
552                    continue;
553                }
554                OKXWsMessage::OrderResponse {
555                    id,
556                    op,
557                    code,
558                    msg,
559                    data,
560                } => {
561                    if let Some(msg) = self.handle_order_response(id, op, code, msg, data, ts_init) {
562                        return Some(msg);
563                    }
564                    continue;
565                }
566                OKXWsMessage::Data { arg, data } => {
567                    let OKXWebSocketArg {
568                        channel, inst_id, ..
569                    } = arg;
570
571                    match channel {
572                        OKXWsChannel::Account => {
573                            if let Some(msg) = self.handle_account_data(data, ts_init) {
574                                return Some(msg);
575                            }
576                            continue;
577                        }
578                        OKXWsChannel::Positions => {
579                            self.handle_positions_data(data, ts_init);
580                            continue;
581                        }
582                        OKXWsChannel::Orders => {
583                            if let Some(msg) = self.handle_orders_data(data, ts_init) {
584                                return Some(msg);
585                            }
586                            continue;
587                        }
588                        OKXWsChannel::OrdersAlgo => {
589                            if let Some(msg) = self.handle_algo_orders_data(data, ts_init) {
590                                return Some(msg);
591                            }
592                            continue;
593                        }
594                        _ => {
595                            if let Some(msg) =
596                                self.handle_other_channel_data(channel, inst_id, data, ts_init)
597                            {
598                                return Some(msg);
599                            }
600                            continue;
601                        }
602                    }
603                }
604                OKXWsMessage::Error { code, msg } => {
605                    let error = OKXWebSocketError {
606                        code,
607                        message: msg,
608                        conn_id: None,
609                        timestamp: self.clock.get_time_ns().as_u64(),
610                    };
611                    return Some(NautilusWsMessage::Error(error));
612                }
613                OKXWsMessage::Reconnected => {
614                    return Some(NautilusWsMessage::Reconnected);
615                }
616                OKXWsMessage::Subscription {
617                    event,
618                    arg,
619                    code,
620                    msg,
621                    ..
622                } => {
623                    let topic = topic_from_websocket_arg(&arg);
624                    let success = code.as_deref().is_none_or(|c| c == "0");
625
626                    match event {
627                        OKXSubscriptionEvent::Subscribe => {
628                            if success {
629                                self.subscriptions_state.confirm_subscribe(&topic);
630                            } else {
631                                log::warn!("Subscription failed: topic={topic:?}, error={msg:?}, code={code:?}");
632                                self.subscriptions_state.mark_failure(&topic);
633                            }
634                        }
635                        OKXSubscriptionEvent::Unsubscribe => {
636                            if success {
637                                self.subscriptions_state.confirm_unsubscribe(&topic);
638                            } else {
639                                log::warn!("Unsubscription failed - restoring subscription: topic={topic:?}, error={msg:?}, code={code:?}");
640                                // Venue rejected unsubscribe, so we're still subscribed. Restore state:
641                                self.subscriptions_state.confirm_unsubscribe(&topic); // Clear pending_unsubscribe
642                                self.subscriptions_state.mark_subscribe(&topic);      // Mark as subscribing
643                                self.subscriptions_state.confirm_subscribe(&topic);   // Confirm subscription
644                            }
645                        }
646                    }
647
648                    continue;
649                }
650                OKXWsMessage::ChannelConnCount { .. } => continue,
651            }
652                }
653
654                // Handle shutdown - either channel closed or stream ended
655                else => {
656                    log::debug!("Handler shutting down: stream ended or command channel closed");
657                    return None;
658                }
659            }
660        }
661    }
662
663    pub(super) fn is_post_only_auto_cancel(msg: &OKXOrderMsg) -> bool {
664        if msg.state != OKXOrderStatus::Canceled {
665            return false;
666        }
667
668        let cancel_source_matches = matches!(
669            msg.cancel_source.as_deref(),
670            Some(source) if source == OKX_POST_ONLY_CANCEL_SOURCE
671        );
672
673        let reason_matches = matches!(
674            msg.cancel_source_reason.as_deref(),
675            Some(reason) if reason.contains("POST_ONLY")
676        );
677
678        if !(cancel_source_matches || reason_matches) {
679            return false;
680        }
681
682        msg.acc_fill_sz
683            .as_ref()
684            .is_none_or(|filled| filled == "0" || filled.is_empty())
685    }
686
687    fn try_handle_post_only_auto_cancel(
688        &mut self,
689        msg: &OKXOrderMsg,
690        ts_init: UnixNanos,
691        exec_reports: &mut Vec<ExecutionReport>,
692    ) -> bool {
693        if !Self::is_post_only_auto_cancel(msg) {
694            return false;
695        }
696
697        let Some(client_order_id) = parse_client_order_id(&msg.cl_ord_id) else {
698            return false;
699        };
700
701        let Some((_, (trader_id, strategy_id, instrument_id))) =
702            self.active_client_orders.remove(&client_order_id)
703        else {
704            return false;
705        };
706
707        self.client_id_aliases.remove(&client_order_id);
708
709        if !exec_reports.is_empty() {
710            let reports = std::mem::take(exec_reports);
711            self.pending_messages
712                .push_back(NautilusWsMessage::ExecutionReports(reports));
713        }
714
715        let reason = msg
716            .cancel_source_reason
717            .as_ref()
718            .filter(|reason| !reason.is_empty())
719            .map_or_else(
720                || Ustr::from(OKX_POST_ONLY_CANCEL_REASON),
721                |reason| Ustr::from(reason.as_str()),
722            );
723
724        let ts_event = parse_millisecond_timestamp(msg.u_time);
725        let rejected = OrderRejected::new(
726            trader_id,
727            strategy_id,
728            instrument_id,
729            client_order_id,
730            self.account_id,
731            reason,
732            UUID4::new(),
733            ts_event,
734            ts_init,
735            false,
736            true,
737        );
738
739        self.pending_messages
740            .push_back(NautilusWsMessage::OrderRejected(rejected));
741
742        true
743    }
744
745    fn register_client_order_aliases(
746        &self,
747        raw_child: &Option<ClientOrderId>,
748        parent_from_msg: &Option<ClientOrderId>,
749    ) -> Option<ClientOrderId> {
750        if let Some(parent) = parent_from_msg {
751            self.client_id_aliases.insert(*parent, *parent);
752            if let Some(child) = raw_child.as_ref().filter(|child| **child != *parent) {
753                self.client_id_aliases.insert(*child, *parent);
754            }
755            Some(*parent)
756        } else if let Some(child) = raw_child.as_ref() {
757            if let Some(mapped) = self.client_id_aliases.get(child) {
758                Some(*mapped.value())
759            } else {
760                self.client_id_aliases.insert(*child, *child);
761                Some(*child)
762            }
763        } else {
764            None
765        }
766    }
767
768    fn adjust_execution_report(
769        &self,
770        report: ExecutionReport,
771        effective_client_id: &Option<ClientOrderId>,
772        raw_child: &Option<ClientOrderId>,
773    ) -> ExecutionReport {
774        match report {
775            ExecutionReport::Order(status_report) => {
776                let mut adjusted = status_report;
777                let mut final_id = *effective_client_id;
778
779                if final_id.is_none() {
780                    final_id = adjusted.client_order_id;
781                }
782
783                if final_id.is_none()
784                    && let Some(child) = raw_child.as_ref()
785                    && let Some(mapped) = self.client_id_aliases.get(child)
786                {
787                    final_id = Some(*mapped.value());
788                }
789
790                if let Some(final_id_value) = final_id {
791                    if adjusted.client_order_id != Some(final_id_value) {
792                        adjusted = adjusted.with_client_order_id(final_id_value);
793                    }
794                    self.client_id_aliases
795                        .insert(final_id_value, final_id_value);
796
797                    if let Some(child) =
798                        raw_child.as_ref().filter(|child| **child != final_id_value)
799                    {
800                        adjusted = adjusted.with_linked_order_ids(vec![*child]);
801                    }
802                }
803
804                ExecutionReport::Order(adjusted)
805            }
806            ExecutionReport::Fill(mut fill_report) => {
807                let mut final_id = *effective_client_id;
808                if final_id.is_none() {
809                    final_id = fill_report.client_order_id;
810                }
811                if final_id.is_none()
812                    && let Some(child) = raw_child.as_ref()
813                    && let Some(mapped) = self.client_id_aliases.get(child)
814                {
815                    final_id = Some(*mapped.value());
816                }
817
818                if let Some(final_id_value) = final_id {
819                    fill_report.client_order_id = Some(final_id_value);
820                    self.client_id_aliases
821                        .insert(final_id_value, final_id_value);
822                }
823
824                ExecutionReport::Fill(fill_report)
825            }
826        }
827    }
828
829    fn update_caches_with_report(&mut self, report: &ExecutionReport) {
830        match report {
831            ExecutionReport::Fill(fill_report) => {
832                let order_id = fill_report.venue_order_id.inner();
833                let current_fee = self
834                    .fee_cache
835                    .get(&order_id)
836                    .copied()
837                    .unwrap_or_else(|| Money::new(0.0, fill_report.commission.currency));
838                let total_fee = current_fee + fill_report.commission;
839                self.fee_cache.insert(order_id, total_fee);
840
841                let current_filled_qty = self
842                    .filled_qty_cache
843                    .get(&order_id)
844                    .copied()
845                    .unwrap_or_else(|| Quantity::zero(fill_report.last_qty.precision));
846                let total_filled_qty = current_filled_qty + fill_report.last_qty;
847                self.filled_qty_cache.insert(order_id, total_filled_qty);
848            }
849            ExecutionReport::Order(status_report) => {
850                if matches!(status_report.order_status, OrderStatus::Filled) {
851                    self.fee_cache.remove(&status_report.venue_order_id.inner());
852                    self.filled_qty_cache
853                        .remove(&status_report.venue_order_id.inner());
854                }
855
856                if matches!(
857                    status_report.order_status,
858                    OrderStatus::Canceled
859                        | OrderStatus::Expired
860                        | OrderStatus::Filled
861                        | OrderStatus::Rejected,
862                ) {
863                    self.emitted_order_accepted
864                        .remove(&status_report.venue_order_id);
865                    if let Some(client_order_id) = status_report.client_order_id {
866                        self.order_state_cache.remove(&client_order_id);
867                        self.active_client_orders.remove(&client_order_id);
868                        self.client_id_aliases.remove(&client_order_id);
869                    }
870                    if let Some(linked) = &status_report.linked_order_ids {
871                        for child in linked {
872                            self.client_id_aliases.remove(child);
873                        }
874                    }
875                }
876            }
877        }
878    }
879
880    #[allow(clippy::too_many_lines)]
881    fn handle_order_response(
882        &mut self,
883        id: Option<String>,
884        op: OKXWsOperation,
885        code: String,
886        msg: String,
887        data: Vec<Value>,
888        ts_init: UnixNanos,
889    ) -> Option<NautilusWsMessage> {
890        if code == "0" {
891            log::debug!("Order operation successful: id={id:?} op={op} code={code}");
892
893            if op == OKXWsOperation::BatchCancelOrders {
894                log::debug!(
895                    "Batch cancel operation successful: id={id:?} cancel_count={}",
896                    data.len()
897                );
898
899                // Check for per-order errors even when top-level code is "0"
900                for (idx, entry) in data.iter().enumerate() {
901                    if let Some(entry_code) = entry.get("sCode").and_then(|v| v.as_str())
902                        && entry_code != "0"
903                    {
904                        let entry_msg = entry
905                            .get("sMsg")
906                            .and_then(|v| v.as_str())
907                            .unwrap_or("Unknown error");
908
909                        if let Some(cl_ord_id_str) = entry
910                            .get("clOrdId")
911                            .and_then(|v| v.as_str())
912                            .filter(|s| !s.is_empty())
913                        {
914                            log::error!(
915                                "Batch cancel partial failure for order {cl_ord_id_str}: sCode={entry_code} sMsg={entry_msg}"
916                            );
917                            // TODO: Emit OrderCancelRejected for this specific order
918                        } else {
919                            log::error!(
920                                "Batch cancel entry[{idx}] failed: sCode={entry_code} sMsg={entry_msg} data={entry:?}"
921                            );
922                        }
923                    }
924                }
925
926                return None;
927            } else if op == OKXWsOperation::MassCancel
928                && let Some(request_id) = &id
929                && let Some(instrument_id) = self.pending_mass_cancel_requests.remove(request_id)
930            {
931                log::debug!("Mass cancel operation successful for instrument: {instrument_id}");
932            } else if op == OKXWsOperation::Order
933                && let Some(request_id) = &id
934                && let Some((params, client_order_id, trader_id, strategy_id, instrument_id)) =
935                    self.pending_place_requests.remove(request_id)
936            {
937                let (venue_order_id, ts_accepted) = if let Some(first) = data.first() {
938                    let ord_id = first
939                        .get("ordId")
940                        .and_then(|v| v.as_str())
941                        .filter(|s| !s.is_empty())
942                        .map(VenueOrderId::new);
943
944                    let ts = first
945                        .get("ts")
946                        .and_then(|v| v.as_str())
947                        .and_then(|s| s.parse::<u64>().ok())
948                        .map_or_else(
949                            || self.clock.get_time_ns(),
950                            |ms| UnixNanos::from(ms * 1_000_000),
951                        );
952
953                    (ord_id, ts)
954                } else {
955                    (None, self.clock.get_time_ns())
956                };
957
958                if let Some(instrument) = self.instruments_cache.get(&instrument_id.symbol.inner())
959                {
960                    match params {
961                        PendingOrderParams::Regular(order_params) => {
962                            let order_type = determine_order_type(
963                                order_params.ord_type,
964                                order_params.px.as_deref().unwrap_or(""),
965                            );
966
967                            let is_explicit_quote_sized = order_params
968                                .tgt_ccy
969                                .is_some_and(|tgt| tgt == OKXTargetCurrency::QuoteCcy);
970
971                            // SPOT market BUY in cash mode with no tgt_ccy defaults to quote-sizing
972                            let is_implicit_quote_sized = order_params.tgt_ccy.is_none()
973                                && order_params.side == OKXSide::Buy
974                                && order_type == OrderType::Market
975                                && order_params.td_mode == OKXTradeMode::Cash
976                                && instrument.instrument_class().as_ref() == "SPOT";
977
978                            if is_explicit_quote_sized || is_implicit_quote_sized {
979                                // For quote-sized orders, sz is in quote currency (USDT),
980                                // not base currency (ETH). We can't accurately parse the
981                                // base quantity without the fill price, so we skip the
982                                // synthetic OrderAccepted and rely on the orders channel
983                                log::debug!(
984                                    "Skipping synthetic OrderAccepted for {} quote-sized order: client_order_id={client_order_id}, venue_order_id={venue_order_id:?}",
985                                    if is_explicit_quote_sized {
986                                        "explicit"
987                                    } else {
988                                        "implicit"
989                                    },
990                                );
991                                return None;
992                            }
993
994                            let Some(v_order_id) = venue_order_id else {
995                                log::error!(
996                                    "No venue_order_id for accepted order: client_order_id={client_order_id}"
997                                );
998                                return None;
999                            };
1000
1001                            // Check if already emitted from orders channel push
1002                            if self.emitted_order_accepted.contains_key(&v_order_id) {
1003                                log::debug!(
1004                                    "Skipping duplicate OrderAccepted from operation response for venue_order_id={v_order_id}"
1005                                );
1006                                return None;
1007                            }
1008                            self.emitted_order_accepted.insert(v_order_id, ());
1009
1010                            let accepted = OrderAccepted::new(
1011                                trader_id,
1012                                strategy_id,
1013                                instrument_id,
1014                                client_order_id,
1015                                v_order_id,
1016                                self.account_id,
1017                                UUID4::new(),
1018                                ts_accepted,
1019                                ts_init,
1020                                false, // Not from reconciliation
1021                            );
1022
1023                            log::debug!(
1024                                "Order accepted: client_order_id={client_order_id}, venue_order_id={v_order_id}"
1025                            );
1026
1027                            return Some(NautilusWsMessage::OrderAccepted(accepted));
1028                        }
1029                        PendingOrderParams::Algo(_) => {
1030                            log::debug!(
1031                                "Algo order placement confirmed: client_order_id={client_order_id}, venue_order_id={venue_order_id:?}"
1032                            );
1033                        }
1034                    }
1035                } else {
1036                    log::error!("Instrument not found for accepted order: {instrument_id}");
1037                }
1038            }
1039
1040            if let Some(first) = data.first()
1041                && let Some(success_msg) = first.get("sMsg").and_then(|value| value.as_str())
1042            {
1043                log::debug!("Order details: {success_msg}");
1044            }
1045
1046            return None;
1047        }
1048
1049        let error_msg = data
1050            .first()
1051            .and_then(|d| d.get("sMsg"))
1052            .and_then(|s| s.as_str())
1053            .unwrap_or(&msg)
1054            .to_string();
1055
1056        if let Some(first) = data.first() {
1057            log::debug!(
1058                "Error data fields: {}",
1059                serde_json::to_string_pretty(first)
1060                    .unwrap_or_else(|_| "unable to serialize".to_string())
1061            );
1062        }
1063
1064        log::warn!("Order operation failed: id={id:?} op={op} code={code} msg={error_msg}");
1065
1066        let ts_event = self.clock.get_time_ns();
1067
1068        if let Some(request_id) = &id {
1069            match op {
1070                OKXWsOperation::Order => {
1071                    if let Some((_params, client_order_id, trader_id, strategy_id, instrument_id)) =
1072                        self.pending_place_requests.remove(request_id)
1073                    {
1074                        let due_post_only = is_post_only_rejection(code.as_str(), &data);
1075                        let rejected = OrderRejected::new(
1076                            trader_id,
1077                            strategy_id,
1078                            instrument_id,
1079                            client_order_id,
1080                            self.account_id,
1081                            Ustr::from(error_msg.as_str()),
1082                            UUID4::new(),
1083                            ts_event,
1084                            ts_init,
1085                            false, // Not from reconciliation
1086                            due_post_only,
1087                        );
1088
1089                        return Some(NautilusWsMessage::OrderRejected(rejected));
1090                    }
1091                }
1092                OKXWsOperation::CancelOrder => {
1093                    if let Some((
1094                        client_order_id,
1095                        trader_id,
1096                        strategy_id,
1097                        instrument_id,
1098                        venue_order_id,
1099                    )) = self.pending_cancel_requests.remove(request_id)
1100                    {
1101                        let rejected = OrderCancelRejected::new(
1102                            trader_id,
1103                            strategy_id,
1104                            instrument_id,
1105                            client_order_id,
1106                            Ustr::from(error_msg.as_str()),
1107                            UUID4::new(),
1108                            ts_event,
1109                            ts_init,
1110                            false, // Not from reconciliation
1111                            venue_order_id,
1112                            Some(self.account_id),
1113                        );
1114
1115                        return Some(NautilusWsMessage::OrderCancelRejected(rejected));
1116                    }
1117                }
1118                OKXWsOperation::AmendOrder => {
1119                    if let Some((
1120                        client_order_id,
1121                        trader_id,
1122                        strategy_id,
1123                        instrument_id,
1124                        venue_order_id,
1125                    )) = self.pending_amend_requests.remove(request_id)
1126                    {
1127                        let rejected = OrderModifyRejected::new(
1128                            trader_id,
1129                            strategy_id,
1130                            instrument_id,
1131                            client_order_id,
1132                            Ustr::from(error_msg.as_str()),
1133                            UUID4::new(),
1134                            ts_event,
1135                            ts_init,
1136                            false, // Not from reconciliation
1137                            venue_order_id,
1138                            Some(self.account_id),
1139                        );
1140
1141                        return Some(NautilusWsMessage::OrderModifyRejected(rejected));
1142                    }
1143                }
1144                OKXWsOperation::OrderAlgo => {
1145                    if let Some((_params, client_order_id, trader_id, strategy_id, instrument_id)) =
1146                        self.pending_place_requests.remove(request_id)
1147                    {
1148                        let due_post_only = is_post_only_rejection(code.as_str(), &data);
1149                        let rejected = OrderRejected::new(
1150                            trader_id,
1151                            strategy_id,
1152                            instrument_id,
1153                            client_order_id,
1154                            self.account_id,
1155                            Ustr::from(error_msg.as_str()),
1156                            UUID4::new(),
1157                            ts_event,
1158                            ts_init,
1159                            false, // Not from reconciliation
1160                            due_post_only,
1161                        );
1162
1163                        return Some(NautilusWsMessage::OrderRejected(rejected));
1164                    }
1165                }
1166                OKXWsOperation::CancelAlgos => {
1167                    if let Some((
1168                        client_order_id,
1169                        trader_id,
1170                        strategy_id,
1171                        instrument_id,
1172                        venue_order_id,
1173                    )) = self.pending_cancel_requests.remove(request_id)
1174                    {
1175                        let rejected = OrderCancelRejected::new(
1176                            trader_id,
1177                            strategy_id,
1178                            instrument_id,
1179                            client_order_id,
1180                            Ustr::from(error_msg.as_str()),
1181                            UUID4::new(),
1182                            ts_event,
1183                            ts_init,
1184                            false, // Not from reconciliation
1185                            venue_order_id,
1186                            Some(self.account_id),
1187                        );
1188
1189                        return Some(NautilusWsMessage::OrderCancelRejected(rejected));
1190                    }
1191                }
1192                OKXWsOperation::MassCancel => {
1193                    if let Some(instrument_id) =
1194                        self.pending_mass_cancel_requests.remove(request_id)
1195                    {
1196                        log::error!(
1197                            "Mass cancel operation failed for {instrument_id}: code={code} msg={error_msg}"
1198                        );
1199                        let error = OKXWebSocketError {
1200                            code,
1201                            message: format!("Mass cancel failed for {instrument_id}: {error_msg}"),
1202                            conn_id: None,
1203                            timestamp: ts_event.as_u64(),
1204                        };
1205                        return Some(NautilusWsMessage::Error(error));
1206                    } else {
1207                        log::error!("Mass cancel operation failed: code={code} msg={error_msg}");
1208                    }
1209                }
1210                OKXWsOperation::BatchCancelOrders => {
1211                    log::warn!(
1212                        "Batch cancel operation failed: id={id:?} code={code} msg={error_msg} data_count={}",
1213                        data.len()
1214                    );
1215
1216                    // Iterate through data array to check per-order errors
1217                    for (idx, entry) in data.iter().enumerate() {
1218                        let entry_code =
1219                            entry.get("sCode").and_then(|v| v.as_str()).unwrap_or(&code);
1220                        let entry_msg = entry
1221                            .get("sMsg")
1222                            .and_then(|v| v.as_str())
1223                            .unwrap_or(&error_msg);
1224
1225                        if entry_code != "0" {
1226                            // Try to extract client order ID for targeted error events
1227                            if let Some(cl_ord_id_str) = entry
1228                                .get("clOrdId")
1229                                .and_then(|v| v.as_str())
1230                                .filter(|s| !s.is_empty())
1231                            {
1232                                log::error!(
1233                                    "Batch cancel failed for order {cl_ord_id_str}: sCode={entry_code} sMsg={entry_msg}"
1234                                );
1235                                // TODO: Emit OrderCancelRejected event once we track
1236                                // batch cancel metadata (client_order_id, trader_id, etc.)
1237                            } else {
1238                                log::error!(
1239                                    "Batch cancel entry[{idx}] failed: sCode={entry_code} sMsg={entry_msg} data={entry:?}"
1240                                );
1241                            }
1242                        }
1243                    }
1244
1245                    // Emit generic error for the batch operation
1246                    let error = OKXWebSocketError {
1247                        code,
1248                        message: format!("Batch cancel failed: {error_msg}"),
1249                        conn_id: None,
1250                        timestamp: ts_event.as_u64(),
1251                    };
1252                    return Some(NautilusWsMessage::Error(error));
1253                }
1254                _ => log::warn!("Unhandled operation type for rejection: {op}"),
1255            }
1256        }
1257
1258        let error = OKXWebSocketError {
1259            code,
1260            message: error_msg,
1261            conn_id: None,
1262            timestamp: ts_event.as_u64(),
1263        };
1264        Some(NautilusWsMessage::Error(error))
1265    }
1266
1267    fn handle_book_data(
1268        &self,
1269        arg: OKXWebSocketArg,
1270        action: OKXBookAction,
1271        data: Vec<OKXBookMsg>,
1272        ts_init: UnixNanos,
1273    ) -> Option<NautilusWsMessage> {
1274        let Some(inst_id) = arg.inst_id else {
1275            log::error!("Instrument ID missing for book data event");
1276            return None;
1277        };
1278
1279        let inst = self.instruments_cache.get(&inst_id)?;
1280
1281        let instrument_id = inst.id();
1282        let price_precision = inst.price_precision();
1283        let size_precision = inst.size_precision();
1284
1285        match parse_book_msg_vec(
1286            data,
1287            &instrument_id,
1288            price_precision,
1289            size_precision,
1290            action,
1291            ts_init,
1292        ) {
1293            Ok(payloads) => Some(NautilusWsMessage::Data(payloads)),
1294            Err(e) => {
1295                log::error!("Failed to parse book message: {e}");
1296                None
1297            }
1298        }
1299    }
1300
1301    fn handle_account_data(
1302        &mut self,
1303        data: Value,
1304        ts_init: UnixNanos,
1305    ) -> Option<NautilusWsMessage> {
1306        let Value::Array(arr) = data else {
1307            log::error!("Account data is not an array");
1308            return None;
1309        };
1310
1311        let first = arr.into_iter().next()?;
1312
1313        let account: OKXAccount = match serde_json::from_value(first) {
1314            Ok(acc) => acc,
1315            Err(e) => {
1316                log::error!("Failed to parse account data: {e}");
1317                return None;
1318            }
1319        };
1320
1321        match parse_account_state(&account, self.account_id, ts_init) {
1322            Ok(account_state) => {
1323                if let Some(last_account_state) = &self.last_account_state
1324                    && account_state.has_same_balances_and_margins(last_account_state)
1325                {
1326                    return None;
1327                }
1328                self.last_account_state = Some(account_state.clone());
1329                Some(NautilusWsMessage::AccountUpdate(account_state))
1330            }
1331            Err(e) => {
1332                log::error!("Failed to parse account state: {e}");
1333                None
1334            }
1335        }
1336    }
1337
1338    fn handle_positions_data(&mut self, data: Value, ts_init: UnixNanos) {
1339        match serde_json::from_value::<Vec<OKXPosition>>(data) {
1340            Ok(positions) => {
1341                log::debug!("Received {} position update(s)", positions.len());
1342
1343                for position in positions {
1344                    let instrument = match self.instruments_cache.get(&position.inst_id) {
1345                        Some(inst) => inst,
1346                        None => {
1347                            log::warn!(
1348                                "Received position update for unknown instrument {}, skipping",
1349                                position.inst_id
1350                            );
1351                            continue;
1352                        }
1353                    };
1354
1355                    let instrument_id = instrument.id();
1356                    let size_precision = instrument.size_precision();
1357
1358                    match parse_position_status_report(
1359                        position,
1360                        self.account_id,
1361                        instrument_id,
1362                        size_precision,
1363                        ts_init,
1364                    ) {
1365                        Ok(position_report) => {
1366                            self.pending_messages
1367                                .push_back(NautilusWsMessage::PositionUpdate(position_report));
1368                        }
1369                        Err(e) => {
1370                            log::error!(
1371                                "Failed to parse position status report for {instrument_id}: {e}"
1372                            );
1373                        }
1374                    }
1375                }
1376            }
1377            Err(e) => {
1378                log::error!("Failed to parse positions data: {e}");
1379            }
1380        }
1381    }
1382
1383    fn handle_orders_data(&mut self, data: Value, ts_init: UnixNanos) -> Option<NautilusWsMessage> {
1384        let orders: Vec<OKXOrderMsg> = match serde_json::from_value(data) {
1385            Ok(orders) => orders,
1386            Err(e) => {
1387                log::error!("Failed to deserialize orders channel payload: {e}");
1388                return None;
1389            }
1390        };
1391
1392        log::debug!(
1393            "Received {} order message(s) from orders channel",
1394            orders.len()
1395        );
1396
1397        let mut exec_reports: Vec<ExecutionReport> = Vec::with_capacity(orders.len());
1398
1399        for msg in orders {
1400            log::debug!(
1401                "Processing order message: inst_id={}, cl_ord_id={}, state={:?}, exec_type={:?}",
1402                msg.inst_id,
1403                msg.cl_ord_id,
1404                msg.state,
1405                msg.exec_type
1406            );
1407
1408            if self.try_handle_post_only_auto_cancel(&msg, ts_init, &mut exec_reports) {
1409                continue;
1410            }
1411
1412            let raw_child = parse_client_order_id(&msg.cl_ord_id);
1413            let parent_from_msg = msg
1414                .algo_cl_ord_id
1415                .as_ref()
1416                .filter(|value| !value.is_empty())
1417                .map(ClientOrderId::new);
1418            let effective_client_id =
1419                self.register_client_order_aliases(&raw_child, &parent_from_msg);
1420
1421            let Some(instrument) = self.instruments_cache.get(&msg.inst_id) else {
1422                log::error!(
1423                    "No instrument found for inst_id: {inst_id}",
1424                    inst_id = msg.inst_id
1425                );
1426                continue;
1427            };
1428            let price_precision = instrument.price_precision();
1429            let size_precision = instrument.size_precision();
1430
1431            let order_metadata = effective_client_id
1432                .and_then(|cid| self.active_client_orders.get(&cid).map(|e| *e.value()));
1433
1434            let previous_fee = self.fee_cache.get(&msg.ord_id).copied();
1435            let previous_filled_qty = self.filled_qty_cache.get(&msg.ord_id).copied();
1436            let previous_state =
1437                effective_client_id.and_then(|cid| self.order_state_cache.get(&cid).cloned());
1438
1439            // SAFETY: order_metadata being Some implies effective_client_id is Some
1440            if let (Some((trader_id, strategy_id, _instrument_id)), Some(canonical_client_id)) =
1441                (order_metadata, effective_client_id)
1442            {
1443                match parse_order_event(
1444                    &msg,
1445                    canonical_client_id,
1446                    self.account_id,
1447                    trader_id,
1448                    strategy_id,
1449                    instrument,
1450                    previous_fee,
1451                    previous_filled_qty,
1452                    previous_state.as_ref(),
1453                    ts_init,
1454                ) {
1455                    Ok(event) => {
1456                        self.process_parsed_order_event(
1457                            event,
1458                            &msg,
1459                            price_precision,
1460                            size_precision,
1461                            canonical_client_id,
1462                            &raw_child,
1463                            &mut exec_reports,
1464                        );
1465                    }
1466                    Err(e) => log::error!("Failed to parse order event: {e}"),
1467                }
1468            } else {
1469                // External order or not tracked - use old parse_order_msg for backward compatibility
1470                match parse_order_msg(
1471                    &msg,
1472                    self.account_id,
1473                    &self.instruments_cache,
1474                    &self.fee_cache,
1475                    &self.filled_qty_cache,
1476                    ts_init,
1477                ) {
1478                    Ok(report) => {
1479                        log::debug!("Parsed external order as execution report: {report:?}");
1480                        let adjusted =
1481                            self.adjust_execution_report(report, &effective_client_id, &raw_child);
1482                        self.update_caches_with_report(&adjusted);
1483                        exec_reports.push(adjusted);
1484                    }
1485                    Err(e) => log::error!("Failed to parse order message: {e}"),
1486                }
1487            }
1488        }
1489
1490        if !exec_reports.is_empty() {
1491            log::debug!(
1492                "Pushing {count} execution report(s) to message queue",
1493                count = exec_reports.len()
1494            );
1495            self.pending_messages
1496                .push_back(NautilusWsMessage::ExecutionReports(exec_reports));
1497        }
1498
1499        self.pending_messages.pop_front()
1500    }
1501
1502    /// Processes a parsed order event and emits the appropriate message.
1503    #[allow(clippy::too_many_arguments)]
1504    fn process_parsed_order_event(
1505        &mut self,
1506        event: ParsedOrderEvent,
1507        msg: &OKXOrderMsg,
1508        price_precision: u8,
1509        size_precision: u8,
1510        canonical_client_id: ClientOrderId,
1511        raw_child: &Option<ClientOrderId>,
1512        exec_reports: &mut Vec<ExecutionReport>,
1513    ) {
1514        let venue_order_id = VenueOrderId::new(msg.ord_id);
1515
1516        match event {
1517            ParsedOrderEvent::Accepted(accepted) => {
1518                if self.emitted_order_accepted.contains_key(&venue_order_id) {
1519                    log::debug!(
1520                        "Skipping duplicate OrderAccepted for venue_order_id={venue_order_id}"
1521                    );
1522                    return;
1523                }
1524                self.emitted_order_accepted.insert(venue_order_id, ());
1525                self.update_order_state_cache(
1526                    &canonical_client_id,
1527                    msg,
1528                    price_precision,
1529                    size_precision,
1530                );
1531
1532                self.pending_messages
1533                    .push_back(NautilusWsMessage::OrderAccepted(accepted));
1534            }
1535            ParsedOrderEvent::Canceled(canceled) => {
1536                self.cleanup_terminal_order(&canonical_client_id, &venue_order_id);
1537                self.pending_messages
1538                    .push_back(NautilusWsMessage::OrderCanceled(canceled));
1539            }
1540            ParsedOrderEvent::Expired(expired) => {
1541                self.cleanup_terminal_order(&canonical_client_id, &venue_order_id);
1542                self.pending_messages
1543                    .push_back(NautilusWsMessage::OrderExpired(expired));
1544            }
1545            ParsedOrderEvent::Triggered(triggered) => {
1546                self.update_order_state_cache(
1547                    &canonical_client_id,
1548                    msg,
1549                    price_precision,
1550                    size_precision,
1551                );
1552                self.pending_messages
1553                    .push_back(NautilusWsMessage::OrderTriggered(triggered));
1554            }
1555            ParsedOrderEvent::Updated(updated) => {
1556                self.update_order_state_cache(
1557                    &canonical_client_id,
1558                    msg,
1559                    price_precision,
1560                    size_precision,
1561                );
1562                self.pending_messages
1563                    .push_back(NautilusWsMessage::OrderUpdated(updated));
1564            }
1565            ParsedOrderEvent::Fill(fill_report) => {
1566                let effective_client_id = Some(canonical_client_id);
1567                let adjusted = self.adjust_execution_report(
1568                    ExecutionReport::Fill(fill_report),
1569                    &effective_client_id,
1570                    raw_child,
1571                );
1572                self.update_caches_with_report(&adjusted);
1573
1574                if msg.state == OKXOrderStatus::Filled {
1575                    self.cleanup_terminal_order(&canonical_client_id, &venue_order_id);
1576                }
1577
1578                exec_reports.push(adjusted);
1579            }
1580            ParsedOrderEvent::StatusOnly(status_report) => {
1581                let effective_client_id = Some(canonical_client_id);
1582                let adjusted = self.adjust_execution_report(
1583                    ExecutionReport::Order(*status_report),
1584                    &effective_client_id,
1585                    raw_child,
1586                );
1587                self.update_caches_with_report(&adjusted);
1588                exec_reports.push(adjusted);
1589            }
1590        }
1591    }
1592
1593    /// Updates the order state cache for detecting future updates.
1594    fn update_order_state_cache(
1595        &mut self,
1596        client_order_id: &ClientOrderId,
1597        msg: &OKXOrderMsg,
1598        price_precision: u8,
1599        size_precision: u8,
1600    ) {
1601        let venue_order_id = VenueOrderId::new(msg.ord_id);
1602        let quantity = parse_quantity(&msg.sz, size_precision).ok();
1603        let price = if is_market_price(&msg.px) {
1604            None
1605        } else {
1606            parse_price(&msg.px, price_precision).ok()
1607        };
1608
1609        if let Some(qty) = quantity {
1610            self.order_state_cache.insert(
1611                *client_order_id,
1612                OrderStateSnapshot {
1613                    venue_order_id,
1614                    quantity: qty,
1615                    price,
1616                },
1617            );
1618        }
1619    }
1620
1621    /// Cleans up tracking state for terminal orders.
1622    fn cleanup_terminal_order(
1623        &mut self,
1624        client_order_id: &ClientOrderId,
1625        venue_order_id: &VenueOrderId,
1626    ) {
1627        self.emitted_order_accepted.remove(venue_order_id);
1628        self.order_state_cache.remove(client_order_id);
1629        self.active_client_orders.remove(client_order_id);
1630        self.client_id_aliases.remove(client_order_id);
1631        self.client_id_aliases.retain(|_, v| *v != *client_order_id);
1632
1633        self.fee_cache.remove(&venue_order_id.inner());
1634        self.filled_qty_cache.remove(&venue_order_id.inner());
1635    }
1636
1637    fn handle_algo_orders_data(
1638        &mut self,
1639        data: Value,
1640        ts_init: UnixNanos,
1641    ) -> Option<NautilusWsMessage> {
1642        let orders: Vec<OKXAlgoOrderMsg> = match serde_json::from_value(data) {
1643            Ok(orders) => orders,
1644            Err(e) => {
1645                log::error!("Failed to deserialize algo orders payload: {e}");
1646                return None;
1647            }
1648        };
1649
1650        let mut exec_reports: Vec<ExecutionReport> = Vec::with_capacity(orders.len());
1651
1652        for msg in orders {
1653            let raw_child = parse_client_order_id(&msg.cl_ord_id);
1654            let parent_from_msg = parse_client_order_id(&msg.algo_cl_ord_id);
1655            let effective_client_id =
1656                self.register_client_order_aliases(&raw_child, &parent_from_msg);
1657
1658            match parse_algo_order_msg(msg, self.account_id, &self.instruments_cache, ts_init) {
1659                Ok(report) => {
1660                    let adjusted =
1661                        self.adjust_execution_report(report, &effective_client_id, &raw_child);
1662                    self.update_caches_with_report(&adjusted);
1663                    exec_reports.push(adjusted);
1664                }
1665                Err(e) => {
1666                    log::error!("Failed to parse algo order message: {e}");
1667                }
1668            }
1669        }
1670
1671        if exec_reports.is_empty() {
1672            None
1673        } else {
1674            Some(NautilusWsMessage::ExecutionReports(exec_reports))
1675        }
1676    }
1677
1678    fn handle_other_channel_data(
1679        &mut self,
1680        channel: OKXWsChannel,
1681        inst_id: Option<Ustr>,
1682        data: Value,
1683        ts_init: UnixNanos,
1684    ) -> Option<NautilusWsMessage> {
1685        let Some(inst_id) = inst_id else {
1686            log::error!("No instrument for channel {channel:?}");
1687            return None;
1688        };
1689
1690        let Some(instrument) = self.instruments_cache.get(&inst_id) else {
1691            log::error!("No instrument for channel {channel:?}, inst_id {inst_id:?}");
1692            return None;
1693        };
1694
1695        let instrument_id = instrument.id();
1696        let price_precision = instrument.price_precision();
1697        let size_precision = instrument.size_precision();
1698
1699        match parse_ws_message_data(
1700            &channel,
1701            data,
1702            &instrument_id,
1703            price_precision,
1704            size_precision,
1705            ts_init,
1706            &mut self.funding_rate_cache,
1707            &self.instruments_cache,
1708        ) {
1709            Ok(Some(msg)) => {
1710                if let NautilusWsMessage::Instrument(ref inst) = msg {
1711                    self.instruments_cache
1712                        .insert(inst.symbol().inner(), inst.as_ref().clone());
1713                }
1714                Some(msg)
1715            }
1716            Ok(None) => None,
1717            Err(e) => {
1718                log::error!("Error parsing message for channel {channel:?}: {e}");
1719                None
1720            }
1721        }
1722    }
1723
1724    pub(crate) fn parse_raw_message(
1725        msg: tokio_tungstenite::tungstenite::Message,
1726    ) -> Option<OKXWsMessage> {
1727        match msg {
1728            tokio_tungstenite::tungstenite::Message::Text(text) => {
1729                if text == TEXT_PONG {
1730                    log::trace!("Received pong from OKX");
1731                    return None;
1732                }
1733                if text == TEXT_PING {
1734                    log::trace!("Received ping from OKX (text)");
1735                    return Some(OKXWsMessage::Ping);
1736                }
1737
1738                if text == RECONNECTED {
1739                    log::debug!("Received WebSocket reconnection signal");
1740                    return Some(OKXWsMessage::Reconnected);
1741                }
1742                log::trace!("Received WebSocket message: {text}");
1743
1744                match serde_json::from_str(&text) {
1745                    Ok(ws_event) => match &ws_event {
1746                        OKXWsMessage::Error { code, msg } => {
1747                            log::error!("WebSocket error: {code} - {msg}");
1748                            Some(ws_event)
1749                        }
1750                        OKXWsMessage::Login {
1751                            event,
1752                            code,
1753                            msg,
1754                            conn_id,
1755                        } => {
1756                            if code == "0" {
1757                                log::info!("WebSocket authenticated: conn_id={conn_id}");
1758                            } else {
1759                                log::error!(
1760                                    "WebSocket authentication failed: event={event}, code={code}, error={msg}"
1761                                );
1762                            }
1763                            Some(ws_event)
1764                        }
1765                        OKXWsMessage::Subscription {
1766                            event,
1767                            arg,
1768                            conn_id,
1769                            ..
1770                        } => {
1771                            let channel_str = serde_json::to_string(&arg.channel)
1772                                .expect("Invalid OKX websocket channel")
1773                                .trim_matches('"')
1774                                .to_string();
1775                            log::debug!("{event}d: channel={channel_str}, conn_id={conn_id}");
1776                            Some(ws_event)
1777                        }
1778                        OKXWsMessage::ChannelConnCount {
1779                            event: _,
1780                            channel,
1781                            conn_count,
1782                            conn_id,
1783                        } => {
1784                            let channel_str = serde_json::to_string(&channel)
1785                                .expect("Invalid OKX websocket channel")
1786                                .trim_matches('"')
1787                                .to_string();
1788                            log::debug!(
1789                                "Channel connection status: channel={channel_str}, connections={conn_count}, conn_id={conn_id}",
1790                            );
1791                            None
1792                        }
1793                        OKXWsMessage::Ping => {
1794                            log::trace!("Ignoring ping event parsed from text payload");
1795                            None
1796                        }
1797                        OKXWsMessage::Data { .. } => Some(ws_event),
1798                        OKXWsMessage::BookData { .. } => Some(ws_event),
1799                        OKXWsMessage::OrderResponse {
1800                            id,
1801                            op,
1802                            code,
1803                            msg: _,
1804                            data,
1805                        } => {
1806                            if code == "0" {
1807                                log::debug!(
1808                                    "Order operation successful: id={id:?}, op={op}, code={code}"
1809                                );
1810
1811                                if let Some(order_data) = data.first() {
1812                                    let success_msg = order_data
1813                                        .get("sMsg")
1814                                        .and_then(|s| s.as_str())
1815                                        .unwrap_or("Order operation successful");
1816                                    log::debug!("Order success details: {success_msg}");
1817                                }
1818                            }
1819                            Some(ws_event)
1820                        }
1821                        OKXWsMessage::Reconnected => {
1822                            // This shouldn't happen as we handle RECONNECTED string directly
1823                            log::warn!("Unexpected Reconnected event from deserialization");
1824                            None
1825                        }
1826                    },
1827                    Err(e) => {
1828                        log::error!("Failed to parse message: {e}: {text}");
1829                        None
1830                    }
1831                }
1832            }
1833            Message::Ping(_payload) => {
1834                log::trace!("Received binary ping frame from OKX");
1835                Some(OKXWsMessage::Ping)
1836            }
1837            Message::Pong(payload) => {
1838                log::trace!("Received pong frame from OKX ({} bytes)", payload.len());
1839                None
1840            }
1841            Message::Binary(msg) => {
1842                log::debug!("Raw binary: {msg:?}");
1843                None
1844            }
1845            Message::Close(_) => {
1846                log::debug!("Received close message");
1847                None
1848            }
1849            msg => {
1850                log::warn!("Unexpected message: {msg}");
1851                None
1852            }
1853        }
1854    }
1855
1856    fn generate_unique_request_id(&self) -> String {
1857        self.request_id_counter
1858            .fetch_add(1, Ordering::SeqCst)
1859            .to_string()
1860    }
1861
1862    fn get_instrument_type_and_family_from_instrument(
1863        instrument: &InstrumentAny,
1864    ) -> anyhow::Result<(OKXInstrumentType, String)> {
1865        let inst_type = okx_instrument_type(instrument)?;
1866        let symbol = instrument.symbol().inner();
1867
1868        // Determine instrument family based on instrument type
1869        let inst_family = match instrument {
1870            InstrumentAny::CurrencyPair(_) => symbol.as_str().to_string(),
1871            InstrumentAny::CryptoPerpetual(_) => {
1872                // For SWAP: "BTC-USDT-SWAP" -> "BTC-USDT"
1873                symbol
1874                    .as_str()
1875                    .strip_suffix("-SWAP")
1876                    .unwrap_or(symbol.as_str())
1877                    .to_string()
1878            }
1879            InstrumentAny::CryptoFuture(_) => {
1880                // For FUTURES: "BTC-USDT-250328" -> "BTC-USDT"
1881                // Extract the base pair by removing date suffix
1882                let s = symbol.as_str();
1883                if let Some(idx) = s.rfind('-') {
1884                    s[..idx].to_string()
1885                } else {
1886                    s.to_string()
1887                }
1888            }
1889            _ => {
1890                anyhow::bail!("Unsupported instrument type for OKX");
1891            }
1892        };
1893
1894        Ok((inst_type, inst_family))
1895    }
1896
1897    async fn handle_mass_cancel(&mut self, instrument_id: InstrumentId) -> anyhow::Result<()> {
1898        let instrument = self
1899            .instruments_cache
1900            .get(&instrument_id.symbol.inner())
1901            .ok_or_else(|| anyhow::anyhow!("Unknown instrument {instrument_id}"))?;
1902
1903        let (inst_type, inst_family) =
1904            Self::get_instrument_type_and_family_from_instrument(instrument)?;
1905
1906        let params = WsMassCancelParams {
1907            inst_type,
1908            inst_family: Ustr::from(&inst_family),
1909        };
1910
1911        let args =
1912            vec![serde_json::to_value(params).map_err(|e| anyhow::anyhow!("JSON error: {e}"))?];
1913
1914        let request_id = self.generate_unique_request_id();
1915
1916        self.pending_mass_cancel_requests
1917            .insert(request_id.clone(), instrument_id);
1918
1919        let request = OKXWsRequest {
1920            id: Some(request_id.clone()),
1921            op: OKXWsOperation::MassCancel,
1922            exp_time: None,
1923            args,
1924        };
1925
1926        let payload = serde_json::to_string(&request)
1927            .map_err(|e| anyhow::anyhow!("Failed to serialize mass cancel request: {e}"))?;
1928
1929        match self
1930            .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_CANCEL.to_string()]))
1931            .await
1932        {
1933            Ok(()) => {
1934                log::debug!("Sent mass cancel for {instrument_id}");
1935                Ok(())
1936            }
1937            Err(e) => {
1938                log::error!("Failed to send mass cancel after retries: error={e}");
1939
1940                self.pending_mass_cancel_requests.remove(&request_id);
1941
1942                let error = OKXWebSocketError {
1943                    code: "CLIENT_ERROR".to_string(),
1944                    message: format!("Mass cancel failed for {instrument_id}: {e}"),
1945                    conn_id: None,
1946                    timestamp: self.clock.get_time_ns().as_u64(),
1947                };
1948                let _ = self.send(NautilusWsMessage::Error(error));
1949
1950                Err(anyhow::anyhow!("Failed to send mass cancel: {e}"))
1951            }
1952        }
1953    }
1954
1955    async fn handle_batch_cancel_orders(
1956        &self,
1957        args: Vec<Value>,
1958        request_id: String,
1959    ) -> anyhow::Result<()> {
1960        let request = OKXWsRequest {
1961            id: Some(request_id),
1962            op: OKXWsOperation::BatchCancelOrders,
1963            exp_time: None,
1964            args,
1965        };
1966
1967        let payload = serde_json::to_string(&request)
1968            .map_err(|e| anyhow::anyhow!("Failed to serialize batch cancel request: {e}"))?;
1969
1970        if let Some(client) = &self.inner {
1971            client
1972                .send_text(payload, Some(vec![OKX_RATE_LIMIT_KEY_CANCEL.to_string()]))
1973                .await
1974                .map_err(|e| anyhow::anyhow!("Failed to send batch cancel: {e}"))?;
1975            log::debug!("Sent batch cancel orders");
1976            Ok(())
1977        } else {
1978            Err(anyhow::anyhow!("No active WebSocket client"))
1979        }
1980    }
1981
1982    async fn handle_batch_place_orders(
1983        &self,
1984        args: Vec<Value>,
1985        request_id: String,
1986    ) -> anyhow::Result<()> {
1987        let request = OKXWsRequest {
1988            id: Some(request_id),
1989            op: OKXWsOperation::BatchOrders,
1990            exp_time: None,
1991            args,
1992        };
1993
1994        let payload = serde_json::to_string(&request)
1995            .map_err(|e| anyhow::anyhow!("Failed to serialize batch place request: {e}"))?;
1996
1997        if let Some(client) = &self.inner {
1998            client
1999                .send_text(payload, Some(vec![OKX_RATE_LIMIT_KEY_ORDER.to_string()]))
2000                .await
2001                .map_err(|e| anyhow::anyhow!("Failed to send batch place: {e}"))?;
2002            log::debug!("Sent batch place orders");
2003            Ok(())
2004        } else {
2005            Err(anyhow::anyhow!("No active WebSocket client"))
2006        }
2007    }
2008
2009    async fn handle_batch_amend_orders(
2010        &self,
2011        args: Vec<Value>,
2012        request_id: String,
2013    ) -> anyhow::Result<()> {
2014        let request = OKXWsRequest {
2015            id: Some(request_id),
2016            op: OKXWsOperation::BatchAmendOrders,
2017            exp_time: None,
2018            args,
2019        };
2020
2021        let payload = serde_json::to_string(&request)
2022            .map_err(|e| anyhow::anyhow!("Failed to serialize batch amend request: {e}"))?;
2023
2024        if let Some(client) = &self.inner {
2025            client
2026                .send_text(payload, Some(vec![OKX_RATE_LIMIT_KEY_AMEND.to_string()]))
2027                .await
2028                .map_err(|e| anyhow::anyhow!("Failed to send batch amend: {e}"))?;
2029            log::debug!("Sent batch amend orders");
2030            Ok(())
2031        } else {
2032            Err(anyhow::anyhow!("No active WebSocket client"))
2033        }
2034    }
2035
2036    async fn handle_subscribe(&self, args: Vec<OKXSubscriptionArg>) -> anyhow::Result<()> {
2037        for arg in &args {
2038            log::debug!(
2039                "Subscribing to channel: channel={:?}, inst_id={:?}",
2040                arg.channel,
2041                arg.inst_id
2042            );
2043        }
2044
2045        let message = OKXSubscription {
2046            op: OKXWsOperation::Subscribe,
2047            args,
2048        };
2049
2050        let json_txt = serde_json::to_string(&message)
2051            .map_err(|e| anyhow::anyhow!("Failed to serialize subscription: {e}"))?;
2052
2053        self.send_with_retry(
2054            json_txt,
2055            Some(vec![OKX_RATE_LIMIT_KEY_SUBSCRIPTION.to_string()]),
2056        )
2057        .await
2058        .map_err(|e| anyhow::anyhow!("Failed to send subscription after retries: {e}"))?;
2059        Ok(())
2060    }
2061
2062    async fn handle_unsubscribe(&self, args: Vec<OKXSubscriptionArg>) -> anyhow::Result<()> {
2063        for arg in &args {
2064            log::debug!(
2065                "Unsubscribing from channel: channel={:?}, inst_id={:?}",
2066                arg.channel,
2067                arg.inst_id
2068            );
2069        }
2070
2071        let message = OKXSubscription {
2072            op: OKXWsOperation::Unsubscribe,
2073            args,
2074        };
2075
2076        let json_txt = serde_json::to_string(&message)
2077            .map_err(|e| anyhow::anyhow!("Failed to serialize unsubscription: {e}"))?;
2078
2079        self.send_with_retry(
2080            json_txt,
2081            Some(vec![OKX_RATE_LIMIT_KEY_SUBSCRIPTION.to_string()]),
2082        )
2083        .await
2084        .map_err(|e| anyhow::anyhow!("Failed to send unsubscription after retries: {e}"))?;
2085        Ok(())
2086    }
2087
2088    async fn handle_place_order(
2089        &mut self,
2090        params: WsPostOrderParams,
2091        client_order_id: ClientOrderId,
2092        trader_id: TraderId,
2093        strategy_id: StrategyId,
2094        instrument_id: InstrumentId,
2095    ) -> anyhow::Result<()> {
2096        let request_id = self.generate_unique_request_id();
2097
2098        self.pending_place_requests.insert(
2099            request_id.clone(),
2100            (
2101                PendingOrderParams::Regular(params.clone()),
2102                client_order_id,
2103                trader_id,
2104                strategy_id,
2105                instrument_id,
2106            ),
2107        );
2108
2109        let request = OKXWsRequest {
2110            id: Some(request_id.clone()),
2111            op: OKXWsOperation::Order,
2112            exp_time: None,
2113            args: vec![params],
2114        };
2115
2116        let payload = serde_json::to_string(&request)
2117            .map_err(|e| anyhow::anyhow!("Failed to serialize place order request: {e}"))?;
2118
2119        match self
2120            .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_ORDER.to_string()]))
2121            .await
2122        {
2123            Ok(()) => {
2124                log::debug!("Sent place order request");
2125                Ok(())
2126            }
2127            Err(e) => {
2128                log::error!("Failed to send place order after retries: error={e}");
2129
2130                self.pending_place_requests.remove(&request_id);
2131
2132                let ts_now = self.clock.get_time_ns();
2133                let rejected = OrderRejected::new(
2134                    trader_id,
2135                    strategy_id,
2136                    instrument_id,
2137                    client_order_id,
2138                    self.account_id,
2139                    Ustr::from(&format!("WebSocket send failed: {e}")),
2140                    UUID4::new(),
2141                    ts_now, // ts_event
2142                    ts_now, // ts_init
2143                    false,  // Not from reconciliation
2144                    false,  // Not due to post-only
2145                );
2146                let _ = self.send(NautilusWsMessage::OrderRejected(rejected));
2147
2148                Err(anyhow::anyhow!("Failed to send place order: {e}"))
2149            }
2150        }
2151    }
2152
2153    async fn handle_place_algo_order(
2154        &mut self,
2155        params: WsPostAlgoOrderParams,
2156        client_order_id: ClientOrderId,
2157        trader_id: TraderId,
2158        strategy_id: StrategyId,
2159        instrument_id: InstrumentId,
2160    ) -> anyhow::Result<()> {
2161        let request_id = self.generate_unique_request_id();
2162
2163        self.pending_place_requests.insert(
2164            request_id.clone(),
2165            (
2166                PendingOrderParams::Algo(params.clone()),
2167                client_order_id,
2168                trader_id,
2169                strategy_id,
2170                instrument_id,
2171            ),
2172        );
2173
2174        let request = OKXWsRequest {
2175            id: Some(request_id.clone()),
2176            op: OKXWsOperation::OrderAlgo,
2177            exp_time: None,
2178            args: vec![params],
2179        };
2180
2181        let payload = serde_json::to_string(&request)
2182            .map_err(|e| anyhow::anyhow!("Failed to serialize place algo order request: {e}"))?;
2183
2184        match self
2185            .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_ORDER.to_string()]))
2186            .await
2187        {
2188            Ok(()) => {
2189                log::debug!("Sent place algo order request");
2190                Ok(())
2191            }
2192            Err(e) => {
2193                log::error!("Failed to send place algo order after retries: error={e}");
2194
2195                self.pending_place_requests.remove(&request_id);
2196
2197                let ts_now = self.clock.get_time_ns();
2198                let rejected = OrderRejected::new(
2199                    trader_id,
2200                    strategy_id,
2201                    instrument_id,
2202                    client_order_id,
2203                    self.account_id,
2204                    Ustr::from(&format!("WebSocket send failed: {e}")),
2205                    UUID4::new(),
2206                    ts_now, // ts_event
2207                    ts_now, // ts_init
2208                    false,  // Not from reconciliation
2209                    false,  // Not due to post-only
2210                );
2211                let _ = self.send(NautilusWsMessage::OrderRejected(rejected));
2212
2213                Err(anyhow::anyhow!("Failed to send place algo order: {e}"))
2214            }
2215        }
2216    }
2217
2218    async fn handle_cancel_order(
2219        &mut self,
2220        client_order_id: Option<ClientOrderId>,
2221        venue_order_id: Option<VenueOrderId>,
2222        instrument_id: InstrumentId,
2223        trader_id: TraderId,
2224        strategy_id: StrategyId,
2225    ) -> anyhow::Result<()> {
2226        let mut builder = WsCancelOrderParamsBuilder::default();
2227        builder.inst_id(instrument_id.symbol.as_str());
2228
2229        if let Some(venue_order_id) = venue_order_id {
2230            builder.ord_id(venue_order_id.as_str());
2231        }
2232
2233        if let Some(client_order_id) = client_order_id {
2234            builder.cl_ord_id(client_order_id.as_str());
2235        }
2236
2237        let params = builder
2238            .build()
2239            .map_err(|e| anyhow::anyhow!("Failed to build cancel params: {e}"))?;
2240
2241        let request_id = self.generate_unique_request_id();
2242
2243        // Track pending request if we have a client order ID
2244        if let Some(client_order_id) = client_order_id {
2245            self.pending_cancel_requests.insert(
2246                request_id.clone(),
2247                (
2248                    client_order_id,
2249                    trader_id,
2250                    strategy_id,
2251                    instrument_id,
2252                    venue_order_id,
2253                ),
2254            );
2255        }
2256
2257        let request = OKXWsRequest {
2258            id: Some(request_id.clone()),
2259            op: OKXWsOperation::CancelOrder,
2260            exp_time: None,
2261            args: vec![params],
2262        };
2263
2264        let payload = serde_json::to_string(&request)
2265            .map_err(|e| anyhow::anyhow!("Failed to serialize cancel request: {e}"))?;
2266
2267        match self
2268            .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_CANCEL.to_string()]))
2269            .await
2270        {
2271            Ok(()) => {
2272                log::debug!("Sent cancel order request");
2273                Ok(())
2274            }
2275            Err(e) => {
2276                log::error!("Failed to send cancel order after retries: error={e}");
2277
2278                self.pending_cancel_requests.remove(&request_id);
2279
2280                if let Some(client_order_id) = client_order_id {
2281                    let ts_now = self.clock.get_time_ns();
2282                    let rejected = OrderCancelRejected::new(
2283                        trader_id,
2284                        strategy_id,
2285                        instrument_id,
2286                        client_order_id,
2287                        Ustr::from(&format!("WebSocket send failed: {e}")),
2288                        UUID4::new(),
2289                        ts_now, // ts_event
2290                        ts_now, // ts_init
2291                        false,  // Not from reconciliation
2292                        venue_order_id,
2293                        Some(self.account_id),
2294                    );
2295                    let _ = self.send(NautilusWsMessage::OrderCancelRejected(rejected));
2296                }
2297
2298                Err(anyhow::anyhow!("Failed to send cancel order: {e}"))
2299            }
2300        }
2301    }
2302
2303    async fn handle_amend_order(
2304        &mut self,
2305        params: WsAmendOrderParams,
2306        client_order_id: ClientOrderId,
2307        trader_id: TraderId,
2308        strategy_id: StrategyId,
2309        instrument_id: InstrumentId,
2310        venue_order_id: Option<VenueOrderId>,
2311    ) -> anyhow::Result<()> {
2312        let request_id = self.generate_unique_request_id();
2313
2314        self.pending_amend_requests.insert(
2315            request_id.clone(),
2316            (
2317                client_order_id,
2318                trader_id,
2319                strategy_id,
2320                instrument_id,
2321                venue_order_id,
2322            ),
2323        );
2324
2325        let request = OKXWsRequest {
2326            id: Some(request_id.clone()),
2327            op: OKXWsOperation::AmendOrder,
2328            exp_time: None,
2329            args: vec![params],
2330        };
2331
2332        let payload = serde_json::to_string(&request)
2333            .map_err(|e| anyhow::anyhow!("Failed to serialize amend order request: {e}"))?;
2334
2335        match self
2336            .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_AMEND.to_string()]))
2337            .await
2338        {
2339            Ok(()) => {
2340                log::debug!("Sent amend order request");
2341                Ok(())
2342            }
2343            Err(e) => {
2344                log::error!("Failed to send amend order after retries: error={e}");
2345
2346                self.pending_amend_requests.remove(&request_id);
2347
2348                let ts_now = self.clock.get_time_ns();
2349                let rejected = OrderModifyRejected::new(
2350                    trader_id,
2351                    strategy_id,
2352                    instrument_id,
2353                    client_order_id,
2354                    Ustr::from(&format!("WebSocket send failed: {e}")),
2355                    UUID4::new(),
2356                    ts_now, // ts_event
2357                    ts_now, // ts_init
2358                    false,  // Not from reconciliation
2359                    venue_order_id,
2360                    Some(self.account_id),
2361                );
2362                let _ = self.send(NautilusWsMessage::OrderModifyRejected(rejected));
2363
2364                Err(anyhow::anyhow!("Failed to send amend order: {e}"))
2365            }
2366        }
2367    }
2368
2369    async fn handle_cancel_algo_order(
2370        &mut self,
2371        client_order_id: Option<ClientOrderId>,
2372        algo_order_id: Option<VenueOrderId>,
2373        instrument_id: InstrumentId,
2374        trader_id: TraderId,
2375        strategy_id: StrategyId,
2376    ) -> anyhow::Result<()> {
2377        let mut builder = WsCancelAlgoOrderParamsBuilder::default();
2378        builder.inst_id(instrument_id.symbol.as_str());
2379
2380        if let Some(client_order_id) = &client_order_id {
2381            builder.algo_cl_ord_id(client_order_id.as_str());
2382        }
2383
2384        if let Some(algo_id) = &algo_order_id {
2385            builder.algo_id(algo_id.as_str());
2386        }
2387
2388        let params = builder
2389            .build()
2390            .map_err(|e| anyhow::anyhow!("Failed to build cancel algo params: {e}"))?;
2391
2392        let request_id = self.generate_unique_request_id();
2393
2394        // Track pending cancellation if we have a client order ID
2395        if let Some(client_order_id) = client_order_id {
2396            self.pending_cancel_requests.insert(
2397                request_id.clone(),
2398                (client_order_id, trader_id, strategy_id, instrument_id, None),
2399            );
2400        }
2401
2402        let request = OKXWsRequest {
2403            id: Some(request_id.clone()),
2404            op: OKXWsOperation::CancelAlgos,
2405            exp_time: None,
2406            args: vec![params],
2407        };
2408
2409        let payload = serde_json::to_string(&request)
2410            .map_err(|e| anyhow::anyhow!("Failed to serialize cancel algo request: {e}"))?;
2411
2412        match self
2413            .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_CANCEL.to_string()]))
2414            .await
2415        {
2416            Ok(()) => {
2417                log::debug!("Sent cancel algo order request");
2418                Ok(())
2419            }
2420            Err(e) => {
2421                log::error!("Failed to send cancel algo order after retries: error={e}");
2422
2423                self.pending_cancel_requests.remove(&request_id);
2424
2425                if let Some(client_order_id) = client_order_id {
2426                    let ts_now = self.clock.get_time_ns();
2427                    let rejected = OrderCancelRejected::new(
2428                        trader_id,
2429                        strategy_id,
2430                        instrument_id,
2431                        client_order_id,
2432                        Ustr::from(&format!("WebSocket send failed: {e}")),
2433                        UUID4::new(),
2434                        ts_now, // ts_event
2435                        ts_now, // ts_init
2436                        false,  // Not from reconciliation
2437                        None,
2438                        Some(self.account_id),
2439                    );
2440                    let _ = self.send(NautilusWsMessage::OrderCancelRejected(rejected));
2441                }
2442
2443                Err(anyhow::anyhow!("Failed to send cancel algo order: {e}"))
2444            }
2445        }
2446    }
2447}
2448
2449/// Returns `true` when an OKX error payload represents a post-only rejection.
2450pub fn is_post_only_rejection(code: &str, data: &[Value]) -> bool {
2451    if code == OKX_POST_ONLY_ERROR_CODE {
2452        return true;
2453    }
2454
2455    for entry in data {
2456        if let Some(s_code) = entry.get("sCode").and_then(|value| value.as_str())
2457            && s_code == OKX_POST_ONLY_ERROR_CODE
2458        {
2459            return true;
2460        }
2461
2462        if let Some(inner_code) = entry.get("code").and_then(|value| value.as_str())
2463            && inner_code == OKX_POST_ONLY_ERROR_CODE
2464        {
2465            return true;
2466        }
2467    }
2468
2469    false
2470}
2471
/// Returns `true` if `haystack` contains `needle`, ignoring ASCII case.
///
/// Only ASCII letters are case-folded; every other byte must match exactly. An empty
/// `needle` matches any `haystack`, consistent with `str::contains`.
#[inline]
fn contains_ignore_ascii_case(haystack: &str, needle: &str) -> bool {
    // `slice::windows` panics on a zero window size, so handle the empty needle up front.
    if needle.is_empty() {
        return true;
    }

    let needle = needle.as_bytes();
    haystack
        .as_bytes()
        .windows(needle.len())
        .any(|window| window.eq_ignore_ascii_case(needle))
}
2480
2481/// Determines if an OKX WebSocket error should trigger a retry.
2482fn should_retry_okx_error(error: &OKXWsError) -> bool {
2483    match error {
2484        OKXWsError::OkxError { error_code, .. } => should_retry_error_code(error_code),
2485        OKXWsError::TungsteniteError(_) => true, // Network errors are retryable
2486        OKXWsError::ClientError(msg) => {
2487            // Retry on timeout and connection errors
2488            contains_ignore_ascii_case(msg, "timeout")
2489                || contains_ignore_ascii_case(msg, "timed out")
2490                || contains_ignore_ascii_case(msg, "connection")
2491                || contains_ignore_ascii_case(msg, "network")
2492        }
2493        OKXWsError::AuthenticationError(_)
2494        | OKXWsError::JsonError(_)
2495        | OKXWsError::ParsingError(_) => {
2496            // Don't retry authentication or parsing errors automatically
2497            false
2498        }
2499    }
2500}
2501
/// Creates a timeout error for the retry manager.
///
/// The message is wrapped unchanged in [`OKXWsError::ClientError`]. Note that
/// `should_retry_okx_error` treats a `ClientError` as retryable only when its message
/// contains one of the transient markers it checks for (e.g. "timeout").
fn create_okx_timeout_error(msg: String) -> OKXWsError {
    OKXWsError::ClientError(msg)
}
2506
2507#[cfg(test)]
2508mod tests {
2509    use std::sync::{Arc, atomic::AtomicBool};
2510
2511    use ahash::AHashMap;
2512    use dashmap::DashMap;
2513    use nautilus_model::{
2514        identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
2515        types::{Money, Quantity},
2516    };
2517    use nautilus_network::websocket::{AuthTracker, SubscriptionState};
2518    use rstest::rstest;
2519    use ustr::Ustr;
2520
2521    use super::{NautilusWsMessage, OKXWsFeedHandler};
2522    use crate::websocket::parse::OrderStateSnapshot;
2523
2524    const OKX_WS_TOPIC_DELIMITER: char = ':';
2525
2526    #[allow(clippy::type_complexity)]
2527    fn create_test_handler() -> (
2528        OKXWsFeedHandler,
2529        tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>,
2530        Arc<DashMap<ClientOrderId, (TraderId, StrategyId, InstrumentId)>>,
2531        Arc<DashMap<ClientOrderId, ClientOrderId>>,
2532    ) {
2533        let account_id = AccountId::new("OKX-001");
2534        let signal = Arc::new(AtomicBool::new(false));
2535        let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
2536        let (_raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();
2537        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel();
2538        let active_client_orders = Arc::new(DashMap::new());
2539        let client_id_aliases = Arc::new(DashMap::new());
2540        let auth_tracker = AuthTracker::new();
2541        let subscriptions_state = SubscriptionState::new(OKX_WS_TOPIC_DELIMITER);
2542
2543        let handler = OKXWsFeedHandler::new(
2544            account_id,
2545            signal,
2546            cmd_rx,
2547            raw_rx,
2548            out_tx,
2549            active_client_orders.clone(),
2550            client_id_aliases.clone(),
2551            auth_tracker,
2552            subscriptions_state,
2553        );
2554
2555        (handler, out_rx, active_client_orders, client_id_aliases)
2556    }
2557
2558    #[rstest]
2559    fn test_is_post_only_rejection_detects_by_code() {
2560        assert!(super::is_post_only_rejection("51019", &[]));
2561    }
2562
2563    #[rstest]
2564    fn test_is_post_only_rejection_detects_by_inner_code() {
2565        let data = vec![serde_json::json!({
2566            "sCode": "51019"
2567        })];
2568        assert!(super::is_post_only_rejection("50000", &data));
2569    }
2570
2571    #[rstest]
2572    fn test_is_post_only_rejection_false_for_unrelated_error() {
2573        let data = vec![serde_json::json!({
2574            "sMsg": "Insufficient balance"
2575        })];
2576        assert!(!super::is_post_only_rejection("50000", &data));
2577    }
2578
2579    #[rstest]
2580    fn test_cleanup_alias_removes_canonical_entry() {
2581        let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2582        let canonical = ClientOrderId::new("PARENT-001");
2583        aliases.insert(canonical, canonical);
2584
2585        aliases.remove(&canonical);
2586        aliases.retain(|_, v| *v != canonical);
2587
2588        assert!(!aliases.contains_key(&canonical));
2589        assert!(aliases.is_empty());
2590    }
2591
2592    #[rstest]
2593    fn test_cleanup_alias_removes_child_alias_pointing_to_canonical() {
2594        let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2595        let canonical = ClientOrderId::new("PARENT-001");
2596        let child = ClientOrderId::new("CHILD-001");
2597        aliases.insert(canonical, canonical);
2598        aliases.insert(child, canonical);
2599
2600        aliases.remove(&canonical);
2601        aliases.retain(|_, v| *v != canonical);
2602
2603        assert!(!aliases.contains_key(&canonical));
2604        assert!(!aliases.contains_key(&child));
2605        assert!(aliases.is_empty());
2606    }
2607
2608    #[rstest]
2609    fn test_cleanup_alias_does_not_affect_unrelated_entries() {
2610        let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2611        let canonical1 = ClientOrderId::new("PARENT-001");
2612        let child1 = ClientOrderId::new("CHILD-001");
2613        let canonical2 = ClientOrderId::new("PARENT-002");
2614        let child2 = ClientOrderId::new("CHILD-002");
2615        aliases.insert(canonical1, canonical1);
2616        aliases.insert(child1, canonical1);
2617        aliases.insert(canonical2, canonical2);
2618        aliases.insert(child2, canonical2);
2619
2620        aliases.remove(&canonical1);
2621        aliases.retain(|_, v| *v != canonical1);
2622
2623        assert!(!aliases.contains_key(&canonical1));
2624        assert!(!aliases.contains_key(&child1));
2625        assert!(aliases.contains_key(&canonical2));
2626        assert!(aliases.contains_key(&child2));
2627        assert_eq!(aliases.len(), 2);
2628    }
2629
2630    #[rstest]
2631    fn test_cleanup_alias_handles_multiple_children() {
2632        let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2633        let canonical = ClientOrderId::new("PARENT-001");
2634        let child1 = ClientOrderId::new("CHILD-001");
2635        let child2 = ClientOrderId::new("CHILD-002");
2636        let child3 = ClientOrderId::new("CHILD-003");
2637        aliases.insert(canonical, canonical);
2638        aliases.insert(child1, canonical);
2639        aliases.insert(child2, canonical);
2640        aliases.insert(child3, canonical);
2641
2642        aliases.remove(&canonical);
2643        aliases.retain(|_, v| *v != canonical);
2644
2645        assert!(aliases.is_empty());
2646    }
2647
2648    #[rstest]
2649    fn test_cleanup_removes_from_all_caches() {
2650        let emitted_accepted: DashMap<VenueOrderId, ()> = DashMap::new();
2651        let order_state_cache: AHashMap<ClientOrderId, u32> = AHashMap::new();
2652        let active_orders: DashMap<ClientOrderId, ()> = DashMap::new();
2653        let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2654        let fee_cache: AHashMap<Ustr, f64> = AHashMap::new();
2655        let filled_qty_cache: AHashMap<Ustr, f64> = AHashMap::new();
2656        let canonical = ClientOrderId::new("PARENT-001");
2657        let child = ClientOrderId::new("CHILD-001");
2658        let venue_id = VenueOrderId::new("VENUE-001");
2659
2660        emitted_accepted.insert(venue_id, ());
2661        let mut order_state = order_state_cache;
2662        order_state.insert(canonical, 1);
2663        active_orders.insert(canonical, ());
2664        aliases.insert(canonical, canonical);
2665        aliases.insert(child, canonical);
2666        let mut fees = fee_cache;
2667        fees.insert(venue_id.inner(), 0.001);
2668        let mut filled = filled_qty_cache;
2669        filled.insert(venue_id.inner(), 1.0);
2670
2671        emitted_accepted.remove(&venue_id);
2672        order_state.remove(&canonical);
2673        active_orders.remove(&canonical);
2674        aliases.remove(&canonical);
2675        aliases.retain(|_, v| *v != canonical);
2676        fees.remove(&venue_id.inner());
2677        filled.remove(&venue_id.inner());
2678
2679        assert!(emitted_accepted.is_empty());
2680        assert!(order_state.is_empty());
2681        assert!(active_orders.is_empty());
2682        assert!(aliases.is_empty());
2683        assert!(fees.is_empty());
2684        assert!(filled.is_empty());
2685    }
2686
2687    #[rstest]
2688    fn test_alias_registration_parent_with_child() {
2689        let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2690        let parent = ClientOrderId::new("PARENT-001");
2691        let child = ClientOrderId::new("CHILD-001");
2692        aliases.insert(parent, parent);
2693        aliases.insert(child, parent);
2694
2695        assert_eq!(*aliases.get(&parent).unwrap(), parent);
2696        assert_eq!(*aliases.get(&child).unwrap(), parent);
2697    }
2698
2699    #[rstest]
2700    fn test_alias_registration_standalone_order() {
2701        let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2702        let order_id = ClientOrderId::new("ORDER-001");
2703        aliases.insert(order_id, order_id);
2704
2705        assert_eq!(*aliases.get(&order_id).unwrap(), order_id);
2706    }
2707
2708    #[rstest]
2709    fn test_alias_lookup_returns_canonical() {
2710        let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2711        let canonical = ClientOrderId::new("PARENT-001");
2712        let child = ClientOrderId::new("CHILD-001");
2713
2714        aliases.insert(canonical, canonical);
2715        aliases.insert(child, canonical);
2716
2717        let resolved = aliases.get(&child).map(|v| *v);
2718        assert_eq!(resolved, Some(canonical));
2719    }
2720
2721    #[rstest]
2722    fn test_handler_register_client_order_aliases_with_parent() {
2723        let (handler, _out_rx, _active, client_id_aliases) = create_test_handler();
2724
2725        let child = Some(ClientOrderId::new("CHILD-001"));
2726        let parent = Some(ClientOrderId::new("PARENT-001"));
2727
2728        let result = handler.register_client_order_aliases(&child, &parent);
2729
2730        assert_eq!(result, Some(ClientOrderId::new("PARENT-001")));
2731        assert!(client_id_aliases.contains_key(&ClientOrderId::new("PARENT-001")));
2732        assert!(client_id_aliases.contains_key(&ClientOrderId::new("CHILD-001")));
2733        assert_eq!(
2734            *client_id_aliases
2735                .get(&ClientOrderId::new("CHILD-001"))
2736                .unwrap(),
2737            ClientOrderId::new("PARENT-001")
2738        );
2739    }
2740
2741    #[rstest]
2742    fn test_handler_register_client_order_aliases_without_parent() {
2743        let (handler, _out_rx, _active, client_id_aliases) = create_test_handler();
2744
2745        let child = Some(ClientOrderId::new("ORDER-001"));
2746        let parent: Option<ClientOrderId> = None;
2747
2748        let result = handler.register_client_order_aliases(&child, &parent);
2749
2750        assert_eq!(result, Some(ClientOrderId::new("ORDER-001")));
2751        assert!(client_id_aliases.contains_key(&ClientOrderId::new("ORDER-001")));
2752        assert_eq!(
2753            *client_id_aliases
2754                .get(&ClientOrderId::new("ORDER-001"))
2755                .unwrap(),
2756            ClientOrderId::new("ORDER-001")
2757        );
2758    }
2759
2760    #[rstest]
2761    fn test_handler_cleanup_terminal_order_removes_all_state() {
2762        let (mut handler, _out_rx, active_client_orders, client_id_aliases) = create_test_handler();
2763
2764        let canonical = ClientOrderId::new("PARENT-001");
2765        let child = ClientOrderId::new("CHILD-001");
2766        let venue_id = VenueOrderId::new("VENUE-001");
2767        let trader_id = TraderId::new("TRADER-001");
2768        let strategy_id = StrategyId::new("STRATEGY-001");
2769        let instrument_id = InstrumentId::from("ETH-USDT-PERP.OKX");
2770
2771        active_client_orders.insert(canonical, (trader_id, strategy_id, instrument_id));
2772        client_id_aliases.insert(canonical, canonical);
2773        client_id_aliases.insert(child, canonical);
2774        handler
2775            .fee_cache
2776            .insert(venue_id.inner(), Money::from("0.001 USDT"));
2777        handler
2778            .filled_qty_cache
2779            .insert(venue_id.inner(), Quantity::from("1.0"));
2780        handler.order_state_cache.insert(
2781            canonical,
2782            OrderStateSnapshot {
2783                venue_order_id: venue_id,
2784                quantity: Quantity::from("1.0"),
2785                price: None,
2786            },
2787        );
2788
2789        handler.cleanup_terminal_order(&canonical, &venue_id);
2790
2791        assert!(!active_client_orders.contains_key(&canonical));
2792        assert!(!client_id_aliases.contains_key(&canonical));
2793        assert!(!client_id_aliases.contains_key(&child));
2794        assert!(!handler.fee_cache.contains_key(&venue_id.inner()));
2795        assert!(!handler.filled_qty_cache.contains_key(&venue_id.inner()));
2796        assert!(!handler.order_state_cache.contains_key(&canonical));
2797    }
2798
2799    #[rstest]
2800    fn test_handler_cleanup_terminal_order_removes_multiple_children() {
2801        let (mut handler, _out_rx, _active, client_id_aliases) = create_test_handler();
2802
2803        let canonical = ClientOrderId::new("PARENT-001");
2804        let child1 = ClientOrderId::new("CHILD-001");
2805        let child2 = ClientOrderId::new("CHILD-002");
2806        let child3 = ClientOrderId::new("CHILD-003");
2807        let venue_id = VenueOrderId::new("VENUE-001");
2808
2809        client_id_aliases.insert(canonical, canonical);
2810        client_id_aliases.insert(child1, canonical);
2811        client_id_aliases.insert(child2, canonical);
2812        client_id_aliases.insert(child3, canonical);
2813
2814        handler.cleanup_terminal_order(&canonical, &venue_id);
2815
2816        assert!(!client_id_aliases.contains_key(&canonical));
2817        assert!(!client_id_aliases.contains_key(&child1));
2818        assert!(!client_id_aliases.contains_key(&child2));
2819        assert!(!client_id_aliases.contains_key(&child3));
2820        assert!(client_id_aliases.is_empty());
2821    }
2822
2823    #[rstest]
2824    fn test_handler_cleanup_does_not_affect_other_orders() {
2825        let (mut handler, _out_rx, active_client_orders, client_id_aliases) = create_test_handler();
2826
2827        let canonical1 = ClientOrderId::new("PARENT-001");
2828        let child1 = ClientOrderId::new("CHILD-001");
2829        let venue_id1 = VenueOrderId::new("VENUE-001");
2830
2831        let canonical2 = ClientOrderId::new("PARENT-002");
2832        let child2 = ClientOrderId::new("CHILD-002");
2833        let venue_id2 = VenueOrderId::new("VENUE-002");
2834
2835        let trader_id = TraderId::new("TRADER-001");
2836        let strategy_id = StrategyId::new("STRATEGY-001");
2837        let instrument_id = InstrumentId::from("ETH-USDT-PERP.OKX");
2838
2839        active_client_orders.insert(canonical1, (trader_id, strategy_id, instrument_id));
2840        active_client_orders.insert(canonical2, (trader_id, strategy_id, instrument_id));
2841        client_id_aliases.insert(canonical1, canonical1);
2842        client_id_aliases.insert(child1, canonical1);
2843        client_id_aliases.insert(canonical2, canonical2);
2844        client_id_aliases.insert(child2, canonical2);
2845        handler
2846            .fee_cache
2847            .insert(venue_id1.inner(), Money::from("0.001 USDT"));
2848        handler
2849            .fee_cache
2850            .insert(venue_id2.inner(), Money::from("0.002 USDT"));
2851
2852        handler.cleanup_terminal_order(&canonical1, &venue_id1);
2853
2854        assert!(!active_client_orders.contains_key(&canonical1));
2855        assert!(!client_id_aliases.contains_key(&canonical1));
2856        assert!(!client_id_aliases.contains_key(&child1));
2857        assert!(!handler.fee_cache.contains_key(&venue_id1.inner()));
2858
2859        assert!(active_client_orders.contains_key(&canonical2));
2860        assert!(client_id_aliases.contains_key(&canonical2));
2861        assert!(client_id_aliases.contains_key(&child2));
2862        assert!(handler.fee_cache.contains_key(&venue_id2.inner()));
2863    }
2864
2865    mod channel_routing {
2866        use nautilus_core::nanos::UnixNanos;
2867        use nautilus_model::{
2868            identifiers::{InstrumentId, Symbol},
2869            instruments::{CryptoPerpetual, CurrencyPair, Instrument, InstrumentAny},
2870            types::{Currency, Price, Quantity},
2871        };
2872        use rstest::rstest;
2873        use ustr::Ustr;
2874
2875        use super::*;
2876        use crate::{
2877            common::{enums::OKXBookAction, testing::load_test_json},
2878            websocket::{enums::OKXWsChannel, messages::OKXWsMessage},
2879        };
2880
2881        fn create_spot_instrument() -> InstrumentAny {
2882            let instrument_id = InstrumentId::from("BTC-USDT.OKX");
2883            InstrumentAny::CurrencyPair(CurrencyPair::new(
2884                instrument_id,
2885                Symbol::from("BTC-USDT"),
2886                Currency::BTC(),
2887                Currency::USDT(),
2888                2,
2889                8,
2890                Price::from("0.01"),
2891                Quantity::from("0.00000001"),
2892                None, // multiplier
2893                None, // lot_size
2894                None, // max_quantity
2895                None, // min_quantity
2896                None, // max_notional
2897                None, // min_notional
2898                None, // max_price
2899                None, // min_price
2900                None, // margin_init
2901                None, // margin_maint
2902                None, // maker_fee
2903                None, // taker_fee
2904                UnixNanos::default(),
2905                UnixNanos::default(),
2906            ))
2907        }
2908
2909        fn create_swap_instrument() -> InstrumentAny {
2910            let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
2911            InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
2912                instrument_id,
2913                Symbol::from("BTC-USDT-SWAP"),
2914                Currency::BTC(),
2915                Currency::USDT(),
2916                Currency::USDT(),
2917                false,
2918                2,
2919                8,
2920                Price::from("0.01"),
2921                Quantity::from("0.00000001"),
2922                None,
2923                None,
2924                None,
2925                None,
2926                None,
2927                None,
2928                None,
2929                None,
2930                None,
2931                None,
2932                None,
2933                None,
2934                UnixNanos::default(),
2935                UnixNanos::default(),
2936            ))
2937        }
2938
2939        fn create_handler_with_instruments(instruments: Vec<InstrumentAny>) -> OKXWsFeedHandler {
2940            let (mut handler, _, _, _) = create_test_handler();
2941            for inst in instruments {
2942                handler
2943                    .instruments_cache
2944                    .insert(inst.symbol().inner(), inst);
2945            }
2946            handler
2947        }
2948
2949        #[rstest]
2950        fn test_parse_raw_message_ticker_channel() {
2951            let json = load_test_json("ws_tickers.json");
2952            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
2953
2954            match msg {
2955                OKXWsMessage::Data { arg, data } => {
2956                    assert!(
2957                        matches!(arg.channel, OKXWsChannel::Tickers),
2958                        "Expected Tickers channel"
2959                    );
2960                    assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USDT")));
2961                    assert!(data.is_array());
2962                }
2963                _ => panic!("Expected OKXWsMessage::Data variant"),
2964            }
2965        }
2966
2967        #[rstest]
2968        fn test_parse_raw_message_trades_channel() {
2969            let json = load_test_json("ws_trades.json");
2970            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
2971
2972            match msg {
2973                OKXWsMessage::Data { arg, data } => {
2974                    assert!(
2975                        matches!(arg.channel, OKXWsChannel::Trades),
2976                        "Expected Trades channel"
2977                    );
2978                    assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USD")));
2979                    assert!(data.is_array());
2980                }
2981                _ => panic!("Expected OKXWsMessage::Data variant"),
2982            }
2983        }
2984
2985        #[rstest]
2986        fn test_parse_raw_message_books_channel() {
2987            let json = load_test_json("ws_books_snapshot.json");
2988            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
2989
2990            match msg {
2991                OKXWsMessage::BookData { arg, action, data } => {
2992                    assert!(
2993                        matches!(arg.channel, OKXWsChannel::Books),
2994                        "Expected Books channel"
2995                    );
2996                    assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USDT")));
2997                    assert!(
2998                        matches!(action, OKXBookAction::Snapshot),
2999                        "Expected snapshot action"
3000                    );
3001                    assert!(!data.is_empty());
3002                }
3003                _ => panic!("Expected OKXWsMessage::BookData variant"),
3004            }
3005        }
3006
3007        #[rstest]
3008        fn test_parse_raw_message_candle_channel() {
3009            let json = load_test_json("ws_candle.json");
3010            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3011
3012            match msg {
3013                OKXWsMessage::Data { arg, data } => {
3014                    // Candle channel variant is Candle1Day for "candle1D"
3015                    assert!(
3016                        matches!(arg.channel, OKXWsChannel::Candle1Day),
3017                        "Expected Candle1Day channel, got {:?}",
3018                        arg.channel
3019                    );
3020                    assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USDT")));
3021                    assert!(data.is_array());
3022                }
3023                _ => panic!("Expected OKXWsMessage::Data variant"),
3024            }
3025        }
3026
3027        #[rstest]
3028        fn test_parse_raw_message_funding_rate_channel() {
3029            let json = load_test_json("ws_funding_rate.json");
3030            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3031
3032            match msg {
3033                OKXWsMessage::Data { arg, data } => {
3034                    assert!(
3035                        matches!(arg.channel, OKXWsChannel::FundingRate),
3036                        "Expected FundingRate channel"
3037                    );
3038                    assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USDT-SWAP")));
3039                    assert!(data.is_array());
3040                }
3041                _ => panic!("Expected OKXWsMessage::Data variant"),
3042            }
3043        }
3044
3045        #[rstest]
3046        fn test_parse_raw_message_bbo_tbt_channel() {
3047            let json = load_test_json("ws_bbo_tbt.json");
3048            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3049
3050            match msg {
3051                OKXWsMessage::Data { arg, data } => {
3052                    assert!(
3053                        matches!(arg.channel, OKXWsChannel::BboTbt),
3054                        "Expected BboTbt channel"
3055                    );
3056                    assert!(data.is_array());
3057                }
3058                _ => panic!("Expected OKXWsMessage::Data variant"),
3059            }
3060        }
3061
3062        #[rstest]
3063        fn test_handle_other_channel_data_tickers() {
3064            let mut handler = create_handler_with_instruments(vec![create_spot_instrument()]);
3065            let json = load_test_json("ws_tickers.json");
3066            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3067
3068            let OKXWsMessage::Data { arg, data } = msg else {
3069                panic!("Expected OKXWsMessage::Data");
3070            };
3071
3072            let ts_init = UnixNanos::from(1_000_000_000u64);
3073            let result = handler.handle_other_channel_data(arg.channel, arg.inst_id, data, ts_init);
3074
3075            assert!(result.is_some());
3076            match result.unwrap() {
3077                NautilusWsMessage::Data(payloads) => {
3078                    assert!(!payloads.is_empty(), "Should produce data payloads");
3079                }
3080                other => panic!("Expected NautilusWsMessage::Data, got {other:?}"),
3081            }
3082        }
3083
3084        #[rstest]
3085        fn test_handle_other_channel_data_trades() {
3086            // Create instrument with BTC-USD symbol (matches test data)
3087            let instrument_id = InstrumentId::from("BTC-USD.OKX");
3088            let instrument = InstrumentAny::CurrencyPair(CurrencyPair::new(
3089                instrument_id,
3090                Symbol::from("BTC-USD"),
3091                Currency::BTC(),
3092                Currency::USD(),
3093                1,
3094                8,
3095                Price::from("0.1"),
3096                Quantity::from("0.00000001"),
3097                None, // multiplier
3098                None, // lot_size
3099                None, // max_quantity
3100                None, // min_quantity
3101                None, // max_notional
3102                None, // min_notional
3103                None, // max_price
3104                None, // min_price
3105                None, // margin_init
3106                None, // margin_maint
3107                None, // maker_fee
3108                None, // taker_fee
3109                UnixNanos::default(),
3110                UnixNanos::default(),
3111            ));
3112
3113            let mut handler = create_handler_with_instruments(vec![instrument]);
3114            let json = load_test_json("ws_trades.json");
3115            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3116
3117            let OKXWsMessage::Data { arg, data } = msg else {
3118                panic!("Expected OKXWsMessage::Data");
3119            };
3120
3121            let ts_init = UnixNanos::from(1_000_000_000u64);
3122            let result = handler.handle_other_channel_data(arg.channel, arg.inst_id, data, ts_init);
3123
3124            assert!(result.is_some());
3125            match result.unwrap() {
3126                NautilusWsMessage::Data(payloads) => {
3127                    assert!(!payloads.is_empty(), "Should produce trade data payloads");
3128                }
3129                other => panic!("Expected NautilusWsMessage::Data, got {other:?}"),
3130            }
3131        }
3132
3133        #[rstest]
3134        fn test_handle_book_data_snapshot() {
3135            let handler = create_handler_with_instruments(vec![create_spot_instrument()]);
3136            let json = load_test_json("ws_books_snapshot.json");
3137            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3138
3139            let OKXWsMessage::BookData { arg, action, data } = msg else {
3140                panic!("Expected OKXWsMessage::BookData");
3141            };
3142
3143            let ts_init = UnixNanos::from(1_000_000_000u64);
3144            let result = handler.handle_book_data(arg, action, data, ts_init);
3145
3146            assert!(result.is_some());
3147            match result.unwrap() {
3148                NautilusWsMessage::Data(payloads) => {
3149                    assert!(!payloads.is_empty(), "Should produce order book payloads");
3150                }
3151                other => panic!("Expected NautilusWsMessage::Data, got {other:?}"),
3152            }
3153        }
3154
3155        #[rstest]
3156        fn test_handle_book_data_update() {
3157            let handler = create_handler_with_instruments(vec![create_spot_instrument()]);
3158            let json = load_test_json("ws_books_update.json");
3159            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3160
3161            let OKXWsMessage::BookData { arg, action, data } = msg else {
3162                panic!("Expected OKXWsMessage::BookData");
3163            };
3164
3165            let ts_init = UnixNanos::from(1_000_000_000u64);
3166            let result = handler.handle_book_data(arg, action, data, ts_init);
3167
3168            assert!(result.is_some());
3169            match result.unwrap() {
3170                NautilusWsMessage::Data(payloads) => {
3171                    assert!(
3172                        !payloads.is_empty(),
3173                        "Should produce order book delta payloads"
3174                    );
3175                }
3176                other => panic!("Expected NautilusWsMessage::Data, got {other:?}"),
3177            }
3178        }
3179
3180        #[rstest]
3181        fn test_handle_other_channel_data_candles() {
3182            let mut handler = create_handler_with_instruments(vec![create_spot_instrument()]);
3183            let json = load_test_json("ws_candle.json");
3184            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3185
3186            let OKXWsMessage::Data { arg, data } = msg else {
3187                panic!("Expected OKXWsMessage::Data");
3188            };
3189
3190            let ts_init = UnixNanos::from(1_000_000_000u64);
3191            let result = handler.handle_other_channel_data(arg.channel, arg.inst_id, data, ts_init);
3192
3193            assert!(result.is_some());
3194            match result.unwrap() {
3195                NautilusWsMessage::Data(payloads) => {
3196                    assert!(!payloads.is_empty(), "Should produce bar data payloads");
3197                }
3198                other => panic!("Expected NautilusWsMessage::Data, got {other:?}"),
3199            }
3200        }
3201
3202        #[rstest]
3203        fn test_handle_other_channel_data_funding_rate() {
3204            let mut handler = create_handler_with_instruments(vec![create_swap_instrument()]);
3205            let json = load_test_json("ws_funding_rate.json");
3206            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3207
3208            let OKXWsMessage::Data { arg, data } = msg else {
3209                panic!("Expected OKXWsMessage::Data");
3210            };
3211
3212            let ts_init = UnixNanos::from(1_000_000_000u64);
3213            let result = handler.handle_other_channel_data(arg.channel, arg.inst_id, data, ts_init);
3214
3215            // Funding rate returns FundingRates variant when rate changes
3216            assert!(result.is_none() || matches!(result, Some(NautilusWsMessage::FundingRates(_))));
3217        }
3218
3219        #[rstest]
3220        fn test_handle_account_data_parses_successfully() {
3221            let mut handler = create_handler_with_instruments(vec![]);
3222            let json = load_test_json("ws_account.json");
3223            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3224
3225            let OKXWsMessage::Data { data, .. } = msg else {
3226                panic!("Expected OKXWsMessage::Data");
3227            };
3228
3229            let ts_init = UnixNanos::from(1_000_000_000u64);
3230            let result = handler.handle_account_data(data, ts_init);
3231
3232            assert!(result.is_some());
3233            match result.unwrap() {
3234                NautilusWsMessage::AccountUpdate(account_state) => {
3235                    assert!(
3236                        !account_state.balances.is_empty(),
3237                        "Should have balance data"
3238                    );
3239                }
3240                other => panic!("Expected NautilusWsMessage::AccountUpdate, got {other:?}"),
3241            }
3242        }
3243
3244        #[rstest]
3245        fn test_handle_other_channel_data_missing_instrument() {
3246            let mut handler = create_handler_with_instruments(vec![]);
3247            let json = load_test_json("ws_tickers.json");
3248            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3249
3250            let OKXWsMessage::Data { arg, data } = msg else {
3251                panic!("Expected OKXWsMessage::Data");
3252            };
3253
3254            let ts_init = UnixNanos::from(1_000_000_000u64);
3255            let result = handler.handle_other_channel_data(arg.channel, arg.inst_id, data, ts_init);
3256
3257            // Should return None when instrument is not in cache
3258            assert!(result.is_none());
3259        }
3260
3261        #[rstest]
3262        fn test_handle_book_data_missing_instrument() {
3263            let handler = create_handler_with_instruments(vec![]);
3264            let json = load_test_json("ws_books_snapshot.json");
3265            let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3266
3267            let OKXWsMessage::BookData { arg, action, data } = msg else {
3268                panic!("Expected OKXWsMessage::BookData");
3269            };
3270
3271            let ts_init = UnixNanos::from(1_000_000_000u64);
3272            let result = handler.handle_book_data(arg, action, data, ts_init);
3273
3274            // Should return None when instrument is not in cache
3275            assert!(result.is_none());
3276        }
3277    }
3278}