Skip to main content

nautilus_bitmex/websocket/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message handler for BitMEX.
17
18use std::{
19    collections::VecDeque,
20    sync::{
21        Arc,
22        atomic::{AtomicBool, Ordering},
23    },
24};
25
26use ahash::AHashMap;
27use dashmap::DashMap;
28use nautilus_common::cache::quote::QuoteCache;
29use nautilus_core::{UnixNanos, time::get_atomic_clock_realtime};
30use nautilus_model::{
31    data::Data,
32    enums::{OrderStatus, OrderType},
33    identifiers::{AccountId, ClientOrderId},
34    instruments::{Instrument, InstrumentAny},
35    types::Price,
36};
37use nautilus_network::{
38    RECONNECTED,
39    retry::{RetryManager, create_websocket_retry_manager},
40    websocket::{AuthTracker, SubscriptionState, WebSocketClient},
41};
42use tokio_tungstenite::tungstenite::Message;
43use ustr::Ustr;
44
45use super::{
46    enums::{BitmexAction, BitmexWsAuthAction, BitmexWsOperation, BitmexWsTopic},
47    error::BitmexWsError,
48    messages::{
49        BitmexExecutionMsg, BitmexFundingMsg, BitmexHttpRequest, BitmexInstrumentMsg,
50        BitmexOrderBook10Msg, BitmexOrderBookMsg, BitmexPositionMsg, BitmexQuoteMsg,
51        BitmexTableMessage, BitmexTradeBinMsg, BitmexTradeMsg, BitmexWalletMsg, BitmexWsMessage,
52        NautilusWsMessage, OrderData,
53    },
54    parse::{
55        parse_book_msg_vec, parse_book10_msg_vec, parse_execution_msg, parse_funding_msg,
56        parse_instrument_msg, parse_order_msg, parse_order_update_msg, parse_position_msg,
57        parse_trade_bin_msg_vec, parse_trade_msg_vec, parse_wallet_msg,
58    },
59};
60use crate::{
61    common::{
62        enums::{BitmexExecType, BitmexOrderType, BitmexPegPriceType},
63        parse::parse_contracts_quantity,
64    },
65    http::parse::{InstrumentParseResult, parse_instrument_any},
66};
67
68/// Commands sent from the outer client to the inner message handler.
69#[derive(Debug)]
70#[allow(
71    clippy::large_enum_variant,
72    reason = "Commands are ephemeral and immediately consumed"
73)]
74pub enum HandlerCommand {
75    /// Set the WebSocketClient for the handler to use.
76    SetClient(WebSocketClient),
77    /// Disconnect the WebSocket connection.
78    Disconnect,
79    /// Send authentication payload to the WebSocket.
80    Authenticate { payload: String },
81    /// Subscribe to the given topics.
82    Subscribe { topics: Vec<String> },
83    /// Unsubscribe from the given topics.
84    Unsubscribe { topics: Vec<String> },
85    /// Initialize the instruments cache with the given instruments.
86    InitializeInstruments(Vec<InstrumentAny>),
87    /// Update a single instrument in the cache.
88    UpdateInstrument(InstrumentAny),
89}
90
91pub(super) struct FeedHandler {
92    account_id: AccountId,
93    signal: Arc<AtomicBool>,
94    client: Option<WebSocketClient>,
95    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
96    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
97    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
98    auth_tracker: AuthTracker,
99    subscriptions: SubscriptionState,
100    retry_manager: RetryManager<BitmexWsError>,
101    instruments_cache: AHashMap<Ustr, InstrumentAny>,
102    order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
103    order_symbol_cache: Arc<DashMap<ClientOrderId, Ustr>>,
104    quote_cache: QuoteCache,
105    pending_msgs: VecDeque<NautilusWsMessage>,
106}
107
108impl FeedHandler {
109    /// Creates a new [`FeedHandler`] instance.
110    #[allow(clippy::too_many_arguments)]
111    pub(super) fn new(
112        signal: Arc<AtomicBool>,
113        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
114        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
115        out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
116        account_id: AccountId,
117        auth_tracker: AuthTracker,
118        subscriptions: SubscriptionState,
119        order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
120        order_symbol_cache: Arc<DashMap<ClientOrderId, Ustr>>,
121    ) -> Self {
122        Self {
123            account_id,
124            signal,
125            client: None,
126            cmd_rx,
127            raw_rx,
128            out_tx,
129            auth_tracker,
130            subscriptions,
131            retry_manager: create_websocket_retry_manager(),
132            instruments_cache: AHashMap::new(),
133            order_type_cache,
134            order_symbol_cache,
135            quote_cache: QuoteCache::new(),
136            pending_msgs: VecDeque::new(),
137        }
138    }
139
140    pub(super) fn is_stopped(&self) -> bool {
141        self.signal.load(Ordering::Relaxed)
142    }
143
144    pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), ()> {
145        self.out_tx.send(msg).map_err(|_| ())
146    }
147
148    /// Sends a WebSocket message with retry logic.
149    async fn send_with_retry(&self, payload: String) -> anyhow::Result<()> {
150        if let Some(client) = &self.client {
151            self.retry_manager
152                .execute_with_retry(
153                    "websocket_send",
154                    || {
155                        let payload = payload.clone();
156                        async move {
157                            client.send_text(payload, None).await.map_err(|e| {
158                                BitmexWsError::ClientError(format!("Send failed: {e}"))
159                            })
160                        }
161                    },
162                    should_retry_bitmex_error,
163                    create_bitmex_timeout_error,
164                )
165                .await
166                .map_err(|e| anyhow::anyhow!("{e}"))
167        } else {
168            Err(anyhow::anyhow!("No active WebSocket client"))
169        }
170    }
171
172    #[inline]
173    fn get_instrument(
174        cache: &AHashMap<Ustr, InstrumentAny>,
175        symbol: &Ustr,
176    ) -> Option<InstrumentAny> {
177        cache.get(symbol).cloned()
178    }
179
180    pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
181        if let Some(msg) = self.pending_msgs.pop_front() {
182            return Some(msg);
183        }
184
185        let clock = get_atomic_clock_realtime();
186
187        loop {
188            tokio::select! {
189                Some(cmd) = self.cmd_rx.recv() => {
190                    match cmd {
191                        HandlerCommand::SetClient(client) => {
192                            log::debug!("WebSocketClient received by handler");
193                            self.client = Some(client);
194                        }
195                        HandlerCommand::Disconnect => {
196                            log::debug!("Disconnect command received");
197                            if let Some(client) = self.client.take() {
198                                client.disconnect().await;
199                            }
200                        }
201                        HandlerCommand::Authenticate { payload } => {
202                            log::debug!("Authenticate command received");
203                            if let Err(e) = self.send_with_retry(payload).await {
204                                log::error!("Failed to send authentication after retries: {e}");
205                            }
206                        }
207                        HandlerCommand::Subscribe { topics } => {
208                            for topic in topics {
209                                log::debug!("Subscribing to topic: {topic}");
210                                if let Err(e) = self.send_with_retry(topic.clone()).await {
211                                    log::error!("Failed to send subscription after retries: topic={topic}, error={e}");
212                                }
213                            }
214                        }
215                        HandlerCommand::Unsubscribe { topics } => {
216                            for topic in topics {
217                                log::debug!("Unsubscribing from topic: {topic}");
218                                if let Err(e) = self.send_with_retry(topic.clone()).await {
219                                    log::error!("Failed to send unsubscription after retries: topic={topic}, error={e}");
220                                }
221                            }
222                        }
223                        HandlerCommand::InitializeInstruments(instruments) => {
224                            for inst in instruments {
225                                self.instruments_cache.insert(inst.symbol().inner(), inst);
226                            }
227                        }
228                        HandlerCommand::UpdateInstrument(inst) => {
229                            self.instruments_cache.insert(inst.symbol().inner(), inst);
230                        }
231                    }
232                    // Continue processing following command
233                    continue;
234                }
235
236                () = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
237                    if self.signal.load(std::sync::atomic::Ordering::Relaxed) {
238                        log::debug!("Stop signal received during idle period");
239                        return None;
240                    }
241                    continue;
242                }
243
244                msg = self.raw_rx.recv() => {
245                    let msg = match msg {
246                        Some(msg) => msg,
247                        None => {
248                            log::debug!("WebSocket stream closed");
249                            return None;
250                        }
251                    };
252
253                    // Handle ping frames directly for minimal latency
254                    if let Message::Ping(data) = &msg {
255                        log::trace!("Received ping frame with {} bytes", data.len());
256                        if let Some(client) = &self.client
257                            && let Err(e) = client.send_pong(data.to_vec()).await
258                        {
259                            log::warn!("Failed to send pong frame: {e}");
260                        }
261                        continue;
262                    }
263
264                    let event = match Self::parse_raw_message(msg) {
265                        Some(event) => event,
266                        None => continue,
267                    };
268
269                    if self.signal.load(std::sync::atomic::Ordering::Relaxed) {
270                        log::debug!("Stop signal received");
271                        return None;
272                    }
273
274            match event {
275                BitmexWsMessage::Reconnected => {
276                    self.quote_cache.clear();
277                    self.order_type_cache.clear();
278                    self.order_symbol_cache.clear();
279                    return Some(NautilusWsMessage::Reconnected);
280                }
281                BitmexWsMessage::Subscription {
282                    success,
283                    subscribe,
284                    request,
285                    error,
286                } => {
287                    if let Some(msg) = self.handle_subscription_message(
288                        success,
289                        subscribe.as_ref(),
290                        request.as_ref(),
291                        error.as_deref(),
292                    ) {
293                        return Some(msg);
294                    }
295                    continue;
296                }
297                BitmexWsMessage::Table(table_msg) => {
298                    let ts_init = clock.get_time_ns();
299
300                    let msg = match table_msg {
301                        BitmexTableMessage::OrderBookL2 { action, data } => {
302                            self.handle_orderbook_l2(action, data, ts_init)
303                        }
304                        BitmexTableMessage::OrderBookL2_25 { action, data } => {
305                            self.handle_orderbook_l2(action, data, ts_init)
306                        }
307                        BitmexTableMessage::OrderBook10 { data, .. } => {
308                            self.handle_orderbook_10(data, ts_init)
309                        }
310                        BitmexTableMessage::Quote { data, .. } => {
311                            self.handle_quote(data, ts_init)
312                        }
313                        BitmexTableMessage::Trade { data, .. } => {
314                            self.handle_trade(data, ts_init)
315                        }
316                        BitmexTableMessage::TradeBin1m { action, data } => {
317                            self.handle_trade_bin(action, data, BitmexWsTopic::TradeBin1m, ts_init)
318                        }
319                        BitmexTableMessage::TradeBin5m { action, data } => {
320                            self.handle_trade_bin(action, data, BitmexWsTopic::TradeBin5m, ts_init)
321                        }
322                        BitmexTableMessage::TradeBin1h { action, data } => {
323                            self.handle_trade_bin(action, data, BitmexWsTopic::TradeBin1h, ts_init)
324                        }
325                        BitmexTableMessage::TradeBin1d { action, data } => {
326                            self.handle_trade_bin(action, data, BitmexWsTopic::TradeBin1d, ts_init)
327                        }
328                        // Execution messages
329                        // Note: BitMEX may send duplicate order status updates for the same order
330                        // (e.g., immediate response + stream update). This is expected behavior.
331                        BitmexTableMessage::Order { data, .. } => {
332                            let mut msgs = self.handle_order(data);
333                            if msgs.is_empty() {
334                                None
335                            } else {
336                                // Buffer overflow messages for subsequent next() calls
337                                let first = msgs.remove(0);
338                                self.pending_msgs.extend(msgs);
339                                Some(first)
340                            }
341                        }
342                        BitmexTableMessage::Execution { data, .. } => {
343                            self.handle_execution(data)
344                        }
345                        BitmexTableMessage::Position { data, .. } => {
346                            self.handle_position(data)
347                        }
348                        BitmexTableMessage::Wallet { data, .. } => {
349                            self.handle_wallet(data, ts_init)
350                        }
351                        BitmexTableMessage::Margin { .. } => {
352                            // Skip margin messages - BitMEX uses account-level cross-margin
353                            // which doesn't map well to Nautilus's per-instrument margin model
354                            None
355                        }
356                        BitmexTableMessage::Instrument { action, data } => {
357                            self.handle_instrument(action, data, ts_init)
358                        }
359                        BitmexTableMessage::Funding { data, .. } => {
360                            self.handle_funding(data, ts_init)
361                        }
362                        _ => {
363                            // Other message types not yet implemented
364                            log::warn!("Unhandled table message type: {table_msg:?}");
365                            None
366                        }
367                    };
368
369                    if let Some(msg) = msg {
370                        return Some(msg);
371                    }
372                    continue;
373                }
374                BitmexWsMessage::Welcome { .. } | BitmexWsMessage::Error { .. } => continue,
375            }
376                }
377
378                // Handle shutdown - either channel closed or stream ended
379                else => {
380                    log::debug!("Handler shutting down: stream ended or command channel closed");
381                    return None;
382                }
383            }
384        }
385    }
386
387    fn parse_raw_message(msg: Message) -> Option<BitmexWsMessage> {
388        match msg {
389            Message::Text(text) => {
390                if text == RECONNECTED {
391                    log::info!("Received WebSocket reconnected signal");
392                    return Some(BitmexWsMessage::Reconnected);
393                }
394
395                log::trace!("Raw websocket message: {text}");
396
397                if Self::is_heartbeat_message(&text) {
398                    log::trace!("Ignoring heartbeat control message: {text}");
399                    return None;
400                }
401
402                match serde_json::from_str(&text) {
403                    Ok(msg) => match &msg {
404                        BitmexWsMessage::Welcome {
405                            version,
406                            heartbeat_enabled,
407                            limit,
408                            ..
409                        } => {
410                            log::info!(
411                                "Welcome to the BitMEX Realtime API: version={}, heartbeat={}, rate_limit={:?}",
412                                version,
413                                heartbeat_enabled,
414                                limit.remaining,
415                            );
416                        }
417                        BitmexWsMessage::Subscription { .. } => return Some(msg),
418                        BitmexWsMessage::Error { status, error, .. } => {
419                            log::error!(
420                                "Received error from BitMEX: status={status}, error={error}",
421                            );
422                        }
423                        _ => return Some(msg),
424                    },
425                    Err(e) => {
426                        log::error!("Failed to parse WebSocket message: {e}: {text}");
427                    }
428                }
429            }
430            Message::Binary(msg) => {
431                log::debug!("Raw binary: {msg:?}");
432            }
433            Message::Close(_) => {
434                log::debug!("Received close message, waiting for reconnection");
435            }
436            Message::Ping(data) => {
437                // Handled in select! loop before parse_raw_message
438                log::trace!("Ping frame with {} bytes (already handled)", data.len());
439            }
440            Message::Pong(data) => {
441                log::trace!("Received pong frame with {} bytes", data.len());
442            }
443            Message::Frame(frame) => {
444                log::debug!("Received raw frame: {frame:?}");
445            }
446        }
447
448        None
449    }
450
451    fn is_heartbeat_message(text: &str) -> bool {
452        let trimmed = text.trim();
453
454        if !trimmed.starts_with('{') || trimmed.len() > 64 {
455            return false;
456        }
457
458        trimmed.contains("\"op\":\"ping\"") || trimmed.contains("\"op\":\"pong\"")
459    }
460
461    fn handle_subscription_ack(
462        &self,
463        success: bool,
464        request: Option<&BitmexHttpRequest>,
465        subscribe: Option<&String>,
466        error: Option<&str>,
467    ) {
468        let topics = Self::topics_from_request(request, subscribe);
469
470        if topics.is_empty() {
471            log::debug!("Subscription acknowledgement without topics");
472            return;
473        }
474
475        for topic in topics {
476            if success {
477                self.subscriptions.confirm_subscribe(topic);
478                log::debug!("Subscription confirmed: topic={topic}");
479            } else {
480                self.subscriptions.mark_failure(topic);
481                let reason = error.unwrap_or("Subscription rejected");
482                log::error!("Subscription failed: topic={topic}, error={reason}");
483            }
484        }
485    }
486
487    fn handle_unsubscribe_ack(
488        &self,
489        success: bool,
490        request: Option<&BitmexHttpRequest>,
491        subscribe: Option<&String>,
492        error: Option<&str>,
493    ) {
494        let topics = Self::topics_from_request(request, subscribe);
495
496        if topics.is_empty() {
497            log::debug!("Unsubscription acknowledgement without topics");
498            return;
499        }
500
501        for topic in topics {
502            if success {
503                log::debug!("Unsubscription confirmed: topic={topic}");
504                self.subscriptions.confirm_unsubscribe(topic);
505            } else {
506                let reason = error.unwrap_or("Unsubscription rejected");
507                log::error!(
508                    "Unsubscription failed - restoring subscription: topic={topic}, error={reason}",
509                );
510                // Venue rejected unsubscribe, so we're still subscribed. Restore state:
511                self.subscriptions.confirm_unsubscribe(topic); // Clear pending_unsubscribe
512                self.subscriptions.mark_subscribe(topic); // Mark as subscribing
513                self.subscriptions.confirm_subscribe(topic); // Confirm subscription
514            }
515        }
516    }
517
518    fn topics_from_request<'a>(
519        request: Option<&'a BitmexHttpRequest>,
520        fallback: Option<&'a String>,
521    ) -> Vec<&'a str> {
522        if let Some(req) = request
523            && !req.args.is_empty()
524        {
525            return req.args.iter().filter_map(|arg| arg.as_str()).collect();
526        }
527
528        fallback.into_iter().map(|topic| topic.as_str()).collect()
529    }
530
531    fn handle_orderbook_l2(
532        &self,
533        action: BitmexAction,
534        data: Vec<BitmexOrderBookMsg>,
535        ts_init: UnixNanos,
536    ) -> Option<NautilusWsMessage> {
537        if data.is_empty() {
538            return None;
539        }
540        let data = parse_book_msg_vec(data, action, &self.instruments_cache, ts_init);
541        Some(NautilusWsMessage::Data(data))
542    }
543
544    fn handle_orderbook_10(
545        &self,
546        data: Vec<BitmexOrderBook10Msg>,
547        ts_init: UnixNanos,
548    ) -> Option<NautilusWsMessage> {
549        if data.is_empty() {
550            return None;
551        }
552        let data = parse_book10_msg_vec(data, &self.instruments_cache, ts_init);
553        Some(NautilusWsMessage::Data(data))
554    }
555
556    fn handle_quote(
557        &mut self,
558        data: Vec<BitmexQuoteMsg>,
559        ts_init: UnixNanos,
560    ) -> Option<NautilusWsMessage> {
561        // Index symbols may return empty quote data
562        if data.is_empty() {
563            return None;
564        }
565
566        let mut quotes = Vec::with_capacity(data.len());
567
568        for msg in data {
569            let Some(instrument) = Self::get_instrument(&self.instruments_cache, &msg.symbol)
570            else {
571                log::error!(
572                    "Instrument cache miss: quote message dropped for symbol={}",
573                    msg.symbol
574                );
575                continue;
576            };
577
578            let instrument_id = instrument.id();
579            let price_precision = instrument.price_precision();
580
581            let bid_price = msg.bid_price.map(|p| Price::new(p, price_precision));
582            let ask_price = msg.ask_price.map(|p| Price::new(p, price_precision));
583            let bid_size = msg
584                .bid_size
585                .map(|s| parse_contracts_quantity(s, &instrument));
586            let ask_size = msg
587                .ask_size
588                .map(|s| parse_contracts_quantity(s, &instrument));
589            let ts_event = UnixNanos::from(msg.timestamp);
590
591            match self.quote_cache.process(
592                instrument_id,
593                bid_price,
594                ask_price,
595                bid_size,
596                ask_size,
597                ts_event,
598                ts_init,
599            ) {
600                Ok(quote) => quotes.push(Data::Quote(quote)),
601                Err(e) => {
602                    log::warn!("Failed to process quote for {}: {e}", msg.symbol);
603                }
604            }
605        }
606
607        if quotes.is_empty() {
608            return None;
609        }
610
611        Some(NautilusWsMessage::Data(quotes))
612    }
613
614    fn handle_trade(
615        &self,
616        data: Vec<BitmexTradeMsg>,
617        ts_init: UnixNanos,
618    ) -> Option<NautilusWsMessage> {
619        if data.is_empty() {
620            return None;
621        }
622        let data = parse_trade_msg_vec(data, &self.instruments_cache, ts_init);
623        Some(NautilusWsMessage::Data(data))
624    }
625
626    fn handle_trade_bin(
627        &self,
628        action: BitmexAction,
629        data: Vec<BitmexTradeBinMsg>,
630        topic: BitmexWsTopic,
631        ts_init: UnixNanos,
632    ) -> Option<NautilusWsMessage> {
633        if action == BitmexAction::Partial || data.is_empty() {
634            return None;
635        }
636        let data = parse_trade_bin_msg_vec(data, topic, &self.instruments_cache, ts_init);
637        Some(NautilusWsMessage::Data(data))
638    }
639
640    fn handle_order(&mut self, data: Vec<OrderData>) -> Vec<NautilusWsMessage> {
641        let mut reports = Vec::with_capacity(data.len());
642        let mut updates = Vec::new();
643
644        for order_data in data {
645            match order_data {
646                OrderData::Full(order_msg) => {
647                    let Some(instrument) =
648                        Self::get_instrument(&self.instruments_cache, &order_msg.symbol)
649                    else {
650                        log::error!(
651                            "Instrument cache miss: order message dropped for symbol={}, order_id={}",
652                            order_msg.symbol,
653                            order_msg.order_id
654                        );
655                        continue;
656                    };
657
658                    match parse_order_msg(&order_msg, &instrument, &self.order_type_cache) {
659                        Ok(report) => {
660                            // Cache the order type and symbol AFTER successful parse
661                            if let Some(client_order_id) = &order_msg.cl_ord_id {
662                                let client_order_id = ClientOrderId::new(client_order_id);
663
664                                if let Some(ord_type) = &order_msg.ord_type {
665                                    // Pegged orders with TrailingStopPeg are trailing stop orders
666                                    let order_type: OrderType = if *ord_type
667                                        == BitmexOrderType::Pegged
668                                        && order_msg.peg_price_type
669                                            == Some(BitmexPegPriceType::TrailingStopPeg)
670                                    {
671                                        if order_msg.price.is_some() {
672                                            OrderType::TrailingStopLimit
673                                        } else {
674                                            OrderType::TrailingStopMarket
675                                        }
676                                    } else {
677                                        (*ord_type).into()
678                                    };
679                                    self.order_type_cache.insert(client_order_id, order_type);
680                                }
681
682                                // Cache symbol for execution message routing
683                                self.order_symbol_cache
684                                    .insert(client_order_id, order_msg.symbol);
685                            }
686
687                            if is_terminal_order_status(report.order_status)
688                                && let Some(client_id) = report.client_order_id
689                            {
690                                self.order_type_cache.remove(&client_id);
691                                self.order_symbol_cache.remove(&client_id);
692                            }
693
694                            reports.push(report);
695                        }
696                        Err(e) => {
697                            log::error!(
698                                "Failed to parse full order message - potential data loss: \
699                                error={e}, symbol={}, order_id={}, time_in_force={:?}",
700                                order_msg.symbol,
701                                order_msg.order_id,
702                                order_msg.time_in_force,
703                            );
704                            // TODO: Add metric counter for parse failures
705                            continue;
706                        }
707                    }
708                }
709                OrderData::Update(msg) => {
710                    let Some(instrument) =
711                        Self::get_instrument(&self.instruments_cache, &msg.symbol)
712                    else {
713                        log::error!(
714                            "Instrument cache miss: order update dropped for symbol={}, order_id={}",
715                            msg.symbol,
716                            msg.order_id
717                        );
718                        continue;
719                    };
720
721                    // Populate cache for execution message routing
722                    if let Some(cl_ord_id) = &msg.cl_ord_id {
723                        let client_order_id = ClientOrderId::new(cl_ord_id);
724                        self.order_symbol_cache.insert(client_order_id, msg.symbol);
725                    }
726
727                    if let Some(event) = parse_order_update_msg(&msg, &instrument, self.account_id)
728                    {
729                        updates.push(event);
730                    } else {
731                        log::warn!(
732                            "Skipped order update message (insufficient data): \
733                            order_id={}, price={:?}",
734                            msg.order_id,
735                            msg.price,
736                        );
737                    }
738                }
739            }
740        }
741
742        let mut msgs = Vec::new();
743
744        if !reports.is_empty() {
745            msgs.push(NautilusWsMessage::OrderStatusReports(reports));
746        }
747
748        if !updates.is_empty() {
749            msgs.push(NautilusWsMessage::OrderUpdates(updates));
750        }
751
752        msgs
753    }
754
755    fn handle_execution(&mut self, data: Vec<BitmexExecutionMsg>) -> Option<NautilusWsMessage> {
756        let mut fills = Vec::with_capacity(data.len());
757
758        for exec_msg in data {
759            // Try to get symbol, fall back to cache lookup if missing
760            let symbol_opt = if let Some(sym) = &exec_msg.symbol {
761                Some(*sym)
762            } else if let Some(cl_ord_id) = &exec_msg.cl_ord_id {
763                // Try to look up symbol from order_symbol_cache
764                let client_order_id = ClientOrderId::new(cl_ord_id);
765                self.order_symbol_cache
766                    .get(&client_order_id)
767                    .map(|r| *r.value())
768            } else {
769                None
770            };
771
772            let Some(symbol) = symbol_opt else {
773                // Symbol missing - log appropriately based on exec type and whether we had clOrdID
774                if let Some(cl_ord_id) = &exec_msg.cl_ord_id {
775                    if exec_msg.exec_type == Some(BitmexExecType::Trade) {
776                        log::warn!(
777                            "Execution message missing symbol and not found in cache: \
778                            cl_ord_id={cl_ord_id}, exec_id={:?}, ord_rej_reason={:?}, text={:?}",
779                            exec_msg.exec_id,
780                            exec_msg.ord_rej_reason,
781                            exec_msg.text,
782                        );
783                    } else {
784                        log::debug!(
785                            "Execution message missing symbol and not found in cache: \
786                            cl_ord_id={cl_ord_id}, exec_id={:?}, exec_type={:?}, \
787                            ord_rej_reason={:?}, text={:?}",
788                            exec_msg.exec_id,
789                            exec_msg.exec_type,
790                            exec_msg.ord_rej_reason,
791                            exec_msg.text,
792                        );
793                    }
794                } else {
795                    // CancelReject messages without symbol/clOrdID are expected when using
796                    // redundant cancel broadcasting - one cancel succeeds, others arrive late
797                    // and BitMEX responds with CancelReject but doesn't populate the fields
798                    if exec_msg.exec_type == Some(BitmexExecType::CancelReject) {
799                        log::debug!(
800                            "CancelReject message missing symbol/clOrdID (expected with redundant cancels): \
801                            exec_id={:?}, order_id={:?}",
802                            exec_msg.exec_id,
803                            exec_msg.order_id,
804                        );
805                    } else {
806                        log::warn!(
807                            "Execution message missing both symbol and clOrdID, cannot process: \
808                            exec_id={:?}, order_id={:?}, exec_type={:?}, \
809                            ord_rej_reason={:?}, text={:?}",
810                            exec_msg.exec_id,
811                            exec_msg.order_id,
812                            exec_msg.exec_type,
813                            exec_msg.ord_rej_reason,
814                            exec_msg.text,
815                        );
816                    }
817                }
818                continue;
819            };
820
821            let Some(instrument) = Self::get_instrument(&self.instruments_cache, &symbol) else {
822                log::error!(
823                    "Instrument cache miss: execution message dropped for symbol={}, exec_id={:?}, exec_type={:?}, Liquidation/ADL fills may be lost",
824                    symbol,
825                    exec_msg.exec_id,
826                    exec_msg.exec_type
827                );
828                continue;
829            };
830
831            if let Some(fill) = parse_execution_msg(exec_msg, &instrument) {
832                fills.push(fill);
833            }
834        }
835
836        if fills.is_empty() {
837            return None;
838        }
839        Some(NautilusWsMessage::FillReports(fills))
840    }
841
842    fn handle_position(&self, data: Vec<BitmexPositionMsg>) -> Option<NautilusWsMessage> {
843        if data.is_empty() {
844            return None;
845        }
846
847        let mut reports = Vec::with_capacity(data.len());
848
849        for pos_msg in data {
850            let Some(instrument) = Self::get_instrument(&self.instruments_cache, &pos_msg.symbol)
851            else {
852                log::error!(
853                    "Instrument cache miss: position message dropped for symbol={}, account={}",
854                    pos_msg.symbol,
855                    pos_msg.account
856                );
857                continue;
858            };
859            reports.push(parse_position_msg(pos_msg, &instrument));
860        }
861
862        if reports.is_empty() {
863            return None;
864        }
865
866        Some(NautilusWsMessage::PositionStatusReports(reports))
867    }
868
869    fn handle_wallet(
870        &self,
871        data: Vec<BitmexWalletMsg>,
872        ts_init: UnixNanos,
873    ) -> Option<NautilusWsMessage> {
874        if data.is_empty() {
875            return None;
876        }
877
878        let states: Vec<_> = data
879            .into_iter()
880            .map(|wallet_msg| parse_wallet_msg(wallet_msg, ts_init))
881            .collect();
882
883        Some(NautilusWsMessage::AccountStates(states))
884    }
885
886    fn handle_instrument(
887        &mut self,
888        action: BitmexAction,
889        data: Vec<BitmexInstrumentMsg>,
890        ts_init: UnixNanos,
891    ) -> Option<NautilusWsMessage> {
892        match action {
893            BitmexAction::Partial | BitmexAction::Insert => {
894                let mut instruments = Vec::with_capacity(data.len());
895                let mut temp_cache = AHashMap::new();
896
897                let data_for_prices = data.clone();
898
899                for msg in data {
900                    match msg.try_into() {
901                        Ok(http_inst) => {
902                            match parse_instrument_any(&http_inst, ts_init) {
903                                InstrumentParseResult::Ok(boxed) => {
904                                    let instrument_any = *boxed;
905                                    let symbol = instrument_any.symbol().inner();
906                                    temp_cache.insert(symbol, instrument_any.clone());
907                                    instruments.push(instrument_any);
908                                }
909                                InstrumentParseResult::Unsupported { .. }
910                                | InstrumentParseResult::Inactive { .. } => {
911                                    // Silently skip unsupported or inactive instruments
912                                }
913                                InstrumentParseResult::Failed {
914                                    symbol,
915                                    instrument_type,
916                                    error,
917                                } => {
918                                    log::warn!(
919                                        "Failed to parse instrument {symbol} ({instrument_type:?}): {error}"
920                                    );
921                                }
922                            }
923                        }
924                        Err(e) => {
925                            log::debug!("Skipping instrument (missing required fields): {e}");
926                        }
927                    }
928                }
929
930                // Update instruments_cache with new instruments
931                for (symbol, instrument) in &temp_cache {
932                    self.instruments_cache.insert(*symbol, instrument.clone());
933                }
934
935                if !instruments.is_empty()
936                    && let Err(e) = self
937                        .out_tx
938                        .send(NautilusWsMessage::Instruments(instruments))
939                {
940                    log::error!("Error sending instruments: {e}");
941                }
942
943                let mut data_msgs = Vec::with_capacity(data_for_prices.len());
944
945                for msg in data_for_prices {
946                    let parsed = parse_instrument_msg(msg, &temp_cache, ts_init);
947                    data_msgs.extend(parsed);
948                }
949
950                if data_msgs.is_empty() {
951                    return None;
952                }
953                Some(NautilusWsMessage::Data(data_msgs))
954            }
955            BitmexAction::Update => {
956                let mut data_msgs = Vec::with_capacity(data.len());
957
958                for msg in data {
959                    let parsed = parse_instrument_msg(msg, &self.instruments_cache, ts_init);
960                    data_msgs.extend(parsed);
961                }
962
963                if data_msgs.is_empty() {
964                    return None;
965                }
966                Some(NautilusWsMessage::Data(data_msgs))
967            }
968            BitmexAction::Delete => {
969                log::info!(
970                    "Received instrument delete action for {} instrument(s)",
971                    data.len()
972                );
973                None
974            }
975        }
976    }
977
978    fn handle_funding(
979        &self,
980        data: Vec<BitmexFundingMsg>,
981        ts_init: UnixNanos,
982    ) -> Option<NautilusWsMessage> {
983        if data.is_empty() {
984            return None;
985        }
986
987        let funding_updates: Vec<_> = data
988            .into_iter()
989            .map(|msg| parse_funding_msg(msg, ts_init))
990            .collect();
991
992        Some(NautilusWsMessage::FundingRateUpdates(funding_updates))
993    }
994
995    fn handle_subscription_message(
996        &self,
997        success: bool,
998        subscribe: Option<&String>,
999        request: Option<&BitmexHttpRequest>,
1000        error: Option<&str>,
1001    ) -> Option<NautilusWsMessage> {
1002        if let Some(req) = request {
1003            if req
1004                .op
1005                .eq_ignore_ascii_case(BitmexWsAuthAction::AuthKeyExpires.as_ref())
1006            {
1007                if success {
1008                    log::info!("WebSocket authenticated");
1009                    self.auth_tracker.succeed();
1010                    return Some(NautilusWsMessage::Authenticated);
1011                } else {
1012                    let reason = error.unwrap_or("Authentication rejected").to_string();
1013                    log::error!("WebSocket authentication failed: {reason}");
1014                    self.auth_tracker.fail(reason);
1015                }
1016                return None;
1017            }
1018
1019            if req
1020                .op
1021                .eq_ignore_ascii_case(BitmexWsOperation::Subscribe.as_ref())
1022            {
1023                self.handle_subscription_ack(success, request, subscribe, error);
1024                return None;
1025            }
1026
1027            if req
1028                .op
1029                .eq_ignore_ascii_case(BitmexWsOperation::Unsubscribe.as_ref())
1030            {
1031                self.handle_unsubscribe_ack(success, request, subscribe, error);
1032                return None;
1033            }
1034        }
1035
1036        if subscribe.is_some() {
1037            self.handle_subscription_ack(success, request, subscribe, error);
1038            return None;
1039        }
1040
1041        if let Some(error) = error {
1042            log::warn!("Unhandled subscription control message: success={success}, error={error}");
1043        }
1044
1045        None
1046    }
1047}
1048
1049fn is_terminal_order_status(status: OrderStatus) -> bool {
1050    matches!(
1051        status,
1052        OrderStatus::Canceled | OrderStatus::Expired | OrderStatus::Rejected | OrderStatus::Filled,
1053    )
1054}
1055
1056/// Returns `true` when a BitMEX error should be retried.
1057pub(crate) fn should_retry_bitmex_error(error: &BitmexWsError) -> bool {
1058    match error {
1059        BitmexWsError::TungsteniteError(_) => true, // Network errors are retryable
1060        BitmexWsError::ClientError(msg) => {
1061            // Retry on timeout and connection errors (case-insensitive)
1062            let msg_lower = msg.to_lowercase();
1063            msg_lower.contains("timeout")
1064                || msg_lower.contains("timed out")
1065                || msg_lower.contains("connection")
1066                || msg_lower.contains("network")
1067        }
1068        _ => false,
1069    }
1070}
1071
1072/// Creates a timeout error for BitMEX retry logic.
1073pub(crate) fn create_bitmex_timeout_error(msg: String) -> BitmexWsError {
1074    BitmexWsError::ClientError(msg)
1075}
1076
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// `ping` and `pong` payloads are detected as heartbeats; a subscribe request is not.
    #[rstest]
    #[case("{\"op\":\"ping\"}", true)]
    #[case("{\"op\":\"pong\"}", true)]
    #[case("{\"op\":\"subscribe\",\"args\":[\"trade:XBTUSD\"]}", false)]
    fn test_is_heartbeat_message_detection(#[case] payload: &str, #[case] expected: bool) {
        assert_eq!(FeedHandler::is_heartbeat_message(payload), expected);
    }
}