nautilus_bitmex/websocket/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message handler for BitMEX.
17
18use std::sync::{
19    Arc,
20    atomic::{AtomicBool, Ordering},
21};
22
23use ahash::AHashMap;
24use dashmap::DashMap;
25use nautilus_common::cache::quote::QuoteCache;
26use nautilus_core::{UnixNanos, time::get_atomic_clock_realtime};
27use nautilus_model::{
28    data::Data,
29    enums::{OrderStatus, OrderType},
30    identifiers::{AccountId, ClientOrderId},
31    instruments::{Instrument, InstrumentAny},
32    types::Price,
33};
34use nautilus_network::{
35    RECONNECTED,
36    retry::{RetryManager, create_websocket_retry_manager},
37    websocket::{AuthTracker, SubscriptionState, WebSocketClient},
38};
39use tokio_tungstenite::tungstenite::Message;
40use ustr::Ustr;
41
42use super::{
43    enums::{BitmexAction, BitmexWsAuthAction, BitmexWsOperation, BitmexWsTopic},
44    error::BitmexWsError,
45    messages::{
46        BitmexExecutionMsg, BitmexFundingMsg, BitmexHttpRequest, BitmexInstrumentMsg,
47        BitmexOrderBook10Msg, BitmexOrderBookMsg, BitmexPositionMsg, BitmexQuoteMsg,
48        BitmexTableMessage, BitmexTradeBinMsg, BitmexTradeMsg, BitmexWalletMsg, BitmexWsMessage,
49        NautilusWsMessage, OrderData,
50    },
51    parse::{
52        parse_book_msg_vec, parse_book10_msg_vec, parse_execution_msg, parse_funding_msg,
53        parse_instrument_msg, parse_order_msg, parse_order_update_msg, parse_position_msg,
54        parse_trade_bin_msg_vec, parse_trade_msg_vec, parse_wallet_msg,
55    },
56};
57use crate::common::{enums::BitmexExecType, parse::parse_contracts_quantity};
58
/// Commands sent from the outer client to the inner message handler.
///
/// Commands are received on the handler's command channel and applied one at a
/// time inside `FeedHandler::next`.
#[derive(Debug)]
#[allow(
    clippy::large_enum_variant,
    reason = "Commands are ephemeral and immediately consumed"
)]
pub enum HandlerCommand {
    /// Set the WebSocketClient for the handler to use.
    ///
    /// Replaces any client the handler currently holds.
    SetClient(WebSocketClient),
    /// Disconnect the WebSocket connection.
    ///
    /// The handler takes its client (leaving none set) and disconnects it.
    Disconnect,
    /// Send authentication payload to the WebSocket.
    ///
    /// `payload` is sent as-is as a text message, with retries.
    Authenticate { payload: String },
    /// Subscribe to the given topics.
    ///
    /// Each entry of `topics` is sent verbatim as its own text message, so the
    /// entries are expected to be ready-to-send request payloads.
    Subscribe { topics: Vec<String> },
    /// Unsubscribe from the given topics.
    ///
    /// Each entry of `topics` is sent verbatim as its own text message, so the
    /// entries are expected to be ready-to-send request payloads.
    Unsubscribe { topics: Vec<String> },
    /// Initialize the instruments cache with the given instruments.
    ///
    /// Instruments are inserted keyed by symbol; existing entries with the same
    /// symbol are overwritten and other entries are left in place.
    InitializeInstruments(Vec<InstrumentAny>),
    /// Update a single instrument in the cache.
    ///
    /// Inserts or overwrites the entry for the instrument's symbol.
    UpdateInstrument(InstrumentAny),
}
81
/// Inner message handler that turns raw BitMEX WebSocket frames into
/// [`NautilusWsMessage`] values and applies [`HandlerCommand`]s from the outer
/// client.
pub(super) struct FeedHandler {
    /// Account identifier passed to order-update parsing.
    account_id: AccountId,
    /// Shared stop flag; when it reads `true` the handler stops producing messages.
    signal: Arc<AtomicBool>,
    /// Active WebSocket client, set by `HandlerCommand::SetClient` and taken on
    /// `HandlerCommand::Disconnect`. `None` until a client has been provided.
    client: Option<WebSocketClient>,
    /// Receives commands from the outer client.
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
    /// Receives raw WebSocket messages to be parsed.
    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
    /// Outbound channel used by `send` to forward parsed messages.
    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
    // NOTE(review): not referenced in the part of the file reviewed here;
    // presumably tracks authentication state - confirm against the rest of the file.
    auth_tracker: AuthTracker,
    /// Subscription bookkeeping, updated when subscribe/unsubscribe acknowledgements arrive.
    subscriptions: SubscriptionState,
    /// Retry policy applied to outgoing text messages.
    retry_manager: RetryManager<BitmexWsError>,
    /// Instruments keyed by symbol, used to parse market and execution data.
    instruments_cache: AHashMap<Ustr, InstrumentAny>,
    /// Shared cache of order types keyed by client order ID; populated from full
    /// order messages and cleared once an order reaches a terminal status.
    order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
    /// Shared cache of symbols keyed by client order ID, used to resolve the
    /// symbol for execution messages that arrive without one.
    order_symbol_cache: Arc<DashMap<ClientOrderId, Ustr>>,
    /// Quote state fed by quote messages; cleared on reconnection.
    quote_cache: QuoteCache,
}
97
98impl FeedHandler {
99    /// Creates a new [`FeedHandler`] instance.
100    #[allow(clippy::too_many_arguments)]
101    pub(super) fn new(
102        signal: Arc<AtomicBool>,
103        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
104        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
105        out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
106        account_id: AccountId,
107        auth_tracker: AuthTracker,
108        subscriptions: SubscriptionState,
109        order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
110        order_symbol_cache: Arc<DashMap<ClientOrderId, Ustr>>,
111    ) -> Self {
112        Self {
113            account_id,
114            signal,
115            client: None,
116            cmd_rx,
117            raw_rx,
118            out_tx,
119            auth_tracker,
120            subscriptions,
121            retry_manager: create_websocket_retry_manager(),
122            instruments_cache: AHashMap::new(),
123            order_type_cache,
124            order_symbol_cache,
125            quote_cache: QuoteCache::new(),
126        }
127    }
128
129    pub(super) fn is_stopped(&self) -> bool {
130        self.signal.load(Ordering::Relaxed)
131    }
132
133    pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), ()> {
134        self.out_tx.send(msg).map_err(|_| ())
135    }
136
137    /// Sends a WebSocket message with retry logic.
138    async fn send_with_retry(&self, payload: String) -> anyhow::Result<()> {
139        if let Some(client) = &self.client {
140            self.retry_manager
141                .execute_with_retry(
142                    "websocket_send",
143                    || {
144                        let payload = payload.clone();
145                        async move {
146                            client.send_text(payload, None).await.map_err(|e| {
147                                BitmexWsError::ClientError(format!("Send failed: {e}"))
148                            })
149                        }
150                    },
151                    should_retry_bitmex_error,
152                    create_bitmex_timeout_error,
153                )
154                .await
155                .map_err(|e| anyhow::anyhow!("{e}"))
156        } else {
157            Err(anyhow::anyhow!("No active WebSocket client"))
158        }
159    }
160
161    #[inline]
162    fn get_instrument(
163        cache: &AHashMap<Ustr, InstrumentAny>,
164        symbol: &Ustr,
165    ) -> Option<InstrumentAny> {
166        cache.get(symbol).cloned()
167    }
168
169    pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
170        let clock = get_atomic_clock_realtime();
171
172        loop {
173            tokio::select! {
174                Some(cmd) = self.cmd_rx.recv() => {
175                    match cmd {
176                        HandlerCommand::SetClient(client) => {
177                            tracing::debug!("WebSocketClient received by handler");
178                            self.client = Some(client);
179                        }
180                        HandlerCommand::Disconnect => {
181                            tracing::debug!("Disconnect command received");
182                            if let Some(client) = self.client.take() {
183                                client.disconnect().await;
184                            }
185                        }
186                        HandlerCommand::Authenticate { payload } => {
187                            tracing::debug!("Authenticate command received");
188                            if let Err(e) = self.send_with_retry(payload).await {
189                                tracing::error!(error = %e, "Failed to send authentication after retries");
190                            }
191                        }
192                        HandlerCommand::Subscribe { topics } => {
193                            for topic in topics {
194                                tracing::debug!(topic = %topic, "Subscribing to topic");
195                                if let Err(e) = self.send_with_retry(topic.clone()).await {
196                                    tracing::error!(topic = %topic, error = %e, "Failed to send subscription after retries");
197                                }
198                            }
199                        }
200                        HandlerCommand::Unsubscribe { topics } => {
201                            for topic in topics {
202                                tracing::debug!(topic = %topic, "Unsubscribing from topic");
203                                if let Err(e) = self.send_with_retry(topic.clone()).await {
204                                    tracing::error!(topic = %topic, error = %e, "Failed to send unsubscription after retries");
205                                }
206                            }
207                        }
208                        HandlerCommand::InitializeInstruments(instruments) => {
209                            for inst in instruments {
210                                self.instruments_cache.insert(inst.symbol().inner(), inst);
211                            }
212                        }
213                        HandlerCommand::UpdateInstrument(inst) => {
214                            self.instruments_cache.insert(inst.symbol().inner(), inst);
215                        }
216                    }
217                    // Continue processing following command
218                    continue;
219                }
220
221                _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
222                    if self.signal.load(std::sync::atomic::Ordering::Relaxed) {
223                        tracing::debug!("Stop signal received during idle period");
224                        return None;
225                    }
226                    continue;
227                }
228
229                msg = self.raw_rx.recv() => {
230                    let msg = match msg {
231                        Some(msg) => msg,
232                        None => {
233                            tracing::debug!("WebSocket stream closed");
234                            return None;
235                        }
236                    };
237
238                    // Handle ping frames directly for minimal latency
239                    if let Message::Ping(data) = &msg {
240                        tracing::trace!("Received ping frame with {} bytes", data.len());
241                        if let Some(client) = &self.client
242                            && let Err(e) = client.send_pong(data.to_vec()).await
243                        {
244                            tracing::warn!(error = %e, "Failed to send pong frame");
245                        }
246                        continue;
247                    }
248
249                    let event = match Self::parse_raw_message(msg) {
250                        Some(event) => event,
251                        None => continue,
252                    };
253
254                    if self.signal.load(std::sync::atomic::Ordering::Relaxed) {
255                        tracing::debug!("Stop signal received");
256                        return None;
257                    }
258
259            match event {
260                BitmexWsMessage::Reconnected => {
261                    self.quote_cache.clear();
262                    return Some(NautilusWsMessage::Reconnected);
263                }
264                BitmexWsMessage::Subscription {
265                    success,
266                    subscribe,
267                    request,
268                    error,
269                } => {
270                    if let Some(msg) = self.handle_subscription_message(
271                        success,
272                        subscribe.as_ref(),
273                        request.as_ref(),
274                        error.as_deref(),
275                    ) {
276                        return Some(msg);
277                    }
278                    continue;
279                }
280                BitmexWsMessage::Table(table_msg) => {
281                    let ts_init = clock.get_time_ns();
282
283                    let msg = match table_msg {
284                        BitmexTableMessage::OrderBookL2 { action, data } => {
285                            self.handle_orderbook_l2(action, data, ts_init)
286                        }
287                        BitmexTableMessage::OrderBookL2_25 { action, data } => {
288                            self.handle_orderbook_l2(action, data, ts_init)
289                        }
290                        BitmexTableMessage::OrderBook10 { data, .. } => {
291                            self.handle_orderbook_10(data, ts_init)
292                        }
293                        BitmexTableMessage::Quote { data, .. } => {
294                            self.handle_quote(data, ts_init)
295                        }
296                        BitmexTableMessage::Trade { data, .. } => {
297                            self.handle_trade(data, ts_init)
298                        }
299                        BitmexTableMessage::TradeBin1m { action, data } => {
300                            self.handle_trade_bin(action, data, BitmexWsTopic::TradeBin1m, ts_init)
301                        }
302                        BitmexTableMessage::TradeBin5m { action, data } => {
303                            self.handle_trade_bin(action, data, BitmexWsTopic::TradeBin5m, ts_init)
304                        }
305                        BitmexTableMessage::TradeBin1h { action, data } => {
306                            self.handle_trade_bin(action, data, BitmexWsTopic::TradeBin1h, ts_init)
307                        }
308                        BitmexTableMessage::TradeBin1d { action, data } => {
309                            self.handle_trade_bin(action, data, BitmexWsTopic::TradeBin1d, ts_init)
310                        }
311                        // Execution messages
312                        // Note: BitMEX may send duplicate order status updates for the same order
313                        // (e.g., immediate response + stream update). This is expected behavior.
314                        BitmexTableMessage::Order { data, .. } => {
315                            self.handle_order(data)
316                        }
317                        BitmexTableMessage::Execution { data, .. } => {
318                            self.handle_execution(data)
319                        }
320                        BitmexTableMessage::Position { data, .. } => {
321                            self.handle_position(data)
322                        }
323                        BitmexTableMessage::Wallet { data, .. } => {
324                            self.handle_wallet(data, ts_init)
325                        }
326                        BitmexTableMessage::Margin { .. } => {
327                            // Skip margin messages - BitMEX uses account-level cross-margin
328                            // which doesn't map well to Nautilus's per-instrument margin model
329                            None
330                        }
331                        BitmexTableMessage::Instrument { action, data } => {
332                            self.handle_instrument(action, data, ts_init)
333                        }
334                        BitmexTableMessage::Funding { data, .. } => {
335                            self.handle_funding(data, ts_init)
336                        }
337                        _ => {
338                            // Other message types not yet implemented
339                            tracing::warn!("Unhandled table message type: {table_msg:?}");
340                            None
341                        }
342                    };
343
344                    if let Some(msg) = msg {
345                        return Some(msg);
346                    }
347                    continue;
348                }
349                BitmexWsMessage::Welcome { .. } | BitmexWsMessage::Error { .. } => continue,
350            }
351                }
352
353                // Handle shutdown - either channel closed or stream ended
354                else => {
355                    tracing::debug!("Handler shutting down: stream ended or command channel closed");
356                    return None;
357                }
358            }
359        }
360    }
361
362    fn parse_raw_message(msg: Message) -> Option<BitmexWsMessage> {
363        match msg {
364            Message::Text(text) => {
365                if text == RECONNECTED {
366                    tracing::info!("Received WebSocket reconnected signal");
367                    return Some(BitmexWsMessage::Reconnected);
368                }
369
370                tracing::trace!("Raw websocket message: {text}");
371
372                if Self::is_heartbeat_message(&text) {
373                    tracing::trace!("Ignoring heartbeat control message: {text}");
374                    return None;
375                }
376
377                match serde_json::from_str(&text) {
378                    Ok(msg) => match &msg {
379                        BitmexWsMessage::Welcome {
380                            version,
381                            heartbeat_enabled,
382                            limit,
383                            ..
384                        } => {
385                            tracing::info!(
386                                version = version,
387                                heartbeat = heartbeat_enabled,
388                                rate_limit = ?limit.remaining,
389                                "Welcome to the BitMEX Realtime API:",
390                            );
391                        }
392                        BitmexWsMessage::Subscription { .. } => return Some(msg),
393                        BitmexWsMessage::Error { status, error, .. } => {
394                            tracing::error!(
395                                status = status,
396                                error = error,
397                                "Received error from BitMEX"
398                            );
399                        }
400                        _ => return Some(msg),
401                    },
402                    Err(e) => {
403                        tracing::error!("Failed to parse WebSocket message: {e}: {text}");
404                    }
405                }
406            }
407            Message::Binary(msg) => {
408                tracing::debug!("Raw binary: {msg:?}");
409            }
410            Message::Close(_) => {
411                tracing::debug!("Received close message, waiting for reconnection");
412            }
413            Message::Ping(data) => {
414                // Handled in select! loop before parse_raw_message
415                tracing::trace!("Ping frame with {} bytes (already handled)", data.len());
416            }
417            Message::Pong(data) => {
418                tracing::trace!("Received pong frame with {} bytes", data.len());
419            }
420            Message::Frame(frame) => {
421                tracing::debug!("Received raw frame: {frame:?}");
422            }
423        }
424
425        None
426    }
427
428    fn is_heartbeat_message(text: &str) -> bool {
429        let trimmed = text.trim();
430
431        if !trimmed.starts_with('{') || trimmed.len() > 64 {
432            return false;
433        }
434
435        trimmed.contains("\"op\":\"ping\"") || trimmed.contains("\"op\":\"pong\"")
436    }
437
438    fn handle_subscription_ack(
439        &self,
440        success: bool,
441        request: Option<&BitmexHttpRequest>,
442        subscribe: Option<&String>,
443        error: Option<&str>,
444    ) {
445        let topics = Self::topics_from_request(request, subscribe);
446
447        if topics.is_empty() {
448            tracing::debug!("Subscription acknowledgement without topics");
449            return;
450        }
451
452        for topic in topics {
453            if success {
454                self.subscriptions.confirm_subscribe(topic);
455                tracing::debug!(topic = topic, "Subscription confirmed");
456            } else {
457                self.subscriptions.mark_failure(topic);
458                let reason = error.unwrap_or("Subscription rejected");
459                tracing::error!(topic = topic, error = reason, "Subscription failed");
460            }
461        }
462    }
463
464    fn handle_unsubscribe_ack(
465        &self,
466        success: bool,
467        request: Option<&BitmexHttpRequest>,
468        subscribe: Option<&String>,
469        error: Option<&str>,
470    ) {
471        let topics = Self::topics_from_request(request, subscribe);
472
473        if topics.is_empty() {
474            tracing::debug!("Unsubscription acknowledgement without topics");
475            return;
476        }
477
478        for topic in topics {
479            if success {
480                tracing::debug!(topic = topic, "Unsubscription confirmed");
481                self.subscriptions.confirm_unsubscribe(topic);
482            } else {
483                let reason = error.unwrap_or("Unsubscription rejected");
484                tracing::error!(
485                    topic = topic,
486                    error = reason,
487                    "Unsubscription failed - restoring subscription"
488                );
489                // Venue rejected unsubscribe, so we're still subscribed. Restore state:
490                self.subscriptions.confirm_unsubscribe(topic); // Clear pending_unsubscribe
491                self.subscriptions.mark_subscribe(topic); // Mark as subscribing
492                self.subscriptions.confirm_subscribe(topic); // Confirm subscription
493            }
494        }
495    }
496
497    fn topics_from_request<'a>(
498        request: Option<&'a BitmexHttpRequest>,
499        fallback: Option<&'a String>,
500    ) -> Vec<&'a str> {
501        if let Some(req) = request
502            && !req.args.is_empty()
503        {
504            return req.args.iter().filter_map(|arg| arg.as_str()).collect();
505        }
506
507        fallback.into_iter().map(|topic| topic.as_str()).collect()
508    }
509
510    fn handle_orderbook_l2(
511        &self,
512        action: BitmexAction,
513        data: Vec<BitmexOrderBookMsg>,
514        ts_init: UnixNanos,
515    ) -> Option<NautilusWsMessage> {
516        if data.is_empty() {
517            return None;
518        }
519        let data = parse_book_msg_vec(data, action, &self.instruments_cache, ts_init);
520        Some(NautilusWsMessage::Data(data))
521    }
522
523    fn handle_orderbook_10(
524        &self,
525        data: Vec<BitmexOrderBook10Msg>,
526        ts_init: UnixNanos,
527    ) -> Option<NautilusWsMessage> {
528        if data.is_empty() {
529            return None;
530        }
531        let data = parse_book10_msg_vec(data, &self.instruments_cache, ts_init);
532        Some(NautilusWsMessage::Data(data))
533    }
534
535    fn handle_quote(
536        &mut self,
537        mut data: Vec<BitmexQuoteMsg>,
538        ts_init: UnixNanos,
539    ) -> Option<NautilusWsMessage> {
540        // Index symbols may return empty quote data
541        if data.is_empty() {
542            return None;
543        }
544
545        let msg = data.remove(0);
546        let Some(instrument) = Self::get_instrument(&self.instruments_cache, &msg.symbol) else {
547            tracing::error!(
548                "Instrument cache miss: quote message dropped for symbol={}",
549                msg.symbol
550            );
551            return None;
552        };
553
554        let instrument_id = instrument.id();
555        let price_precision = instrument.price_precision();
556
557        let bid_price = msg.bid_price.map(|p| Price::new(p, price_precision));
558        let ask_price = msg.ask_price.map(|p| Price::new(p, price_precision));
559        let bid_size = msg
560            .bid_size
561            .map(|s| parse_contracts_quantity(s, &instrument));
562        let ask_size = msg
563            .ask_size
564            .map(|s| parse_contracts_quantity(s, &instrument));
565        let ts_event = UnixNanos::from(msg.timestamp);
566
567        match self.quote_cache.process(
568            instrument_id,
569            bid_price,
570            ask_price,
571            bid_size,
572            ask_size,
573            ts_event,
574            ts_init,
575        ) {
576            Ok(quote) => Some(NautilusWsMessage::Data(vec![Data::Quote(quote)])),
577            Err(e) => {
578                tracing::warn!(error = %e, "Failed to process quote");
579                None
580            }
581        }
582    }
583
584    fn handle_trade(
585        &self,
586        data: Vec<BitmexTradeMsg>,
587        ts_init: UnixNanos,
588    ) -> Option<NautilusWsMessage> {
589        if data.is_empty() {
590            return None;
591        }
592        let data = parse_trade_msg_vec(data, &self.instruments_cache, ts_init);
593        Some(NautilusWsMessage::Data(data))
594    }
595
596    fn handle_trade_bin(
597        &self,
598        action: BitmexAction,
599        data: Vec<BitmexTradeBinMsg>,
600        topic: BitmexWsTopic,
601        ts_init: UnixNanos,
602    ) -> Option<NautilusWsMessage> {
603        if action == BitmexAction::Partial || data.is_empty() {
604            return None;
605        }
606        let data = parse_trade_bin_msg_vec(data, topic, &self.instruments_cache, ts_init);
607        Some(NautilusWsMessage::Data(data))
608    }
609
610    fn handle_order(&mut self, data: Vec<OrderData>) -> Option<NautilusWsMessage> {
611        // Process all orders in the message
612        let mut reports = Vec::with_capacity(data.len());
613
614        for order_data in data {
615            match order_data {
616                OrderData::Full(order_msg) => {
617                    let Some(instrument) =
618                        Self::get_instrument(&self.instruments_cache, &order_msg.symbol)
619                    else {
620                        tracing::error!(
621                            "Instrument cache miss: order message dropped for symbol={}, order_id={}",
622                            order_msg.symbol,
623                            order_msg.order_id
624                        );
625                        continue;
626                    };
627
628                    match parse_order_msg(&order_msg, &instrument, &self.order_type_cache) {
629                        Ok(report) => {
630                            // Cache the order type and symbol AFTER successful parse
631                            if let Some(client_order_id) = &order_msg.cl_ord_id {
632                                let client_order_id = ClientOrderId::new(client_order_id);
633
634                                if let Some(ord_type) = &order_msg.ord_type {
635                                    let order_type: OrderType = (*ord_type).into();
636                                    self.order_type_cache.insert(client_order_id, order_type);
637                                }
638
639                                // Cache symbol for execution message routing
640                                self.order_symbol_cache
641                                    .insert(client_order_id, order_msg.symbol);
642                            }
643
644                            if is_terminal_order_status(report.order_status)
645                                && let Some(client_id) = report.client_order_id
646                            {
647                                self.order_type_cache.remove(&client_id);
648                                self.order_symbol_cache.remove(&client_id);
649                            }
650
651                            reports.push(report);
652                        }
653                        Err(e) => {
654                            tracing::error!(
655                                error = %e,
656                                symbol = %order_msg.symbol,
657                                order_id = %order_msg.order_id,
658                                time_in_force = ?order_msg.time_in_force,
659                                "Failed to parse full order message - potential data loss"
660                            );
661                            // TODO: Add metric counter for parse failures
662                            continue;
663                        }
664                    }
665                }
666                OrderData::Update(msg) => {
667                    let Some(instrument) =
668                        Self::get_instrument(&self.instruments_cache, &msg.symbol)
669                    else {
670                        tracing::error!(
671                            "Instrument cache miss: order update dropped for symbol={}, order_id={}",
672                            msg.symbol,
673                            msg.order_id
674                        );
675                        continue;
676                    };
677
678                    // Populate cache for execution message routing (handles edge case where update arrives before full snapshot)
679                    if let Some(cl_ord_id) = &msg.cl_ord_id {
680                        let client_order_id = ClientOrderId::new(cl_ord_id);
681                        self.order_symbol_cache.insert(client_order_id, msg.symbol);
682                    }
683
684                    if let Some(event) = parse_order_update_msg(&msg, &instrument, self.account_id)
685                    {
686                        return Some(NautilusWsMessage::OrderUpdated(event));
687                    } else {
688                        tracing::warn!(
689                            order_id = %msg.order_id,
690                            price = ?msg.price,
691                            "Skipped order update message (insufficient data)"
692                        );
693                    }
694                }
695            }
696        }
697
698        if reports.is_empty() {
699            return None;
700        }
701
702        Some(NautilusWsMessage::OrderStatusReports(reports))
703    }
704
705    fn handle_execution(&mut self, data: Vec<BitmexExecutionMsg>) -> Option<NautilusWsMessage> {
706        let mut fills = Vec::with_capacity(data.len());
707
708        for exec_msg in data {
709            // Try to get symbol, fall back to cache lookup if missing
710            let symbol_opt = if let Some(sym) = &exec_msg.symbol {
711                Some(*sym)
712            } else if let Some(cl_ord_id) = &exec_msg.cl_ord_id {
713                // Try to look up symbol from order_symbol_cache
714                let client_order_id = ClientOrderId::new(cl_ord_id);
715                self.order_symbol_cache
716                    .get(&client_order_id)
717                    .map(|r| *r.value())
718            } else {
719                None
720            };
721
722            let Some(symbol) = symbol_opt else {
723                // Symbol missing - log appropriately based on exec type and whether we had clOrdID
724                if let Some(cl_ord_id) = &exec_msg.cl_ord_id {
725                    if exec_msg.exec_type == Some(BitmexExecType::Trade) {
726                        tracing::warn!(
727                            cl_ord_id = %cl_ord_id,
728                            exec_id = ?exec_msg.exec_id,
729                            ord_rej_reason = ?exec_msg.ord_rej_reason,
730                            text = ?exec_msg.text,
731                            "Execution message missing symbol and not found in cache"
732                        );
733                    } else {
734                        tracing::debug!(
735                            cl_ord_id = %cl_ord_id,
736                            exec_id = ?exec_msg.exec_id,
737                            exec_type = ?exec_msg.exec_type,
738                            ord_rej_reason = ?exec_msg.ord_rej_reason,
739                            text = ?exec_msg.text,
740                            "Execution message missing symbol and not found in cache"
741                        );
742                    }
743                } else {
744                    // CancelReject messages without symbol/clOrdID are expected when using
745                    // redundant cancel broadcasting - one cancel succeeds, others arrive late
746                    // and BitMEX responds with CancelReject but doesn't populate the fields
747                    if exec_msg.exec_type == Some(BitmexExecType::CancelReject) {
748                        tracing::debug!(
749                            exec_id = ?exec_msg.exec_id,
750                            order_id = ?exec_msg.order_id,
751                            "CancelReject message missing symbol/clOrdID (expected with redundant cancels)"
752                        );
753                    } else {
754                        tracing::warn!(
755                            exec_id = ?exec_msg.exec_id,
756                            order_id = ?exec_msg.order_id,
757                            exec_type = ?exec_msg.exec_type,
758                            ord_rej_reason = ?exec_msg.ord_rej_reason,
759                            text = ?exec_msg.text,
760                            "Execution message missing both symbol and clOrdID, cannot process"
761                        );
762                    }
763                }
764                continue;
765            };
766
767            let Some(instrument) = Self::get_instrument(&self.instruments_cache, &symbol) else {
768                tracing::error!(
769                    "Instrument cache miss: execution message dropped for symbol={}, exec_id={:?}, exec_type={:?}, Liquidation/ADL fills may be lost",
770                    symbol,
771                    exec_msg.exec_id,
772                    exec_msg.exec_type
773                );
774                continue;
775            };
776
777            if let Some(fill) = parse_execution_msg(exec_msg, &instrument) {
778                fills.push(fill);
779            }
780        }
781
782        if fills.is_empty() {
783            return None;
784        }
785        Some(NautilusWsMessage::FillReports(fills))
786    }
787
788    fn handle_position(&self, data: Vec<BitmexPositionMsg>) -> Option<NautilusWsMessage> {
789        if let Some(pos_msg) = data.into_iter().next() {
790            let Some(instrument) = Self::get_instrument(&self.instruments_cache, &pos_msg.symbol)
791            else {
792                tracing::error!(
793                    "Instrument cache miss: position message dropped for symbol={}, account={}",
794                    pos_msg.symbol,
795                    pos_msg.account
796                );
797                return None;
798            };
799            let report = parse_position_msg(pos_msg, &instrument);
800            Some(NautilusWsMessage::PositionStatusReport(report))
801        } else {
802            None
803        }
804    }
805
806    fn handle_wallet(
807        &self,
808        data: Vec<BitmexWalletMsg>,
809        ts_init: UnixNanos,
810    ) -> Option<NautilusWsMessage> {
811        if let Some(wallet_msg) = data.into_iter().next() {
812            let account_state = parse_wallet_msg(wallet_msg, ts_init);
813            Some(NautilusWsMessage::AccountState(account_state))
814        } else {
815            None
816        }
817    }
818
819    fn handle_instrument(
820        &mut self,
821        action: BitmexAction,
822        data: Vec<BitmexInstrumentMsg>,
823        ts_init: UnixNanos,
824    ) -> Option<NautilusWsMessage> {
825        match action {
826            BitmexAction::Partial | BitmexAction::Insert => {
827                let mut instruments = Vec::with_capacity(data.len());
828                let mut temp_cache = AHashMap::new();
829
830                let data_for_prices = data.clone();
831
832                for msg in data {
833                    match msg.try_into() {
834                        Ok(http_inst) => {
835                            match crate::http::parse::parse_instrument_any(&http_inst, ts_init) {
836                                Some(instrument_any) => {
837                                    let symbol = instrument_any.symbol().inner();
838                                    temp_cache.insert(symbol, instrument_any.clone());
839                                    instruments.push(instrument_any);
840                                }
841                                None => {
842                                    log::warn!("Failed to parse instrument from WebSocket");
843                                }
844                            }
845                        }
846                        Err(e) => {
847                            log::debug!("Skipping instrument (missing required fields): {e}");
848                        }
849                    }
850                }
851
852                // Update instruments_cache with new instruments
853                for (symbol, instrument) in temp_cache.iter() {
854                    self.instruments_cache.insert(*symbol, instrument.clone());
855                }
856
857                if !instruments.is_empty()
858                    && let Err(e) = self
859                        .out_tx
860                        .send(NautilusWsMessage::Instruments(instruments))
861                {
862                    tracing::error!("Error sending instruments: {e}");
863                }
864
865                let mut data_msgs = Vec::with_capacity(data_for_prices.len());
866
867                for msg in data_for_prices {
868                    let parsed = parse_instrument_msg(msg, &temp_cache, ts_init);
869                    data_msgs.extend(parsed);
870                }
871
872                if data_msgs.is_empty() {
873                    return None;
874                }
875                Some(NautilusWsMessage::Data(data_msgs))
876            }
877            BitmexAction::Update => {
878                let mut data_msgs = Vec::with_capacity(data.len());
879
880                for msg in data {
881                    let parsed = parse_instrument_msg(msg, &self.instruments_cache, ts_init);
882                    data_msgs.extend(parsed);
883                }
884
885                if data_msgs.is_empty() {
886                    return None;
887                }
888                Some(NautilusWsMessage::Data(data_msgs))
889            }
890            BitmexAction::Delete => {
891                log::info!(
892                    "Received instrument delete action for {} instrument(s)",
893                    data.len()
894                );
895                None
896            }
897        }
898    }
899
900    fn handle_funding(
901        &self,
902        data: Vec<BitmexFundingMsg>,
903        ts_init: UnixNanos,
904    ) -> Option<NautilusWsMessage> {
905        let mut funding_updates = Vec::with_capacity(data.len());
906
907        for msg in data {
908            if let Some(parsed) = parse_funding_msg(msg, ts_init) {
909                funding_updates.push(parsed);
910            }
911        }
912
913        if !funding_updates.is_empty() {
914            Some(NautilusWsMessage::FundingRateUpdates(funding_updates))
915        } else {
916            None
917        }
918    }
919
920    fn handle_subscription_message(
921        &self,
922        success: bool,
923        subscribe: Option<&String>,
924        request: Option<&BitmexHttpRequest>,
925        error: Option<&str>,
926    ) -> Option<NautilusWsMessage> {
927        if let Some(req) = request {
928            if req
929                .op
930                .eq_ignore_ascii_case(BitmexWsAuthAction::AuthKeyExpires.as_ref())
931            {
932                if success {
933                    tracing::info!("WebSocket authenticated");
934                    self.auth_tracker.succeed();
935                    return Some(NautilusWsMessage::Authenticated);
936                } else {
937                    let reason = error.unwrap_or("Authentication rejected").to_string();
938                    tracing::error!(error = %reason, "WebSocket authentication failed");
939                    self.auth_tracker.fail(reason);
940                }
941                return None;
942            }
943
944            if req
945                .op
946                .eq_ignore_ascii_case(BitmexWsOperation::Subscribe.as_ref())
947            {
948                self.handle_subscription_ack(success, request, subscribe, error);
949                return None;
950            }
951
952            if req
953                .op
954                .eq_ignore_ascii_case(BitmexWsOperation::Unsubscribe.as_ref())
955            {
956                self.handle_unsubscribe_ack(success, request, subscribe, error);
957                return None;
958            }
959        }
960
961        if subscribe.is_some() {
962            self.handle_subscription_ack(success, request, subscribe, error);
963            return None;
964        }
965
966        if let Some(error) = error {
967            tracing::warn!(
968                success = success,
969                error = error,
970                "Unhandled subscription control message"
971            );
972        }
973
974        None
975    }
976}
977
978fn is_terminal_order_status(status: OrderStatus) -> bool {
979    matches!(
980        status,
981        OrderStatus::Canceled | OrderStatus::Expired | OrderStatus::Rejected | OrderStatus::Filled,
982    )
983}
984
985/// Returns `true` when a BitMEX error should be retried.
986pub(crate) fn should_retry_bitmex_error(error: &BitmexWsError) -> bool {
987    match error {
988        BitmexWsError::TungsteniteError(_) => true, // Network errors are retryable
989        BitmexWsError::ClientError(msg) => {
990            // Retry on timeout and connection errors (case-insensitive)
991            let msg_lower = msg.to_lowercase();
992            msg_lower.contains("timeout")
993                || msg_lower.contains("timed out")
994                || msg_lower.contains("connection")
995                || msg_lower.contains("network")
996        }
997        _ => false,
998    }
999}
1000
1001/// Creates a timeout error for BitMEX retry logic.
1002pub(crate) fn create_bitmex_timeout_error(msg: String) -> BitmexWsError {
1003    BitmexWsError::ClientError(msg)
1004}
1005
1006////////////////////////////////////////////////////////////////////////////////
1007// Tests
1008////////////////////////////////////////////////////////////////////////////////
1009
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// Ping and pong payloads count as heartbeats; a subscribe request does not.
    #[rstest]
    fn test_is_heartbeat_message_detection() {
        let ping = r#"{"op":"ping"}"#;
        let pong = r#"{"op":"pong"}"#;
        let subscribe = r#"{"op":"subscribe","args":["trade:XBTUSD"]}"#;

        assert!(FeedHandler::is_heartbeat_message(ping));
        assert!(FeedHandler::is_heartbeat_message(pong));
        assert!(!FeedHandler::is_heartbeat_message(subscribe));
    }
}