nautilus_bitmex/websocket/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message handler for BitMEX.
17
18use std::sync::{
19    Arc,
20    atomic::{AtomicBool, Ordering},
21};
22
23use ahash::AHashMap;
24use dashmap::DashMap;
25use nautilus_common::cache::quote::QuoteCache;
26use nautilus_core::{UnixNanos, time::get_atomic_clock_realtime};
27use nautilus_model::{
28    data::Data,
29    enums::{OrderStatus, OrderType},
30    identifiers::{AccountId, ClientOrderId},
31    instruments::{Instrument, InstrumentAny},
32    types::Price,
33};
34use nautilus_network::{
35    RECONNECTED,
36    retry::{RetryManager, create_websocket_retry_manager},
37    websocket::{AuthTracker, SubscriptionState, WebSocketClient},
38};
39use tokio_tungstenite::tungstenite::Message;
40use ustr::Ustr;
41
42use super::{
43    enums::{BitmexAction, BitmexWsAuthAction, BitmexWsOperation, BitmexWsTopic},
44    error::BitmexWsError,
45    messages::{
46        BitmexExecutionMsg, BitmexFundingMsg, BitmexHttpRequest, BitmexInstrumentMsg,
47        BitmexOrderBook10Msg, BitmexOrderBookMsg, BitmexPositionMsg, BitmexQuoteMsg,
48        BitmexTableMessage, BitmexTradeBinMsg, BitmexTradeMsg, BitmexWalletMsg, BitmexWsMessage,
49        NautilusWsMessage, OrderData,
50    },
51    parse::{
52        parse_book_msg_vec, parse_book10_msg_vec, parse_execution_msg, parse_funding_msg,
53        parse_instrument_msg, parse_order_msg, parse_order_update_msg, parse_position_msg,
54        parse_trade_bin_msg_vec, parse_trade_msg_vec, parse_wallet_msg,
55    },
56};
57use crate::{
58    common::{enums::BitmexExecType, parse::parse_contracts_quantity},
59    http::parse::{InstrumentParseResult, parse_instrument_any},
60};
61
62/// Commands sent from the outer client to the inner message handler.
63#[derive(Debug)]
64#[allow(
65    clippy::large_enum_variant,
66    reason = "Commands are ephemeral and immediately consumed"
67)]
68pub enum HandlerCommand {
69    /// Set the WebSocketClient for the handler to use.
70    SetClient(WebSocketClient),
71    /// Disconnect the WebSocket connection.
72    Disconnect,
73    /// Send authentication payload to the WebSocket.
74    Authenticate { payload: String },
75    /// Subscribe to the given topics.
76    Subscribe { topics: Vec<String> },
77    /// Unsubscribe from the given topics.
78    Unsubscribe { topics: Vec<String> },
79    /// Initialize the instruments cache with the given instruments.
80    InitializeInstruments(Vec<InstrumentAny>),
81    /// Update a single instrument in the cache.
82    UpdateInstrument(InstrumentAny),
83}
84
85pub(super) struct FeedHandler {
86    account_id: AccountId,
87    signal: Arc<AtomicBool>,
88    client: Option<WebSocketClient>,
89    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
90    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
91    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
92    auth_tracker: AuthTracker,
93    subscriptions: SubscriptionState,
94    retry_manager: RetryManager<BitmexWsError>,
95    instruments_cache: AHashMap<Ustr, InstrumentAny>,
96    order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
97    order_symbol_cache: Arc<DashMap<ClientOrderId, Ustr>>,
98    quote_cache: QuoteCache,
99}
100
101impl FeedHandler {
102    /// Creates a new [`FeedHandler`] instance.
103    #[allow(clippy::too_many_arguments)]
104    pub(super) fn new(
105        signal: Arc<AtomicBool>,
106        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
107        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
108        out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
109        account_id: AccountId,
110        auth_tracker: AuthTracker,
111        subscriptions: SubscriptionState,
112        order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
113        order_symbol_cache: Arc<DashMap<ClientOrderId, Ustr>>,
114    ) -> Self {
115        Self {
116            account_id,
117            signal,
118            client: None,
119            cmd_rx,
120            raw_rx,
121            out_tx,
122            auth_tracker,
123            subscriptions,
124            retry_manager: create_websocket_retry_manager(),
125            instruments_cache: AHashMap::new(),
126            order_type_cache,
127            order_symbol_cache,
128            quote_cache: QuoteCache::new(),
129        }
130    }
131
132    pub(super) fn is_stopped(&self) -> bool {
133        self.signal.load(Ordering::Relaxed)
134    }
135
136    pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), ()> {
137        self.out_tx.send(msg).map_err(|_| ())
138    }
139
140    /// Sends a WebSocket message with retry logic.
141    async fn send_with_retry(&self, payload: String) -> anyhow::Result<()> {
142        if let Some(client) = &self.client {
143            self.retry_manager
144                .execute_with_retry(
145                    "websocket_send",
146                    || {
147                        let payload = payload.clone();
148                        async move {
149                            client.send_text(payload, None).await.map_err(|e| {
150                                BitmexWsError::ClientError(format!("Send failed: {e}"))
151                            })
152                        }
153                    },
154                    should_retry_bitmex_error,
155                    create_bitmex_timeout_error,
156                )
157                .await
158                .map_err(|e| anyhow::anyhow!("{e}"))
159        } else {
160            Err(anyhow::anyhow!("No active WebSocket client"))
161        }
162    }
163
164    #[inline]
165    fn get_instrument(
166        cache: &AHashMap<Ustr, InstrumentAny>,
167        symbol: &Ustr,
168    ) -> Option<InstrumentAny> {
169        cache.get(symbol).cloned()
170    }
171
172    pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
173        let clock = get_atomic_clock_realtime();
174
175        loop {
176            tokio::select! {
177                Some(cmd) = self.cmd_rx.recv() => {
178                    match cmd {
179                        HandlerCommand::SetClient(client) => {
180                            log::debug!("WebSocketClient received by handler");
181                            self.client = Some(client);
182                        }
183                        HandlerCommand::Disconnect => {
184                            log::debug!("Disconnect command received");
185                            if let Some(client) = self.client.take() {
186                                client.disconnect().await;
187                            }
188                        }
189                        HandlerCommand::Authenticate { payload } => {
190                            log::debug!("Authenticate command received");
191                            if let Err(e) = self.send_with_retry(payload).await {
192                                log::error!("Failed to send authentication after retries: {e}");
193                            }
194                        }
195                        HandlerCommand::Subscribe { topics } => {
196                            for topic in topics {
197                                log::debug!("Subscribing to topic: {topic}");
198                                if let Err(e) = self.send_with_retry(topic.clone()).await {
199                                    log::error!("Failed to send subscription after retries: topic={topic}, error={e}");
200                                }
201                            }
202                        }
203                        HandlerCommand::Unsubscribe { topics } => {
204                            for topic in topics {
205                                log::debug!("Unsubscribing from topic: {topic}");
206                                if let Err(e) = self.send_with_retry(topic.clone()).await {
207                                    log::error!("Failed to send unsubscription after retries: topic={topic}, error={e}");
208                                }
209                            }
210                        }
211                        HandlerCommand::InitializeInstruments(instruments) => {
212                            for inst in instruments {
213                                self.instruments_cache.insert(inst.symbol().inner(), inst);
214                            }
215                        }
216                        HandlerCommand::UpdateInstrument(inst) => {
217                            self.instruments_cache.insert(inst.symbol().inner(), inst);
218                        }
219                    }
220                    // Continue processing following command
221                    continue;
222                }
223
224                () = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
225                    if self.signal.load(std::sync::atomic::Ordering::Relaxed) {
226                        log::debug!("Stop signal received during idle period");
227                        return None;
228                    }
229                    continue;
230                }
231
232                msg = self.raw_rx.recv() => {
233                    let msg = match msg {
234                        Some(msg) => msg,
235                        None => {
236                            log::debug!("WebSocket stream closed");
237                            return None;
238                        }
239                    };
240
241                    // Handle ping frames directly for minimal latency
242                    if let Message::Ping(data) = &msg {
243                        log::trace!("Received ping frame with {} bytes", data.len());
244                        if let Some(client) = &self.client
245                            && let Err(e) = client.send_pong(data.to_vec()).await
246                        {
247                            log::warn!("Failed to send pong frame: {e}");
248                        }
249                        continue;
250                    }
251
252                    let event = match Self::parse_raw_message(msg) {
253                        Some(event) => event,
254                        None => continue,
255                    };
256
257                    if self.signal.load(std::sync::atomic::Ordering::Relaxed) {
258                        log::debug!("Stop signal received");
259                        return None;
260                    }
261
262            match event {
263                BitmexWsMessage::Reconnected => {
264                    self.quote_cache.clear();
265                    return Some(NautilusWsMessage::Reconnected);
266                }
267                BitmexWsMessage::Subscription {
268                    success,
269                    subscribe,
270                    request,
271                    error,
272                } => {
273                    if let Some(msg) = self.handle_subscription_message(
274                        success,
275                        subscribe.as_ref(),
276                        request.as_ref(),
277                        error.as_deref(),
278                    ) {
279                        return Some(msg);
280                    }
281                    continue;
282                }
283                BitmexWsMessage::Table(table_msg) => {
284                    let ts_init = clock.get_time_ns();
285
286                    let msg = match table_msg {
287                        BitmexTableMessage::OrderBookL2 { action, data } => {
288                            self.handle_orderbook_l2(action, data, ts_init)
289                        }
290                        BitmexTableMessage::OrderBookL2_25 { action, data } => {
291                            self.handle_orderbook_l2(action, data, ts_init)
292                        }
293                        BitmexTableMessage::OrderBook10 { data, .. } => {
294                            self.handle_orderbook_10(data, ts_init)
295                        }
296                        BitmexTableMessage::Quote { data, .. } => {
297                            self.handle_quote(data, ts_init)
298                        }
299                        BitmexTableMessage::Trade { data, .. } => {
300                            self.handle_trade(data, ts_init)
301                        }
302                        BitmexTableMessage::TradeBin1m { action, data } => {
303                            self.handle_trade_bin(action, data, BitmexWsTopic::TradeBin1m, ts_init)
304                        }
305                        BitmexTableMessage::TradeBin5m { action, data } => {
306                            self.handle_trade_bin(action, data, BitmexWsTopic::TradeBin5m, ts_init)
307                        }
308                        BitmexTableMessage::TradeBin1h { action, data } => {
309                            self.handle_trade_bin(action, data, BitmexWsTopic::TradeBin1h, ts_init)
310                        }
311                        BitmexTableMessage::TradeBin1d { action, data } => {
312                            self.handle_trade_bin(action, data, BitmexWsTopic::TradeBin1d, ts_init)
313                        }
314                        // Execution messages
315                        // Note: BitMEX may send duplicate order status updates for the same order
316                        // (e.g., immediate response + stream update). This is expected behavior.
317                        BitmexTableMessage::Order { data, .. } => {
318                            self.handle_order(data)
319                        }
320                        BitmexTableMessage::Execution { data, .. } => {
321                            self.handle_execution(data)
322                        }
323                        BitmexTableMessage::Position { data, .. } => {
324                            self.handle_position(data)
325                        }
326                        BitmexTableMessage::Wallet { data, .. } => {
327                            self.handle_wallet(data, ts_init)
328                        }
329                        BitmexTableMessage::Margin { .. } => {
330                            // Skip margin messages - BitMEX uses account-level cross-margin
331                            // which doesn't map well to Nautilus's per-instrument margin model
332                            None
333                        }
334                        BitmexTableMessage::Instrument { action, data } => {
335                            self.handle_instrument(action, data, ts_init)
336                        }
337                        BitmexTableMessage::Funding { data, .. } => {
338                            self.handle_funding(data, ts_init)
339                        }
340                        _ => {
341                            // Other message types not yet implemented
342                            log::warn!("Unhandled table message type: {table_msg:?}");
343                            None
344                        }
345                    };
346
347                    if let Some(msg) = msg {
348                        return Some(msg);
349                    }
350                    continue;
351                }
352                BitmexWsMessage::Welcome { .. } | BitmexWsMessage::Error { .. } => continue,
353            }
354                }
355
356                // Handle shutdown - either channel closed or stream ended
357                else => {
358                    log::debug!("Handler shutting down: stream ended or command channel closed");
359                    return None;
360                }
361            }
362        }
363    }
364
365    fn parse_raw_message(msg: Message) -> Option<BitmexWsMessage> {
366        match msg {
367            Message::Text(text) => {
368                if text == RECONNECTED {
369                    log::info!("Received WebSocket reconnected signal");
370                    return Some(BitmexWsMessage::Reconnected);
371                }
372
373                log::trace!("Raw websocket message: {text}");
374
375                if Self::is_heartbeat_message(&text) {
376                    log::trace!("Ignoring heartbeat control message: {text}");
377                    return None;
378                }
379
380                match serde_json::from_str(&text) {
381                    Ok(msg) => match &msg {
382                        BitmexWsMessage::Welcome {
383                            version,
384                            heartbeat_enabled,
385                            limit,
386                            ..
387                        } => {
388                            log::info!(
389                                "Welcome to the BitMEX Realtime API: version={}, heartbeat={}, rate_limit={:?}",
390                                version,
391                                heartbeat_enabled,
392                                limit.remaining,
393                            );
394                        }
395                        BitmexWsMessage::Subscription { .. } => return Some(msg),
396                        BitmexWsMessage::Error { status, error, .. } => {
397                            log::error!(
398                                "Received error from BitMEX: status={status}, error={error}",
399                            );
400                        }
401                        _ => return Some(msg),
402                    },
403                    Err(e) => {
404                        log::error!("Failed to parse WebSocket message: {e}: {text}");
405                    }
406                }
407            }
408            Message::Binary(msg) => {
409                log::debug!("Raw binary: {msg:?}");
410            }
411            Message::Close(_) => {
412                log::debug!("Received close message, waiting for reconnection");
413            }
414            Message::Ping(data) => {
415                // Handled in select! loop before parse_raw_message
416                log::trace!("Ping frame with {} bytes (already handled)", data.len());
417            }
418            Message::Pong(data) => {
419                log::trace!("Received pong frame with {} bytes", data.len());
420            }
421            Message::Frame(frame) => {
422                log::debug!("Received raw frame: {frame:?}");
423            }
424        }
425
426        None
427    }
428
429    fn is_heartbeat_message(text: &str) -> bool {
430        let trimmed = text.trim();
431
432        if !trimmed.starts_with('{') || trimmed.len() > 64 {
433            return false;
434        }
435
436        trimmed.contains("\"op\":\"ping\"") || trimmed.contains("\"op\":\"pong\"")
437    }
438
439    fn handle_subscription_ack(
440        &self,
441        success: bool,
442        request: Option<&BitmexHttpRequest>,
443        subscribe: Option<&String>,
444        error: Option<&str>,
445    ) {
446        let topics = Self::topics_from_request(request, subscribe);
447
448        if topics.is_empty() {
449            log::debug!("Subscription acknowledgement without topics");
450            return;
451        }
452
453        for topic in topics {
454            if success {
455                self.subscriptions.confirm_subscribe(topic);
456                log::debug!("Subscription confirmed: topic={topic}");
457            } else {
458                self.subscriptions.mark_failure(topic);
459                let reason = error.unwrap_or("Subscription rejected");
460                log::error!("Subscription failed: topic={topic}, error={reason}");
461            }
462        }
463    }
464
465    fn handle_unsubscribe_ack(
466        &self,
467        success: bool,
468        request: Option<&BitmexHttpRequest>,
469        subscribe: Option<&String>,
470        error: Option<&str>,
471    ) {
472        let topics = Self::topics_from_request(request, subscribe);
473
474        if topics.is_empty() {
475            log::debug!("Unsubscription acknowledgement without topics");
476            return;
477        }
478
479        for topic in topics {
480            if success {
481                log::debug!("Unsubscription confirmed: topic={topic}");
482                self.subscriptions.confirm_unsubscribe(topic);
483            } else {
484                let reason = error.unwrap_or("Unsubscription rejected");
485                log::error!(
486                    "Unsubscription failed - restoring subscription: topic={topic}, error={reason}",
487                );
488                // Venue rejected unsubscribe, so we're still subscribed. Restore state:
489                self.subscriptions.confirm_unsubscribe(topic); // Clear pending_unsubscribe
490                self.subscriptions.mark_subscribe(topic); // Mark as subscribing
491                self.subscriptions.confirm_subscribe(topic); // Confirm subscription
492            }
493        }
494    }
495
496    fn topics_from_request<'a>(
497        request: Option<&'a BitmexHttpRequest>,
498        fallback: Option<&'a String>,
499    ) -> Vec<&'a str> {
500        if let Some(req) = request
501            && !req.args.is_empty()
502        {
503            return req.args.iter().filter_map(|arg| arg.as_str()).collect();
504        }
505
506        fallback.into_iter().map(|topic| topic.as_str()).collect()
507    }
508
509    fn handle_orderbook_l2(
510        &self,
511        action: BitmexAction,
512        data: Vec<BitmexOrderBookMsg>,
513        ts_init: UnixNanos,
514    ) -> Option<NautilusWsMessage> {
515        if data.is_empty() {
516            return None;
517        }
518        let data = parse_book_msg_vec(data, action, &self.instruments_cache, ts_init);
519        Some(NautilusWsMessage::Data(data))
520    }
521
522    fn handle_orderbook_10(
523        &self,
524        data: Vec<BitmexOrderBook10Msg>,
525        ts_init: UnixNanos,
526    ) -> Option<NautilusWsMessage> {
527        if data.is_empty() {
528            return None;
529        }
530        let data = parse_book10_msg_vec(data, &self.instruments_cache, ts_init);
531        Some(NautilusWsMessage::Data(data))
532    }
533
534    fn handle_quote(
535        &mut self,
536        mut data: Vec<BitmexQuoteMsg>,
537        ts_init: UnixNanos,
538    ) -> Option<NautilusWsMessage> {
539        // Index symbols may return empty quote data
540        if data.is_empty() {
541            return None;
542        }
543
544        let msg = data.remove(0);
545        let Some(instrument) = Self::get_instrument(&self.instruments_cache, &msg.symbol) else {
546            log::error!(
547                "Instrument cache miss: quote message dropped for symbol={}",
548                msg.symbol
549            );
550            return None;
551        };
552
553        let instrument_id = instrument.id();
554        let price_precision = instrument.price_precision();
555
556        let bid_price = msg.bid_price.map(|p| Price::new(p, price_precision));
557        let ask_price = msg.ask_price.map(|p| Price::new(p, price_precision));
558        let bid_size = msg
559            .bid_size
560            .map(|s| parse_contracts_quantity(s, &instrument));
561        let ask_size = msg
562            .ask_size
563            .map(|s| parse_contracts_quantity(s, &instrument));
564        let ts_event = UnixNanos::from(msg.timestamp);
565
566        match self.quote_cache.process(
567            instrument_id,
568            bid_price,
569            ask_price,
570            bid_size,
571            ask_size,
572            ts_event,
573            ts_init,
574        ) {
575            Ok(quote) => Some(NautilusWsMessage::Data(vec![Data::Quote(quote)])),
576            Err(e) => {
577                log::warn!("Failed to process quote: {e}");
578                None
579            }
580        }
581    }
582
583    fn handle_trade(
584        &self,
585        data: Vec<BitmexTradeMsg>,
586        ts_init: UnixNanos,
587    ) -> Option<NautilusWsMessage> {
588        if data.is_empty() {
589            return None;
590        }
591        let data = parse_trade_msg_vec(data, &self.instruments_cache, ts_init);
592        Some(NautilusWsMessage::Data(data))
593    }
594
595    fn handle_trade_bin(
596        &self,
597        action: BitmexAction,
598        data: Vec<BitmexTradeBinMsg>,
599        topic: BitmexWsTopic,
600        ts_init: UnixNanos,
601    ) -> Option<NautilusWsMessage> {
602        if action == BitmexAction::Partial || data.is_empty() {
603            return None;
604        }
605        let data = parse_trade_bin_msg_vec(data, topic, &self.instruments_cache, ts_init);
606        Some(NautilusWsMessage::Data(data))
607    }
608
609    fn handle_order(&mut self, data: Vec<OrderData>) -> Option<NautilusWsMessage> {
610        // Process all orders in the message
611        let mut reports = Vec::with_capacity(data.len());
612
613        for order_data in data {
614            match order_data {
615                OrderData::Full(order_msg) => {
616                    let Some(instrument) =
617                        Self::get_instrument(&self.instruments_cache, &order_msg.symbol)
618                    else {
619                        log::error!(
620                            "Instrument cache miss: order message dropped for symbol={}, order_id={}",
621                            order_msg.symbol,
622                            order_msg.order_id
623                        );
624                        continue;
625                    };
626
627                    match parse_order_msg(&order_msg, &instrument, &self.order_type_cache) {
628                        Ok(report) => {
629                            // Cache the order type and symbol AFTER successful parse
630                            if let Some(client_order_id) = &order_msg.cl_ord_id {
631                                let client_order_id = ClientOrderId::new(client_order_id);
632
633                                if let Some(ord_type) = &order_msg.ord_type {
634                                    let order_type: OrderType = (*ord_type).into();
635                                    self.order_type_cache.insert(client_order_id, order_type);
636                                }
637
638                                // Cache symbol for execution message routing
639                                self.order_symbol_cache
640                                    .insert(client_order_id, order_msg.symbol);
641                            }
642
643                            if is_terminal_order_status(report.order_status)
644                                && let Some(client_id) = report.client_order_id
645                            {
646                                self.order_type_cache.remove(&client_id);
647                                self.order_symbol_cache.remove(&client_id);
648                            }
649
650                            reports.push(report);
651                        }
652                        Err(e) => {
653                            log::error!(
654                                "Failed to parse full order message - potential data loss: \
655                                error={e}, symbol={}, order_id={}, time_in_force={:?}",
656                                order_msg.symbol,
657                                order_msg.order_id,
658                                order_msg.time_in_force,
659                            );
660                            // TODO: Add metric counter for parse failures
661                            continue;
662                        }
663                    }
664                }
665                OrderData::Update(msg) => {
666                    let Some(instrument) =
667                        Self::get_instrument(&self.instruments_cache, &msg.symbol)
668                    else {
669                        log::error!(
670                            "Instrument cache miss: order update dropped for symbol={}, order_id={}",
671                            msg.symbol,
672                            msg.order_id
673                        );
674                        continue;
675                    };
676
677                    // Populate cache for execution message routing (handles edge case where update arrives before full snapshot)
678                    if let Some(cl_ord_id) = &msg.cl_ord_id {
679                        let client_order_id = ClientOrderId::new(cl_ord_id);
680                        self.order_symbol_cache.insert(client_order_id, msg.symbol);
681                    }
682
683                    if let Some(event) = parse_order_update_msg(&msg, &instrument, self.account_id)
684                    {
685                        return Some(NautilusWsMessage::OrderUpdated(event));
686                    } else {
687                        log::warn!(
688                            "Skipped order update message (insufficient data): \
689                            order_id={}, price={:?}",
690                            msg.order_id,
691                            msg.price,
692                        );
693                    }
694                }
695            }
696        }
697
698        if reports.is_empty() {
699            return None;
700        }
701
702        Some(NautilusWsMessage::OrderStatusReports(reports))
703    }
704
705    fn handle_execution(&mut self, data: Vec<BitmexExecutionMsg>) -> Option<NautilusWsMessage> {
706        let mut fills = Vec::with_capacity(data.len());
707
708        for exec_msg in data {
709            // Try to get symbol, fall back to cache lookup if missing
710            let symbol_opt = if let Some(sym) = &exec_msg.symbol {
711                Some(*sym)
712            } else if let Some(cl_ord_id) = &exec_msg.cl_ord_id {
713                // Try to look up symbol from order_symbol_cache
714                let client_order_id = ClientOrderId::new(cl_ord_id);
715                self.order_symbol_cache
716                    .get(&client_order_id)
717                    .map(|r| *r.value())
718            } else {
719                None
720            };
721
722            let Some(symbol) = symbol_opt else {
723                // Symbol missing - log appropriately based on exec type and whether we had clOrdID
724                if let Some(cl_ord_id) = &exec_msg.cl_ord_id {
725                    if exec_msg.exec_type == Some(BitmexExecType::Trade) {
726                        log::warn!(
727                            "Execution message missing symbol and not found in cache: \
728                            cl_ord_id={cl_ord_id}, exec_id={:?}, ord_rej_reason={:?}, text={:?}",
729                            exec_msg.exec_id,
730                            exec_msg.ord_rej_reason,
731                            exec_msg.text,
732                        );
733                    } else {
734                        log::debug!(
735                            "Execution message missing symbol and not found in cache: \
736                            cl_ord_id={cl_ord_id}, exec_id={:?}, exec_type={:?}, \
737                            ord_rej_reason={:?}, text={:?}",
738                            exec_msg.exec_id,
739                            exec_msg.exec_type,
740                            exec_msg.ord_rej_reason,
741                            exec_msg.text,
742                        );
743                    }
744                } else {
745                    // CancelReject messages without symbol/clOrdID are expected when using
746                    // redundant cancel broadcasting - one cancel succeeds, others arrive late
747                    // and BitMEX responds with CancelReject but doesn't populate the fields
748                    if exec_msg.exec_type == Some(BitmexExecType::CancelReject) {
749                        log::debug!(
750                            "CancelReject message missing symbol/clOrdID (expected with redundant cancels): \
751                            exec_id={:?}, order_id={:?}",
752                            exec_msg.exec_id,
753                            exec_msg.order_id,
754                        );
755                    } else {
756                        log::warn!(
757                            "Execution message missing both symbol and clOrdID, cannot process: \
758                            exec_id={:?}, order_id={:?}, exec_type={:?}, \
759                            ord_rej_reason={:?}, text={:?}",
760                            exec_msg.exec_id,
761                            exec_msg.order_id,
762                            exec_msg.exec_type,
763                            exec_msg.ord_rej_reason,
764                            exec_msg.text,
765                        );
766                    }
767                }
768                continue;
769            };
770
771            let Some(instrument) = Self::get_instrument(&self.instruments_cache, &symbol) else {
772                log::error!(
773                    "Instrument cache miss: execution message dropped for symbol={}, exec_id={:?}, exec_type={:?}, Liquidation/ADL fills may be lost",
774                    symbol,
775                    exec_msg.exec_id,
776                    exec_msg.exec_type
777                );
778                continue;
779            };
780
781            if let Some(fill) = parse_execution_msg(exec_msg, &instrument) {
782                fills.push(fill);
783            }
784        }
785
786        if fills.is_empty() {
787            return None;
788        }
789        Some(NautilusWsMessage::FillReports(fills))
790    }
791
792    fn handle_position(&self, data: Vec<BitmexPositionMsg>) -> Option<NautilusWsMessage> {
793        if let Some(pos_msg) = data.into_iter().next() {
794            let Some(instrument) = Self::get_instrument(&self.instruments_cache, &pos_msg.symbol)
795            else {
796                log::error!(
797                    "Instrument cache miss: position message dropped for symbol={}, account={}",
798                    pos_msg.symbol,
799                    pos_msg.account
800                );
801                return None;
802            };
803            let report = parse_position_msg(pos_msg, &instrument);
804            Some(NautilusWsMessage::PositionStatusReport(report))
805        } else {
806            None
807        }
808    }
809
810    fn handle_wallet(
811        &self,
812        data: Vec<BitmexWalletMsg>,
813        ts_init: UnixNanos,
814    ) -> Option<NautilusWsMessage> {
815        if let Some(wallet_msg) = data.into_iter().next() {
816            let account_state = parse_wallet_msg(wallet_msg, ts_init);
817            Some(NautilusWsMessage::AccountState(account_state))
818        } else {
819            None
820        }
821    }
822
823    fn handle_instrument(
824        &mut self,
825        action: BitmexAction,
826        data: Vec<BitmexInstrumentMsg>,
827        ts_init: UnixNanos,
828    ) -> Option<NautilusWsMessage> {
829        match action {
830            BitmexAction::Partial | BitmexAction::Insert => {
831                let mut instruments = Vec::with_capacity(data.len());
832                let mut temp_cache = AHashMap::new();
833
834                let data_for_prices = data.clone();
835
836                for msg in data {
837                    match msg.try_into() {
838                        Ok(http_inst) => {
839                            match parse_instrument_any(&http_inst, ts_init) {
840                                InstrumentParseResult::Ok(boxed) => {
841                                    let instrument_any = *boxed;
842                                    let symbol = instrument_any.symbol().inner();
843                                    temp_cache.insert(symbol, instrument_any.clone());
844                                    instruments.push(instrument_any);
845                                }
846                                InstrumentParseResult::Unsupported { .. }
847                                | InstrumentParseResult::Inactive { .. } => {
848                                    // Silently skip unsupported or inactive instruments
849                                }
850                                InstrumentParseResult::Failed {
851                                    symbol,
852                                    instrument_type,
853                                    error,
854                                } => {
855                                    log::warn!(
856                                        "Failed to parse instrument {symbol} ({instrument_type:?}): {error}"
857                                    );
858                                }
859                            }
860                        }
861                        Err(e) => {
862                            log::debug!("Skipping instrument (missing required fields): {e}");
863                        }
864                    }
865                }
866
867                // Update instruments_cache with new instruments
868                for (symbol, instrument) in &temp_cache {
869                    self.instruments_cache.insert(*symbol, instrument.clone());
870                }
871
872                if !instruments.is_empty()
873                    && let Err(e) = self
874                        .out_tx
875                        .send(NautilusWsMessage::Instruments(instruments))
876                {
877                    log::error!("Error sending instruments: {e}");
878                }
879
880                let mut data_msgs = Vec::with_capacity(data_for_prices.len());
881
882                for msg in data_for_prices {
883                    let parsed = parse_instrument_msg(msg, &temp_cache, ts_init);
884                    data_msgs.extend(parsed);
885                }
886
887                if data_msgs.is_empty() {
888                    return None;
889                }
890                Some(NautilusWsMessage::Data(data_msgs))
891            }
892            BitmexAction::Update => {
893                let mut data_msgs = Vec::with_capacity(data.len());
894
895                for msg in data {
896                    let parsed = parse_instrument_msg(msg, &self.instruments_cache, ts_init);
897                    data_msgs.extend(parsed);
898                }
899
900                if data_msgs.is_empty() {
901                    return None;
902                }
903                Some(NautilusWsMessage::Data(data_msgs))
904            }
905            BitmexAction::Delete => {
906                log::info!(
907                    "Received instrument delete action for {} instrument(s)",
908                    data.len()
909                );
910                None
911            }
912        }
913    }
914
915    fn handle_funding(
916        &self,
917        data: Vec<BitmexFundingMsg>,
918        ts_init: UnixNanos,
919    ) -> Option<NautilusWsMessage> {
920        let mut funding_updates = Vec::with_capacity(data.len());
921
922        for msg in data {
923            if let Some(parsed) = parse_funding_msg(msg, ts_init) {
924                funding_updates.push(parsed);
925            }
926        }
927
928        if funding_updates.is_empty() {
929            None
930        } else {
931            Some(NautilusWsMessage::FundingRateUpdates(funding_updates))
932        }
933    }
934
935    fn handle_subscription_message(
936        &self,
937        success: bool,
938        subscribe: Option<&String>,
939        request: Option<&BitmexHttpRequest>,
940        error: Option<&str>,
941    ) -> Option<NautilusWsMessage> {
942        if let Some(req) = request {
943            if req
944                .op
945                .eq_ignore_ascii_case(BitmexWsAuthAction::AuthKeyExpires.as_ref())
946            {
947                if success {
948                    log::info!("WebSocket authenticated");
949                    self.auth_tracker.succeed();
950                    return Some(NautilusWsMessage::Authenticated);
951                } else {
952                    let reason = error.unwrap_or("Authentication rejected").to_string();
953                    log::error!("WebSocket authentication failed: {reason}");
954                    self.auth_tracker.fail(reason);
955                }
956                return None;
957            }
958
959            if req
960                .op
961                .eq_ignore_ascii_case(BitmexWsOperation::Subscribe.as_ref())
962            {
963                self.handle_subscription_ack(success, request, subscribe, error);
964                return None;
965            }
966
967            if req
968                .op
969                .eq_ignore_ascii_case(BitmexWsOperation::Unsubscribe.as_ref())
970            {
971                self.handle_unsubscribe_ack(success, request, subscribe, error);
972                return None;
973            }
974        }
975
976        if subscribe.is_some() {
977            self.handle_subscription_ack(success, request, subscribe, error);
978            return None;
979        }
980
981        if let Some(error) = error {
982            log::warn!("Unhandled subscription control message: success={success}, error={error}");
983        }
984
985        None
986    }
987}
988
989fn is_terminal_order_status(status: OrderStatus) -> bool {
990    matches!(
991        status,
992        OrderStatus::Canceled | OrderStatus::Expired | OrderStatus::Rejected | OrderStatus::Filled,
993    )
994}
995
996/// Returns `true` when a BitMEX error should be retried.
997pub(crate) fn should_retry_bitmex_error(error: &BitmexWsError) -> bool {
998    match error {
999        BitmexWsError::TungsteniteError(_) => true, // Network errors are retryable
1000        BitmexWsError::ClientError(msg) => {
1001            // Retry on timeout and connection errors (case-insensitive)
1002            let msg_lower = msg.to_lowercase();
1003            msg_lower.contains("timeout")
1004                || msg_lower.contains("timed out")
1005                || msg_lower.contains("connection")
1006                || msg_lower.contains("network")
1007        }
1008        _ => false,
1009    }
1010}
1011
1012/// Creates a timeout error for BitMEX retry logic.
1013pub(crate) fn create_bitmex_timeout_error(msg: String) -> BitmexWsError {
1014    BitmexWsError::ClientError(msg)
1015}
1016
1017#[cfg(test)]
1018mod tests {
1019    use rstest::rstest;
1020
1021    use super::*;
1022
1023    #[rstest]
1024    fn test_is_heartbeat_message_detection() {
1025        assert!(FeedHandler::is_heartbeat_message("{\"op\":\"ping\"}"));
1026        assert!(FeedHandler::is_heartbeat_message("{\"op\":\"pong\"}"));
1027        assert!(!FeedHandler::is_heartbeat_message(
1028            "{\"op\":\"subscribe\",\"args\":[\"trade:XBTUSD\"]}"
1029        ));
1030    }
1031}