nautilus_bitmex/websocket/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message handler for BitMEX.
17
18use std::sync::{
19    Arc,
20    atomic::{AtomicBool, Ordering},
21};
22
23use ahash::AHashMap;
24use dashmap::DashMap;
25use nautilus_common::cache::quote::QuoteCache;
26use nautilus_core::{UnixNanos, time::get_atomic_clock_realtime};
27use nautilus_model::{
28    data::Data,
29    enums::{OrderStatus, OrderType},
30    identifiers::{AccountId, ClientOrderId},
31    instruments::{Instrument, InstrumentAny},
32    types::Price,
33};
34use nautilus_network::{
35    RECONNECTED,
36    retry::{RetryManager, create_websocket_retry_manager},
37    websocket::{AuthTracker, SubscriptionState, WebSocketClient},
38};
39use tokio_tungstenite::tungstenite::Message;
40use ustr::Ustr;
41
42use super::{
43    enums::{BitmexAction, BitmexWsAuthAction, BitmexWsOperation, BitmexWsTopic},
44    error::BitmexWsError,
45    messages::{
46        BitmexExecutionMsg, BitmexFundingMsg, BitmexHttpRequest, BitmexInstrumentMsg,
47        BitmexOrderBook10Msg, BitmexOrderBookMsg, BitmexPositionMsg, BitmexQuoteMsg,
48        BitmexTableMessage, BitmexTradeBinMsg, BitmexTradeMsg, BitmexWalletMsg, BitmexWsMessage,
49        NautilusWsMessage, OrderData,
50    },
51    parse::{
52        parse_book_msg_vec, parse_book10_msg_vec, parse_execution_msg, parse_funding_msg,
53        parse_instrument_msg, parse_order_msg, parse_order_update_msg, parse_position_msg,
54        parse_trade_bin_msg_vec, parse_trade_msg_vec, parse_wallet_msg,
55    },
56};
57use crate::{
58    common::{enums::BitmexExecType, parse::parse_contracts_quantity},
59    http::parse::{InstrumentParseResult, parse_instrument_any},
60};
61
/// Commands sent from the outer client to the inner message handler.
///
/// Commands are received on the handler's command channel and applied inside
/// `FeedHandler::next`; handling a command never produces an outbound message by itself.
#[derive(Debug)]
#[allow(
    clippy::large_enum_variant,
    reason = "Commands are ephemeral and immediately consumed"
)]
pub enum HandlerCommand {
    /// Set the WebSocketClient for the handler to use.
    ///
    /// Until this command is processed, sends fail with "No active WebSocket client".
    SetClient(WebSocketClient),
    /// Disconnect the WebSocket connection.
    ///
    /// The handler gives up its client, so a new `SetClient` is needed before sending again.
    Disconnect,
    /// Send authentication payload to the WebSocket.
    ///
    /// `payload` is sent as a single text message, with retries.
    Authenticate { payload: String },
    /// Subscribe to the given topics.
    ///
    /// Each entry is sent verbatim as its own text message (with retries), so entries are
    /// presumably complete subscribe requests rather than bare topic names - confirm with caller.
    Subscribe { topics: Vec<String> },
    /// Unsubscribe from the given topics.
    ///
    /// Each entry is sent verbatim as its own text message (with retries), so entries are
    /// presumably complete unsubscribe requests rather than bare topic names - confirm with caller.
    Unsubscribe { topics: Vec<String> },
    /// Initialize the instruments cache with the given instruments.
    ///
    /// Instruments are keyed by symbol; an existing entry for the same symbol is replaced.
    InitializeInstruments(Vec<InstrumentAny>),
    /// Update a single instrument in the cache.
    ///
    /// Inserts or replaces the entry keyed by the instrument's symbol.
    UpdateInstrument(InstrumentAny),
}
84
/// Inner message handler that turns raw BitMEX WebSocket frames into [`NautilusWsMessage`]s.
///
/// The handler reads commands and raw frames from its channels, keeps the per-connection
/// caches needed for parsing, and hands results back to the outer client.
pub(super) struct FeedHandler {
    /// Account identifier passed to the order update parser.
    account_id: AccountId,
    /// Stop flag; once it reads `true` the handler stops producing messages.
    signal: Arc<AtomicBool>,
    /// Active WebSocket client; `None` until `SetClient` is processed and after `Disconnect`.
    client: Option<WebSocketClient>,
    /// Receives [`HandlerCommand`]s from the outer client.
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
    /// Receives raw WebSocket frames to be parsed.
    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
    /// Outbound channel for parsed messages (see `send`).
    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
    /// Authentication state tracker.
    // NOTE(review): not referenced in the portion of the file reviewed here - confirm usage.
    auth_tracker: AuthTracker,
    /// Subscription bookkeeping, updated when subscribe/unsubscribe acknowledgements arrive.
    subscriptions: SubscriptionState,
    /// Retry policy applied to outgoing text messages.
    retry_manager: RetryManager<BitmexWsError>,
    /// Instruments keyed by symbol, used to resolve precision and identifiers while parsing.
    instruments_cache: AHashMap<Ustr, InstrumentAny>,
    /// Order type per client order ID, recorded from full order messages and dropped once
    /// the order reaches a terminal status.
    order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
    /// Symbol per client order ID, used to route execution messages that omit the symbol.
    order_symbol_cache: Arc<DashMap<ClientOrderId, Ustr>>,
    /// Last known quote state per instrument; cleared on reconnect.
    quote_cache: QuoteCache,
}
100
101impl FeedHandler {
102    /// Creates a new [`FeedHandler`] instance.
103    #[allow(clippy::too_many_arguments)]
104    pub(super) fn new(
105        signal: Arc<AtomicBool>,
106        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
107        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
108        out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
109        account_id: AccountId,
110        auth_tracker: AuthTracker,
111        subscriptions: SubscriptionState,
112        order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
113        order_symbol_cache: Arc<DashMap<ClientOrderId, Ustr>>,
114    ) -> Self {
115        Self {
116            account_id,
117            signal,
118            client: None,
119            cmd_rx,
120            raw_rx,
121            out_tx,
122            auth_tracker,
123            subscriptions,
124            retry_manager: create_websocket_retry_manager(),
125            instruments_cache: AHashMap::new(),
126            order_type_cache,
127            order_symbol_cache,
128            quote_cache: QuoteCache::new(),
129        }
130    }
131
132    pub(super) fn is_stopped(&self) -> bool {
133        self.signal.load(Ordering::Relaxed)
134    }
135
136    pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), ()> {
137        self.out_tx.send(msg).map_err(|_| ())
138    }
139
140    /// Sends a WebSocket message with retry logic.
141    async fn send_with_retry(&self, payload: String) -> anyhow::Result<()> {
142        if let Some(client) = &self.client {
143            self.retry_manager
144                .execute_with_retry(
145                    "websocket_send",
146                    || {
147                        let payload = payload.clone();
148                        async move {
149                            client.send_text(payload, None).await.map_err(|e| {
150                                BitmexWsError::ClientError(format!("Send failed: {e}"))
151                            })
152                        }
153                    },
154                    should_retry_bitmex_error,
155                    create_bitmex_timeout_error,
156                )
157                .await
158                .map_err(|e| anyhow::anyhow!("{e}"))
159        } else {
160            Err(anyhow::anyhow!("No active WebSocket client"))
161        }
162    }
163
164    #[inline]
165    fn get_instrument(
166        cache: &AHashMap<Ustr, InstrumentAny>,
167        symbol: &Ustr,
168    ) -> Option<InstrumentAny> {
169        cache.get(symbol).cloned()
170    }
171
    /// Drives the handler until one outbound message is ready or the handler should stop.
    ///
    /// Each loop iteration waits for whichever of these completes first:
    /// - a [`HandlerCommand`] from the command channel, which is applied in place,
    /// - a 100 ms timer, after which the stop signal is checked,
    /// - a raw frame from the WebSocket channel, which is answered (ping), parsed and dispatched.
    ///
    /// Returns `Some(message)` as soon as a frame yields a [`NautilusWsMessage`], and `None`
    /// when the stop signal is observed or the raw frame channel is closed.
    pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
        let clock = get_atomic_clock_realtime();

        loop {
            tokio::select! {
                Some(cmd) = self.cmd_rx.recv() => {
                    match cmd {
                        HandlerCommand::SetClient(client) => {
                            tracing::debug!("WebSocketClient received by handler");
                            self.client = Some(client);
                        }
                        HandlerCommand::Disconnect => {
                            tracing::debug!("Disconnect command received");
                            // Taking the client leaves `None` behind, so later sends fail fast
                            if let Some(client) = self.client.take() {
                                client.disconnect().await;
                            }
                        }
                        HandlerCommand::Authenticate { payload } => {
                            tracing::debug!("Authenticate command received");
                            if let Err(e) = self.send_with_retry(payload).await {
                                tracing::error!(error = %e, "Failed to send authentication after retries");
                            }
                        }
                        HandlerCommand::Subscribe { topics } => {
                            // Each entry is sent as its own text message; a failure is logged
                            // and the remaining entries are still attempted
                            for topic in topics {
                                tracing::debug!(topic = %topic, "Subscribing to topic");
                                if let Err(e) = self.send_with_retry(topic.clone()).await {
                                    tracing::error!(topic = %topic, error = %e, "Failed to send subscription after retries");
                                }
                            }
                        }
                        HandlerCommand::Unsubscribe { topics } => {
                            // Same handling as subscriptions: one message per entry, log on failure
                            for topic in topics {
                                tracing::debug!(topic = %topic, "Unsubscribing from topic");
                                if let Err(e) = self.send_with_retry(topic.clone()).await {
                                    tracing::error!(topic = %topic, error = %e, "Failed to send unsubscription after retries");
                                }
                            }
                        }
                        HandlerCommand::InitializeInstruments(instruments) => {
                            // Keyed by symbol; existing entries for the same symbol are replaced
                            for inst in instruments {
                                self.instruments_cache.insert(inst.symbol().inner(), inst);
                            }
                        }
                        HandlerCommand::UpdateInstrument(inst) => {
                            self.instruments_cache.insert(inst.symbol().inner(), inst);
                        }
                    }
                    // Commands never yield an outbound message; go back to waiting
                    continue;
                }

                // Periodic wake-up so the stop signal is noticed even when nothing else arrives
                _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
                    if self.signal.load(std::sync::atomic::Ordering::Relaxed) {
                        tracing::debug!("Stop signal received during idle period");
                        return None;
                    }
                    continue;
                }

                msg = self.raw_rx.recv() => {
                    let msg = match msg {
                        Some(msg) => msg,
                        None => {
                            tracing::debug!("WebSocket stream closed");
                            return None;
                        }
                    };

                    // Handle ping frames directly for minimal latency
                    if let Message::Ping(data) = &msg {
                        tracing::trace!("Received ping frame with {} bytes", data.len());
                        if let Some(client) = &self.client
                            && let Err(e) = client.send_pong(data.to_vec()).await
                        {
                            tracing::warn!(error = %e, "Failed to send pong frame");
                        }
                        continue;
                    }

                    // Frames that carry nothing to dispatch parse to `None` and are skipped
                    let event = match Self::parse_raw_message(msg) {
                        Some(event) => event,
                        None => continue,
                    };

                    // A frame parsed after the stop signal was raised is discarded
                    if self.signal.load(std::sync::atomic::Ordering::Relaxed) {
                        tracing::debug!("Stop signal received");
                        return None;
                    }

            match event {
                BitmexWsMessage::Reconnected => {
                    // Cached quote state is dropped before reporting the reconnect
                    self.quote_cache.clear();
                    return Some(NautilusWsMessage::Reconnected);
                }
                BitmexWsMessage::Subscription {
                    success,
                    subscribe,
                    request,
                    error,
                } => {
                    if let Some(msg) = self.handle_subscription_message(
                        success,
                        subscribe.as_ref(),
                        request.as_ref(),
                        error.as_deref(),
                    ) {
                        return Some(msg);
                    }
                    continue;
                }
                BitmexWsMessage::Table(table_msg) => {
                    // One receive timestamp is shared by everything parsed from this frame
                    let ts_init = clock.get_time_ns();

                    let msg = match table_msg {
                        BitmexTableMessage::OrderBookL2 { action, data } => {
                            self.handle_orderbook_l2(action, data, ts_init)
                        }
                        BitmexTableMessage::OrderBookL2_25 { action, data } => {
                            self.handle_orderbook_l2(action, data, ts_init)
                        }
                        BitmexTableMessage::OrderBook10 { data, .. } => {
                            self.handle_orderbook_10(data, ts_init)
                        }
                        BitmexTableMessage::Quote { data, .. } => {
                            self.handle_quote(data, ts_init)
                        }
                        BitmexTableMessage::Trade { data, .. } => {
                            self.handle_trade(data, ts_init)
                        }
                        BitmexTableMessage::TradeBin1m { action, data } => {
                            self.handle_trade_bin(action, data, BitmexWsTopic::TradeBin1m, ts_init)
                        }
                        BitmexTableMessage::TradeBin5m { action, data } => {
                            self.handle_trade_bin(action, data, BitmexWsTopic::TradeBin5m, ts_init)
                        }
                        BitmexTableMessage::TradeBin1h { action, data } => {
                            self.handle_trade_bin(action, data, BitmexWsTopic::TradeBin1h, ts_init)
                        }
                        BitmexTableMessage::TradeBin1d { action, data } => {
                            self.handle_trade_bin(action, data, BitmexWsTopic::TradeBin1d, ts_init)
                        }
                        // Execution messages
                        // Note: BitMEX may send duplicate order status updates for the same order
                        // (e.g., immediate response + stream update). This is expected behavior.
                        BitmexTableMessage::Order { data, .. } => {
                            self.handle_order(data)
                        }
                        BitmexTableMessage::Execution { data, .. } => {
                            self.handle_execution(data)
                        }
                        BitmexTableMessage::Position { data, .. } => {
                            self.handle_position(data)
                        }
                        BitmexTableMessage::Wallet { data, .. } => {
                            self.handle_wallet(data, ts_init)
                        }
                        BitmexTableMessage::Margin { .. } => {
                            // Skip margin messages - BitMEX uses account-level cross-margin
                            // which doesn't map well to Nautilus's per-instrument margin model
                            None
                        }
                        BitmexTableMessage::Instrument { action, data } => {
                            self.handle_instrument(action, data, ts_init)
                        }
                        BitmexTableMessage::Funding { data, .. } => {
                            self.handle_funding(data, ts_init)
                        }
                        _ => {
                            // Other message types not yet implemented
                            tracing::warn!("Unhandled table message type: {table_msg:?}");
                            None
                        }
                    };

                    // Handlers return `None` when the frame produced nothing to forward
                    if let Some(msg) = msg {
                        return Some(msg);
                    }
                    continue;
                }
                BitmexWsMessage::Welcome { .. } | BitmexWsMessage::Error { .. } => continue,
            }
                }

                // Handle shutdown - either channel closed or stream ended
                // NOTE(review): the timer and raw-frame arms bind irrefutable patterns, so this
                // fallback looks unreachable - confirm against `tokio::select!` semantics
                else => {
                    tracing::debug!("Handler shutting down: stream ended or command channel closed");
                    return None;
                }
            }
        }
    }
364
365    fn parse_raw_message(msg: Message) -> Option<BitmexWsMessage> {
366        match msg {
367            Message::Text(text) => {
368                if text == RECONNECTED {
369                    tracing::info!("Received WebSocket reconnected signal");
370                    return Some(BitmexWsMessage::Reconnected);
371                }
372
373                tracing::trace!("Raw websocket message: {text}");
374
375                if Self::is_heartbeat_message(&text) {
376                    tracing::trace!("Ignoring heartbeat control message: {text}");
377                    return None;
378                }
379
380                match serde_json::from_str(&text) {
381                    Ok(msg) => match &msg {
382                        BitmexWsMessage::Welcome {
383                            version,
384                            heartbeat_enabled,
385                            limit,
386                            ..
387                        } => {
388                            tracing::info!(
389                                version = version,
390                                heartbeat = heartbeat_enabled,
391                                rate_limit = ?limit.remaining,
392                                "Welcome to the BitMEX Realtime API:",
393                            );
394                        }
395                        BitmexWsMessage::Subscription { .. } => return Some(msg),
396                        BitmexWsMessage::Error { status, error, .. } => {
397                            tracing::error!(
398                                status = status,
399                                error = error,
400                                "Received error from BitMEX"
401                            );
402                        }
403                        _ => return Some(msg),
404                    },
405                    Err(e) => {
406                        tracing::error!("Failed to parse WebSocket message: {e}: {text}");
407                    }
408                }
409            }
410            Message::Binary(msg) => {
411                tracing::debug!("Raw binary: {msg:?}");
412            }
413            Message::Close(_) => {
414                tracing::debug!("Received close message, waiting for reconnection");
415            }
416            Message::Ping(data) => {
417                // Handled in select! loop before parse_raw_message
418                tracing::trace!("Ping frame with {} bytes (already handled)", data.len());
419            }
420            Message::Pong(data) => {
421                tracing::trace!("Received pong frame with {} bytes", data.len());
422            }
423            Message::Frame(frame) => {
424                tracing::debug!("Received raw frame: {frame:?}");
425            }
426        }
427
428        None
429    }
430
431    fn is_heartbeat_message(text: &str) -> bool {
432        let trimmed = text.trim();
433
434        if !trimmed.starts_with('{') || trimmed.len() > 64 {
435            return false;
436        }
437
438        trimmed.contains("\"op\":\"ping\"") || trimmed.contains("\"op\":\"pong\"")
439    }
440
441    fn handle_subscription_ack(
442        &self,
443        success: bool,
444        request: Option<&BitmexHttpRequest>,
445        subscribe: Option<&String>,
446        error: Option<&str>,
447    ) {
448        let topics = Self::topics_from_request(request, subscribe);
449
450        if topics.is_empty() {
451            tracing::debug!("Subscription acknowledgement without topics");
452            return;
453        }
454
455        for topic in topics {
456            if success {
457                self.subscriptions.confirm_subscribe(topic);
458                tracing::debug!(topic = topic, "Subscription confirmed");
459            } else {
460                self.subscriptions.mark_failure(topic);
461                let reason = error.unwrap_or("Subscription rejected");
462                tracing::error!(topic = topic, error = reason, "Subscription failed");
463            }
464        }
465    }
466
467    fn handle_unsubscribe_ack(
468        &self,
469        success: bool,
470        request: Option<&BitmexHttpRequest>,
471        subscribe: Option<&String>,
472        error: Option<&str>,
473    ) {
474        let topics = Self::topics_from_request(request, subscribe);
475
476        if topics.is_empty() {
477            tracing::debug!("Unsubscription acknowledgement without topics");
478            return;
479        }
480
481        for topic in topics {
482            if success {
483                tracing::debug!(topic = topic, "Unsubscription confirmed");
484                self.subscriptions.confirm_unsubscribe(topic);
485            } else {
486                let reason = error.unwrap_or("Unsubscription rejected");
487                tracing::error!(
488                    topic = topic,
489                    error = reason,
490                    "Unsubscription failed - restoring subscription"
491                );
492                // Venue rejected unsubscribe, so we're still subscribed. Restore state:
493                self.subscriptions.confirm_unsubscribe(topic); // Clear pending_unsubscribe
494                self.subscriptions.mark_subscribe(topic); // Mark as subscribing
495                self.subscriptions.confirm_subscribe(topic); // Confirm subscription
496            }
497        }
498    }
499
500    fn topics_from_request<'a>(
501        request: Option<&'a BitmexHttpRequest>,
502        fallback: Option<&'a String>,
503    ) -> Vec<&'a str> {
504        if let Some(req) = request
505            && !req.args.is_empty()
506        {
507            return req.args.iter().filter_map(|arg| arg.as_str()).collect();
508        }
509
510        fallback.into_iter().map(|topic| topic.as_str()).collect()
511    }
512
513    fn handle_orderbook_l2(
514        &self,
515        action: BitmexAction,
516        data: Vec<BitmexOrderBookMsg>,
517        ts_init: UnixNanos,
518    ) -> Option<NautilusWsMessage> {
519        if data.is_empty() {
520            return None;
521        }
522        let data = parse_book_msg_vec(data, action, &self.instruments_cache, ts_init);
523        Some(NautilusWsMessage::Data(data))
524    }
525
526    fn handle_orderbook_10(
527        &self,
528        data: Vec<BitmexOrderBook10Msg>,
529        ts_init: UnixNanos,
530    ) -> Option<NautilusWsMessage> {
531        if data.is_empty() {
532            return None;
533        }
534        let data = parse_book10_msg_vec(data, &self.instruments_cache, ts_init);
535        Some(NautilusWsMessage::Data(data))
536    }
537
538    fn handle_quote(
539        &mut self,
540        mut data: Vec<BitmexQuoteMsg>,
541        ts_init: UnixNanos,
542    ) -> Option<NautilusWsMessage> {
543        // Index symbols may return empty quote data
544        if data.is_empty() {
545            return None;
546        }
547
548        let msg = data.remove(0);
549        let Some(instrument) = Self::get_instrument(&self.instruments_cache, &msg.symbol) else {
550            tracing::error!(
551                "Instrument cache miss: quote message dropped for symbol={}",
552                msg.symbol
553            );
554            return None;
555        };
556
557        let instrument_id = instrument.id();
558        let price_precision = instrument.price_precision();
559
560        let bid_price = msg.bid_price.map(|p| Price::new(p, price_precision));
561        let ask_price = msg.ask_price.map(|p| Price::new(p, price_precision));
562        let bid_size = msg
563            .bid_size
564            .map(|s| parse_contracts_quantity(s, &instrument));
565        let ask_size = msg
566            .ask_size
567            .map(|s| parse_contracts_quantity(s, &instrument));
568        let ts_event = UnixNanos::from(msg.timestamp);
569
570        match self.quote_cache.process(
571            instrument_id,
572            bid_price,
573            ask_price,
574            bid_size,
575            ask_size,
576            ts_event,
577            ts_init,
578        ) {
579            Ok(quote) => Some(NautilusWsMessage::Data(vec![Data::Quote(quote)])),
580            Err(e) => {
581                tracing::warn!(error = %e, "Failed to process quote");
582                None
583            }
584        }
585    }
586
587    fn handle_trade(
588        &self,
589        data: Vec<BitmexTradeMsg>,
590        ts_init: UnixNanos,
591    ) -> Option<NautilusWsMessage> {
592        if data.is_empty() {
593            return None;
594        }
595        let data = parse_trade_msg_vec(data, &self.instruments_cache, ts_init);
596        Some(NautilusWsMessage::Data(data))
597    }
598
599    fn handle_trade_bin(
600        &self,
601        action: BitmexAction,
602        data: Vec<BitmexTradeBinMsg>,
603        topic: BitmexWsTopic,
604        ts_init: UnixNanos,
605    ) -> Option<NautilusWsMessage> {
606        if action == BitmexAction::Partial || data.is_empty() {
607            return None;
608        }
609        let data = parse_trade_bin_msg_vec(data, topic, &self.instruments_cache, ts_init);
610        Some(NautilusWsMessage::Data(data))
611    }
612
613    fn handle_order(&mut self, data: Vec<OrderData>) -> Option<NautilusWsMessage> {
614        // Process all orders in the message
615        let mut reports = Vec::with_capacity(data.len());
616
617        for order_data in data {
618            match order_data {
619                OrderData::Full(order_msg) => {
620                    let Some(instrument) =
621                        Self::get_instrument(&self.instruments_cache, &order_msg.symbol)
622                    else {
623                        tracing::error!(
624                            "Instrument cache miss: order message dropped for symbol={}, order_id={}",
625                            order_msg.symbol,
626                            order_msg.order_id
627                        );
628                        continue;
629                    };
630
631                    match parse_order_msg(&order_msg, &instrument, &self.order_type_cache) {
632                        Ok(report) => {
633                            // Cache the order type and symbol AFTER successful parse
634                            if let Some(client_order_id) = &order_msg.cl_ord_id {
635                                let client_order_id = ClientOrderId::new(client_order_id);
636
637                                if let Some(ord_type) = &order_msg.ord_type {
638                                    let order_type: OrderType = (*ord_type).into();
639                                    self.order_type_cache.insert(client_order_id, order_type);
640                                }
641
642                                // Cache symbol for execution message routing
643                                self.order_symbol_cache
644                                    .insert(client_order_id, order_msg.symbol);
645                            }
646
647                            if is_terminal_order_status(report.order_status)
648                                && let Some(client_id) = report.client_order_id
649                            {
650                                self.order_type_cache.remove(&client_id);
651                                self.order_symbol_cache.remove(&client_id);
652                            }
653
654                            reports.push(report);
655                        }
656                        Err(e) => {
657                            tracing::error!(
658                                error = %e,
659                                symbol = %order_msg.symbol,
660                                order_id = %order_msg.order_id,
661                                time_in_force = ?order_msg.time_in_force,
662                                "Failed to parse full order message - potential data loss"
663                            );
664                            // TODO: Add metric counter for parse failures
665                            continue;
666                        }
667                    }
668                }
669                OrderData::Update(msg) => {
670                    let Some(instrument) =
671                        Self::get_instrument(&self.instruments_cache, &msg.symbol)
672                    else {
673                        tracing::error!(
674                            "Instrument cache miss: order update dropped for symbol={}, order_id={}",
675                            msg.symbol,
676                            msg.order_id
677                        );
678                        continue;
679                    };
680
681                    // Populate cache for execution message routing (handles edge case where update arrives before full snapshot)
682                    if let Some(cl_ord_id) = &msg.cl_ord_id {
683                        let client_order_id = ClientOrderId::new(cl_ord_id);
684                        self.order_symbol_cache.insert(client_order_id, msg.symbol);
685                    }
686
687                    if let Some(event) = parse_order_update_msg(&msg, &instrument, self.account_id)
688                    {
689                        return Some(NautilusWsMessage::OrderUpdated(event));
690                    } else {
691                        tracing::warn!(
692                            order_id = %msg.order_id,
693                            price = ?msg.price,
694                            "Skipped order update message (insufficient data)"
695                        );
696                    }
697                }
698            }
699        }
700
701        if reports.is_empty() {
702            return None;
703        }
704
705        Some(NautilusWsMessage::OrderStatusReports(reports))
706    }
707
708    fn handle_execution(&mut self, data: Vec<BitmexExecutionMsg>) -> Option<NautilusWsMessage> {
709        let mut fills = Vec::with_capacity(data.len());
710
711        for exec_msg in data {
712            // Try to get symbol, fall back to cache lookup if missing
713            let symbol_opt = if let Some(sym) = &exec_msg.symbol {
714                Some(*sym)
715            } else if let Some(cl_ord_id) = &exec_msg.cl_ord_id {
716                // Try to look up symbol from order_symbol_cache
717                let client_order_id = ClientOrderId::new(cl_ord_id);
718                self.order_symbol_cache
719                    .get(&client_order_id)
720                    .map(|r| *r.value())
721            } else {
722                None
723            };
724
725            let Some(symbol) = symbol_opt else {
726                // Symbol missing - log appropriately based on exec type and whether we had clOrdID
727                if let Some(cl_ord_id) = &exec_msg.cl_ord_id {
728                    if exec_msg.exec_type == Some(BitmexExecType::Trade) {
729                        tracing::warn!(
730                            cl_ord_id = %cl_ord_id,
731                            exec_id = ?exec_msg.exec_id,
732                            ord_rej_reason = ?exec_msg.ord_rej_reason,
733                            text = ?exec_msg.text,
734                            "Execution message missing symbol and not found in cache"
735                        );
736                    } else {
737                        tracing::debug!(
738                            cl_ord_id = %cl_ord_id,
739                            exec_id = ?exec_msg.exec_id,
740                            exec_type = ?exec_msg.exec_type,
741                            ord_rej_reason = ?exec_msg.ord_rej_reason,
742                            text = ?exec_msg.text,
743                            "Execution message missing symbol and not found in cache"
744                        );
745                    }
746                } else {
747                    // CancelReject messages without symbol/clOrdID are expected when using
748                    // redundant cancel broadcasting - one cancel succeeds, others arrive late
749                    // and BitMEX responds with CancelReject but doesn't populate the fields
750                    if exec_msg.exec_type == Some(BitmexExecType::CancelReject) {
751                        tracing::debug!(
752                            exec_id = ?exec_msg.exec_id,
753                            order_id = ?exec_msg.order_id,
754                            "CancelReject message missing symbol/clOrdID (expected with redundant cancels)"
755                        );
756                    } else {
757                        tracing::warn!(
758                            exec_id = ?exec_msg.exec_id,
759                            order_id = ?exec_msg.order_id,
760                            exec_type = ?exec_msg.exec_type,
761                            ord_rej_reason = ?exec_msg.ord_rej_reason,
762                            text = ?exec_msg.text,
763                            "Execution message missing both symbol and clOrdID, cannot process"
764                        );
765                    }
766                }
767                continue;
768            };
769
770            let Some(instrument) = Self::get_instrument(&self.instruments_cache, &symbol) else {
771                tracing::error!(
772                    "Instrument cache miss: execution message dropped for symbol={}, exec_id={:?}, exec_type={:?}, Liquidation/ADL fills may be lost",
773                    symbol,
774                    exec_msg.exec_id,
775                    exec_msg.exec_type
776                );
777                continue;
778            };
779
780            if let Some(fill) = parse_execution_msg(exec_msg, &instrument) {
781                fills.push(fill);
782            }
783        }
784
785        if fills.is_empty() {
786            return None;
787        }
788        Some(NautilusWsMessage::FillReports(fills))
789    }
790
791    fn handle_position(&self, data: Vec<BitmexPositionMsg>) -> Option<NautilusWsMessage> {
792        if let Some(pos_msg) = data.into_iter().next() {
793            let Some(instrument) = Self::get_instrument(&self.instruments_cache, &pos_msg.symbol)
794            else {
795                tracing::error!(
796                    "Instrument cache miss: position message dropped for symbol={}, account={}",
797                    pos_msg.symbol,
798                    pos_msg.account
799                );
800                return None;
801            };
802            let report = parse_position_msg(pos_msg, &instrument);
803            Some(NautilusWsMessage::PositionStatusReport(report))
804        } else {
805            None
806        }
807    }
808
809    fn handle_wallet(
810        &self,
811        data: Vec<BitmexWalletMsg>,
812        ts_init: UnixNanos,
813    ) -> Option<NautilusWsMessage> {
814        if let Some(wallet_msg) = data.into_iter().next() {
815            let account_state = parse_wallet_msg(wallet_msg, ts_init);
816            Some(NautilusWsMessage::AccountState(account_state))
817        } else {
818            None
819        }
820    }
821
822    fn handle_instrument(
823        &mut self,
824        action: BitmexAction,
825        data: Vec<BitmexInstrumentMsg>,
826        ts_init: UnixNanos,
827    ) -> Option<NautilusWsMessage> {
828        match action {
829            BitmexAction::Partial | BitmexAction::Insert => {
830                let mut instruments = Vec::with_capacity(data.len());
831                let mut temp_cache = AHashMap::new();
832
833                let data_for_prices = data.clone();
834
835                for msg in data {
836                    match msg.try_into() {
837                        Ok(http_inst) => {
838                            match parse_instrument_any(&http_inst, ts_init) {
839                                InstrumentParseResult::Ok(boxed) => {
840                                    let instrument_any = *boxed;
841                                    let symbol = instrument_any.symbol().inner();
842                                    temp_cache.insert(symbol, instrument_any.clone());
843                                    instruments.push(instrument_any);
844                                }
845                                InstrumentParseResult::Unsupported { .. } => {
846                                    // Silently skip unsupported instrument types
847                                }
848                                InstrumentParseResult::Failed {
849                                    symbol,
850                                    instrument_type,
851                                    error,
852                                } => {
853                                    log::warn!(
854                                        "Failed to parse instrument {symbol} ({instrument_type:?}): {error}"
855                                    );
856                                }
857                            }
858                        }
859                        Err(e) => {
860                            log::debug!("Skipping instrument (missing required fields): {e}");
861                        }
862                    }
863                }
864
865                // Update instruments_cache with new instruments
866                for (symbol, instrument) in &temp_cache {
867                    self.instruments_cache.insert(*symbol, instrument.clone());
868                }
869
870                if !instruments.is_empty()
871                    && let Err(e) = self
872                        .out_tx
873                        .send(NautilusWsMessage::Instruments(instruments))
874                {
875                    tracing::error!("Error sending instruments: {e}");
876                }
877
878                let mut data_msgs = Vec::with_capacity(data_for_prices.len());
879
880                for msg in data_for_prices {
881                    let parsed = parse_instrument_msg(msg, &temp_cache, ts_init);
882                    data_msgs.extend(parsed);
883                }
884
885                if data_msgs.is_empty() {
886                    return None;
887                }
888                Some(NautilusWsMessage::Data(data_msgs))
889            }
890            BitmexAction::Update => {
891                let mut data_msgs = Vec::with_capacity(data.len());
892
893                for msg in data {
894                    let parsed = parse_instrument_msg(msg, &self.instruments_cache, ts_init);
895                    data_msgs.extend(parsed);
896                }
897
898                if data_msgs.is_empty() {
899                    return None;
900                }
901                Some(NautilusWsMessage::Data(data_msgs))
902            }
903            BitmexAction::Delete => {
904                log::info!(
905                    "Received instrument delete action for {} instrument(s)",
906                    data.len()
907                );
908                None
909            }
910        }
911    }
912
913    fn handle_funding(
914        &self,
915        data: Vec<BitmexFundingMsg>,
916        ts_init: UnixNanos,
917    ) -> Option<NautilusWsMessage> {
918        let mut funding_updates = Vec::with_capacity(data.len());
919
920        for msg in data {
921            if let Some(parsed) = parse_funding_msg(msg, ts_init) {
922                funding_updates.push(parsed);
923            }
924        }
925
926        if !funding_updates.is_empty() {
927            Some(NautilusWsMessage::FundingRateUpdates(funding_updates))
928        } else {
929            None
930        }
931    }
932
933    fn handle_subscription_message(
934        &self,
935        success: bool,
936        subscribe: Option<&String>,
937        request: Option<&BitmexHttpRequest>,
938        error: Option<&str>,
939    ) -> Option<NautilusWsMessage> {
940        if let Some(req) = request {
941            if req
942                .op
943                .eq_ignore_ascii_case(BitmexWsAuthAction::AuthKeyExpires.as_ref())
944            {
945                if success {
946                    tracing::info!("WebSocket authenticated");
947                    self.auth_tracker.succeed();
948                    return Some(NautilusWsMessage::Authenticated);
949                } else {
950                    let reason = error.unwrap_or("Authentication rejected").to_string();
951                    tracing::error!(error = %reason, "WebSocket authentication failed");
952                    self.auth_tracker.fail(reason);
953                }
954                return None;
955            }
956
957            if req
958                .op
959                .eq_ignore_ascii_case(BitmexWsOperation::Subscribe.as_ref())
960            {
961                self.handle_subscription_ack(success, request, subscribe, error);
962                return None;
963            }
964
965            if req
966                .op
967                .eq_ignore_ascii_case(BitmexWsOperation::Unsubscribe.as_ref())
968            {
969                self.handle_unsubscribe_ack(success, request, subscribe, error);
970                return None;
971            }
972        }
973
974        if subscribe.is_some() {
975            self.handle_subscription_ack(success, request, subscribe, error);
976            return None;
977        }
978
979        if let Some(error) = error {
980            tracing::warn!(
981                success = success,
982                error = error,
983                "Unhandled subscription control message"
984            );
985        }
986
987        None
988    }
989}
990
991fn is_terminal_order_status(status: OrderStatus) -> bool {
992    matches!(
993        status,
994        OrderStatus::Canceled | OrderStatus::Expired | OrderStatus::Rejected | OrderStatus::Filled,
995    )
996}
997
998/// Returns `true` when a BitMEX error should be retried.
999pub(crate) fn should_retry_bitmex_error(error: &BitmexWsError) -> bool {
1000    match error {
1001        BitmexWsError::TungsteniteError(_) => true, // Network errors are retryable
1002        BitmexWsError::ClientError(msg) => {
1003            // Retry on timeout and connection errors (case-insensitive)
1004            let msg_lower = msg.to_lowercase();
1005            msg_lower.contains("timeout")
1006                || msg_lower.contains("timed out")
1007                || msg_lower.contains("connection")
1008                || msg_lower.contains("network")
1009        }
1010        _ => false,
1011    }
1012}
1013
1014/// Creates a timeout error for BitMEX retry logic.
1015pub(crate) fn create_bitmex_timeout_error(msg: String) -> BitmexWsError {
1016    BitmexWsError::ClientError(msg)
1017}
1018
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// Ping and pong frames are heartbeats; a subscribe request is not.
    #[rstest]
    fn test_is_heartbeat_message_detection() {
        let cases = [
            (r#"{"op":"ping"}"#, true),
            (r#"{"op":"pong"}"#, true),
            (r#"{"op":"subscribe","args":["trade:XBTUSD"]}"#, false),
        ];

        for (raw, expected) in cases {
            assert_eq!(
                FeedHandler::is_heartbeat_message(raw),
                expected,
                "input: {raw}"
            );
        }
    }
}