nautilus_bitmex/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides the WebSocket client integration for the [BitMEX](https://bitmex.com) WebSocket API.
17//!
18//! This module defines and implements a [`BitmexWebSocketClient`] for
19//! connecting to BitMEX WebSocket streams. It handles authentication (when credentials
20//! are provided), manages subscriptions to market data and account update channels,
21//! and parses incoming messages into structured Nautilus domain objects.
22
23use std::sync::{
24    Arc,
25    atomic::{AtomicBool, Ordering},
26};
27
28use ahash::{AHashMap, AHashSet};
29use dashmap::DashMap;
30use futures_util::Stream;
31use nautilus_common::runtime::get_runtime;
32use nautilus_core::{
33    consts::NAUTILUS_USER_AGENT, env::get_env_var, time::get_atomic_clock_realtime,
34};
35use nautilus_model::{
36    data::{Data, bar::BarType},
37    identifiers::{AccountId, InstrumentId},
38    instruments::{Instrument, InstrumentAny},
39};
40use nautilus_network::{
41    RECONNECTED,
42    websocket::{WebSocketClient, WebSocketConfig, channel_message_handler},
43};
44use reqwest::header::USER_AGENT;
45use tokio::{sync::RwLock, time::Duration};
46use tokio_tungstenite::tungstenite::Message;
47use ustr::Ustr;
48
49use super::{
50    cache::QuoteCache,
51    enums::{
52        BitmexAction, BitmexWsAuthAction, BitmexWsAuthChannel, BitmexWsOperation, BitmexWsTopic,
53    },
54    error::BitmexWsError,
55    messages::{
56        BitmexAuthentication, BitmexSubscription, BitmexTableMessage, BitmexWsMessage,
57        NautilusWsMessage, OrderData,
58    },
59    parse::{
60        is_index_symbol, parse_book_msg_vec, parse_book10_msg_vec, parse_order_update_msg,
61        parse_trade_bin_msg_vec, parse_trade_msg_vec, parse_wallet_msg, topic_from_bar_spec,
62    },
63};
64use crate::{
65    common::{consts::BITMEX_WS_URL, credential::Credential, enums::BitmexExecType},
66    websocket::parse::{
67        parse_execution_msg, parse_funding_msg, parse_instrument_msg, parse_order_msg,
68        parse_position_msg,
69    },
70};
71
/// Provides a WebSocket client for connecting to the [BitMEX](https://bitmex.com) real-time API.
///
/// The struct derives `Clone`; the connection handle, shutdown signal, subscription
/// registry and instruments cache are held behind `Arc`s, so clones refer to the same
/// underlying state.
#[derive(Clone, Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
)]
pub struct BitmexWebSocketClient {
    /// WebSocket endpoint URL (falls back to `BITMEX_WS_URL` when not supplied to `new`).
    url: String,
    /// API credentials; `None` means the connection is never authenticated.
    credential: Option<Credential>,
    /// Heartbeat setting forwarded unchanged to the underlying `WebSocketConfig`.
    heartbeat: Option<u64>,
    /// Underlying network client; `None` until `connect_inner` has stored a connection.
    inner: Arc<RwLock<Option<WebSocketClient>>>,
    /// Receiver of parsed messages; populated by `connect` and taken by `stream`.
    rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
    /// Shutdown flag; set to `true` by `close`.
    signal: Arc<AtomicBool>,
    /// Handle of the background message-processing task spawned by `connect`.
    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
    /// Tracked subscriptions: channel name -> subscribed symbols. An empty set marks a
    /// topic that was subscribed without a symbol.
    subscriptions: Arc<DashMap<String, AHashSet<Ustr>>>,
    /// Instruments keyed by symbol, handed to the message handler on `connect`.
    instruments_cache: Arc<AHashMap<Ustr, InstrumentAny>>,
    /// Account ID handed to the message handler on `connect`.
    account_id: AccountId,
}
90
91impl BitmexWebSocketClient {
92    /// Creates a new [`BitmexWebSocketClient`] instance.
93    ///
94    /// # Errors
95    ///
96    /// Returns an error if only one of `api_key` or `api_secret` is provided (both or neither required).
97    pub fn new(
98        url: Option<String>,
99        api_key: Option<String>,
100        api_secret: Option<String>,
101        account_id: Option<AccountId>,
102        heartbeat: Option<u64>,
103    ) -> anyhow::Result<Self> {
104        let credential = match (api_key, api_secret) {
105            (Some(key), Some(secret)) => Some(Credential::new(key, secret)),
106            (None, None) => None,
107            _ => anyhow::bail!("Both `api_key` and `api_secret` must be provided together"),
108        };
109
110        let account_id = account_id.unwrap_or(AccountId::from("BITMEX-master"));
111
112        Ok(Self {
113            url: url.unwrap_or(BITMEX_WS_URL.to_string()),
114            credential,
115            heartbeat,
116            inner: Arc::new(RwLock::new(None)),
117            rx: None,
118            signal: Arc::new(AtomicBool::new(false)),
119            task_handle: None,
120            subscriptions: Arc::new(DashMap::new()),
121            instruments_cache: Arc::new(AHashMap::new()),
122            account_id,
123        })
124    }
125
126    /// Creates a new authenticated [`BitmexWebSocketClient`] using environment variables.
127    ///
128    /// # Errors
129    ///
130    /// Returns an error if environment variables are not set or credentials are invalid.
131    pub fn from_env() -> anyhow::Result<Self> {
132        let url = get_env_var("BITMEX_WS_URL")?;
133        let api_key = get_env_var("BITMEX_API_KEY")?;
134        let api_secret = get_env_var("BITMEX_API_SECRET")?;
135
136        Self::new(Some(url), Some(api_key), Some(api_secret), None, None)
137    }
138
    /// Returns the WebSocket URL used by the client.
    ///
    /// This is the value stored at construction time: either the URL passed to
    /// [`Self::new`] or the `BITMEX_WS_URL` default.
    #[must_use]
    pub const fn url(&self) -> &str {
        self.url.as_str()
    }
144
145    /// Returns the public API key being used by the client.
146    #[must_use]
147    pub fn api_key(&self) -> Option<&str> {
148        self.credential.as_ref().map(|c| c.api_key.as_str())
149    }
150
151    /// Returns a value indicating whether the client is active.
152    #[must_use]
153    pub fn is_active(&self) -> bool {
154        match self.inner.try_read() {
155            Ok(guard) => match &*guard {
156                Some(inner) => inner.is_active(),
157                None => false,
158            },
159            Err(_) => false,
160        }
161    }
162
163    /// Returns a value indicating whether the client is closed.
164    #[must_use]
165    pub fn is_closed(&self) -> bool {
166        match self.inner.try_read() {
167            Ok(guard) => match &*guard {
168                Some(inner) => inner.is_closed(),
169                None => true,
170            },
171            Err(_) => true,
172        }
173    }
174
    /// Sets the account ID.
    ///
    /// `connect` copies the account ID into the message handler when it spawns the
    /// processing task, so a value set here is picked up by the next call to `connect`.
    pub fn set_account_id(&mut self, account_id: AccountId) {
        self.account_id = account_id;
    }
179
180    /// Initialize the instruments cache with the given `instruments`.
181    pub fn initialize_instruments_cache(&mut self, instruments: Vec<InstrumentAny>) {
182        let mut instruments_cache: AHashMap<Ustr, InstrumentAny> = AHashMap::new();
183        for inst in instruments {
184            instruments_cache.insert(inst.symbol().inner(), inst.clone());
185        }
186
187        self.instruments_cache = Arc::new(instruments_cache);
188    }
189
190    /// Connect to the BitMEX WebSocket server.
191    ///
192    /// # Errors
193    ///
194    /// Returns an error if the WebSocket connection fails or authentication fails (if credentials provided).
195    ///
196    /// # Panics
197    ///
198    /// Panics if subscription or authentication messages fail to serialize to JSON.
199    pub async fn connect(&mut self) -> Result<(), BitmexWsError> {
200        let reader = self.connect_inner().await?;
201
202        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
203        self.rx = Some(Arc::new(rx));
204        let signal = self.signal.clone();
205
206        let instruments_cache = self.instruments_cache.clone();
207        let account_id = self.account_id;
208        let inner_client = self.inner.clone();
209        let credential = self.credential.clone();
210        let subscriptions = self.subscriptions.clone();
211
212        let stream_handle = get_runtime().spawn(async move {
213            let mut handler =
214                BitmexWsMessageHandler::new(reader, signal, tx, instruments_cache, account_id);
215
216            // Run message processing with reconnection handling
217            loop {
218                match handler.next().await {
219                    Some(NautilusWsMessage::Reconnected) => {
220                        log::info!("Reconnecting WebSocket");
221
222                        let inner_guard = inner_client.read().await;
223                        if let Some(inner) = &*inner_guard {
224                            // Re-authenticate if we have credentials
225                            if let Some(cred) = &credential {
226                                let expires = (chrono::Utc::now() + chrono::Duration::seconds(30))
227                                    .timestamp();
228                                let signature = cred.sign("GET", "/realtime", expires, "");
229
230                                let auth_message = BitmexAuthentication {
231                                    op: BitmexWsAuthAction::AuthKeyExpires,
232                                    args: (cred.api_key.to_string(), expires, signature),
233                                };
234
235                                let auth_json = match serde_json::to_string(&auth_message) {
236                                    Ok(json) => json,
237                                    Err(e) => {
238                                        tracing::error!(error = %e, "Failed to serialize auth message");
239                                        continue;
240                                    }
241                                };
242                                if let Err(e) = inner
243                                    .send_text(auth_json, None)
244                                    .await
245                                {
246                                    log::error!(
247                                        "Failed to re-authenticate after reconnection: {e}"
248                                    );
249                                } else {
250                                    log::info!("Re-authenticated after reconnection");
251                                }
252                            }
253
254                            // Always resubscribe to instruments
255                            let subscribe_msg = BitmexSubscription {
256                                op: BitmexWsOperation::Subscribe,
257                                args: vec!["instrument".to_string()],
258                            };
259
260                            let subscribe_json = match serde_json::to_string(&subscribe_msg) {
261                                Ok(json) => json,
262                                Err(e) => {
263                                    tracing::error!(error = %e, "Failed to serialize subscribe message");
264                                    continue;
265                                }
266                            };
267                            if let Err(e) = inner
268                                .send_text(subscribe_json, None)
269                                .await
270                            {
271                                log::error!(
272                                    "Failed to subscribe to instruments after reconnection: {e}"
273                                );
274                            }
275
276                            // Restore all tracked subscriptions
277                            let mut topics_to_restore = Vec::new();
278                            for entry in subscriptions.iter() {
279                                let (channel, symbols) = entry.pair();
280                                if symbols.is_empty() {
281                                    topics_to_restore.push(channel.clone());
282                                } else {
283                                    for symbol in symbols.iter() {
284                                        topics_to_restore.push(format!("{channel}:{symbol}"));
285                                    }
286                                }
287                            }
288
289                            if !topics_to_restore.is_empty() {
290                                let message = BitmexSubscription {
291                                    op: BitmexWsOperation::Subscribe,
292                                    args: topics_to_restore.clone(),
293                                };
294
295                                let msg_json = match serde_json::to_string(&message) {
296                                    Ok(json) => json,
297                                    Err(e) => {
298                                        tracing::error!(error = %e, topics = ?topics_to_restore, "Failed to serialize message");
299                                        continue;
300                                    }
301                                };
302                                if let Err(e) = inner
303                                    .send_text(msg_json, None)
304                                    .await
305                                {
306                                    log::error!(
307                                        "Failed to restore subscriptions after reconnection: {e}"
308                                    );
309                                } else {
310                                    log::info!(
311                                        "Restored {} subscriptions after reconnection",
312                                        topics_to_restore.len()
313                                    );
314                                }
315                            }
316                        }
317                    }
318                    Some(msg) => {
319                        if let Err(e) = handler.tx.send(msg) {
320                            tracing::error!("Error sending message: {e}");
321                            break;
322                        }
323                    }
324                    None => {
325                        // Stream ended - check if it's a stop signal
326                        if handler.handler.signal.load(Ordering::Relaxed) {
327                            tracing::debug!("Stop signal received, ending message processing");
328                            break;
329                        }
330                        // Otherwise it's an unexpected stream end
331                        tracing::warn!("WebSocket stream ended unexpectedly");
332                        break;
333                    }
334                }
335            }
336        });
337
338        self.task_handle = Some(Arc::new(stream_handle));
339
340        {
341            let inner_guard = self.inner.read().await;
342            if let Some(inner) = &*inner_guard {
343                let subscribe_msg = BitmexSubscription {
344                    op: BitmexWsOperation::Subscribe,
345                    args: vec!["instrument".to_string()],
346                };
347
348                match serde_json::to_string(&subscribe_msg) {
349                    Ok(subscribe_json) => {
350                        if let Err(e) = inner.send_text(subscribe_json, None).await {
351                            log::error!("Failed to subscribe to instruments: {e}");
352                        } else {
353                            log::debug!("Subscribed to all instruments");
354                        }
355                    }
356                    Err(e) => {
357                        tracing::error!(error = %e, "Failed to serialize resubscribe message");
358                    }
359                }
360            }
361        }
362
363        Ok(())
364    }
365
366    /// Connect to the WebSocket and return a message receiver.
367    ///
368    /// # Errors
369    ///
370    /// Returns an error if the WebSocket connection fails or if authentication fails (when credentials are provided).
371    async fn connect_inner(
372        &mut self,
373    ) -> Result<tokio::sync::mpsc::UnboundedReceiver<Message>, BitmexWsError> {
374        let (message_handler, rx) = channel_message_handler();
375
376        let config = WebSocketConfig {
377            url: self.url.clone(),
378            headers: vec![(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())],
379            heartbeat: self.heartbeat,
380            heartbeat_msg: None,
381            message_handler: Some(message_handler),
382            ping_handler: None,
383            reconnect_timeout_ms: Some(5_000),
384            reconnect_delay_initial_ms: None, // Use default
385            reconnect_delay_max_ms: None,     // Use default
386            reconnect_backoff_factor: None,   // Use default
387            reconnect_jitter_ms: None,        // Use default
388        };
389
390        let keyed_quotas = vec![];
391        let client = WebSocketClient::connect(
392            config,
393            None, // post_reconnection
394            keyed_quotas,
395            None, // default_quota
396        )
397        .await
398        .map_err(|e| BitmexWsError::ClientError(e.to_string()))?;
399
400        {
401            let mut inner_guard = self.inner.write().await;
402            *inner_guard = Some(client);
403        }
404
405        if self.credential.is_some() {
406            self.authenticate().await?;
407        }
408
409        Ok(rx)
410    }
411
412    /// Authenticate the WebSocket connection using the provided credentials.
413    ///
414    /// # Errors
415    ///
416    /// Returns an error if the WebSocket is not connected, if authentication fails,
417    /// or if credentials are not available.
418    async fn authenticate(&self) -> Result<(), BitmexWsError> {
419        let credential = match &self.credential {
420            Some(credential) => credential,
421            None => {
422                return Err(BitmexWsError::AuthenticationError(
423                    "API credentials not available to authenticate".to_string(),
424                ));
425            }
426        };
427
428        let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
429        let signature = credential.sign("GET", "/realtime", expires, "");
430
431        let auth_message = BitmexAuthentication {
432            op: BitmexWsAuthAction::AuthKeyExpires,
433            args: (credential.api_key.to_string(), expires, signature),
434        };
435
436        {
437            let inner_guard = self.inner.read().await;
438            if let Some(inner) = &*inner_guard {
439                let auth_json = serde_json::to_string(&auth_message).map_err(|e| {
440                    BitmexWsError::AuthenticationError(format!(
441                        "Failed to serialize auth message: {e}"
442                    ))
443                })?;
444                inner
445                    .send_text(auth_json, None)
446                    .await
447                    .map_err(|e| BitmexWsError::AuthenticationError(e.to_string()))
448            } else {
449                log::error!("Cannot authenticate: not connected");
450                Ok(())
451            }
452        }
453    }
454
455    /// Wait until the WebSocket connection is active.
456    ///
457    /// # Errors
458    ///
459    /// Returns an error if the connection times out.
460    pub async fn wait_until_active(&self, timeout_secs: f64) -> Result<(), BitmexWsError> {
461        let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
462
463        tokio::time::timeout(timeout, async {
464            while !self.is_active() {
465                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
466            }
467        })
468        .await
469        .map_err(|_| {
470            BitmexWsError::ClientError(format!(
471                "WebSocket connection timeout after {timeout_secs} seconds"
472            ))
473        })?;
474
475        Ok(())
476    }
477
478    /// Provides the internal stream as a channel-based stream.
479    ///
480    /// # Panics
481    ///
482    /// This function panics:
483    /// - If the websocket is not connected.
484    /// - If `stream` has already been called somewhere else (stream receiver is then taken).
485    pub fn stream(&mut self) -> impl Stream<Item = NautilusWsMessage> + use<> {
486        let rx = self
487            .rx
488            .take()
489            .expect("Stream receiver already taken or not connected");
490        let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
491        async_stream::stream! {
492            while let Some(msg) = rx.recv().await {
493                yield msg;
494            }
495        }
496    }
497
498    /// Closes the client.
499    ///
500    /// # Errors
501    ///
502    /// Returns an error if the WebSocket is not connected or if closing fails.
503    ///
504    /// # Panics
505    ///
506    /// Panics if the task handle cannot be unwrapped (should never happen in normal usage).
507    pub async fn close(&mut self) -> Result<(), BitmexWsError> {
508        log::debug!("Starting close process");
509
510        self.signal.store(true, Ordering::Relaxed);
511
512        {
513            let inner_guard = self.inner.read().await;
514            if let Some(inner) = &*inner_guard {
515                log::debug!("Disconnecting websocket");
516
517                match tokio::time::timeout(Duration::from_secs(3), inner.disconnect()).await {
518                    Ok(()) => log::debug!("Websocket disconnected successfully"),
519                    Err(_) => {
520                        log::warn!(
521                            "Timeout waiting for websocket disconnect, continuing with cleanup"
522                        );
523                    }
524                }
525            } else {
526                log::debug!("No active connection to disconnect");
527            }
528        }
529
530        // Clean up task handle with timeout
531        if let Some(task_handle) = self.task_handle.take() {
532            match Arc::try_unwrap(task_handle) {
533                Ok(handle) => {
534                    log::debug!("Waiting for task handle to complete");
535                    match tokio::time::timeout(Duration::from_secs(2), handle).await {
536                        Ok(Ok(())) => log::debug!("Task handle completed successfully"),
537                        Ok(Err(e)) => log::error!("Task handle encountered an error: {e:?}"),
538                        Err(_) => {
539                            log::warn!(
540                                "Timeout waiting for task handle, task may still be running"
541                            );
542                            // The task will be dropped and should clean up automatically
543                        }
544                    }
545                }
546                Err(arc_handle) => {
547                    log::debug!(
548                        "Cannot take ownership of task handle - other references exist, aborting task"
549                    );
550                    arc_handle.abort();
551                }
552            }
553        } else {
554            log::debug!("No task handle to await");
555        }
556
557        log::debug!("Closed");
558
559        Ok(())
560    }
561
562    /// Subscribe to the specified topics.
563    ///
564    /// # Errors
565    ///
566    /// Returns an error if the WebSocket is not connected or if sending the subscription message fails.
567    ///
568    /// # Panics
569    ///
570    /// Panics if serialization of WebSocket messages fails (should never happen).
571    pub async fn subscribe(&self, topics: Vec<String>) -> Result<(), BitmexWsError> {
572        log::debug!("Subscribing to topics: {topics:?}");
573
574        // Track subscriptions
575        for topic in &topics {
576            if let Some((channel, symbol)) = topic.split_once(':') {
577                self.subscriptions
578                    .entry(channel.to_string())
579                    .or_default()
580                    .insert(Ustr::from(symbol));
581            } else {
582                // Topic without symbol (e.g., "execution", "order")
583                self.subscriptions.entry(topic.clone()).or_default();
584            }
585        }
586
587        let message = BitmexSubscription {
588            op: BitmexWsOperation::Subscribe,
589            args: topics.clone(),
590        };
591
592        {
593            let inner_guard = self.inner.read().await;
594            if let Some(inner) = &*inner_guard {
595                let msg_json = serde_json::to_string(&message).map_err(|e| {
596                    BitmexWsError::SubscriptionError(format!(
597                        "Failed to serialize subscribe message: {e}"
598                    ))
599                })?;
600                inner
601                    .send_text(msg_json, None)
602                    .await
603                    .map_err(|e| BitmexWsError::SubscriptionError(e.to_string()))?;
604            } else {
605                log::error!("Cannot send message: not connected");
606            }
607        }
608
609        Ok(())
610    }
611
612    /// Unsubscribe from the specified topics.
613    ///
614    /// # Errors
615    ///
616    /// Returns an error if the WebSocket is not connected or if sending the unsubscription message fails.
617    async fn unsubscribe(&self, topics: Vec<String>) -> Result<(), BitmexWsError> {
618        log::debug!("Attempting to unsubscribe from topics: {topics:?}");
619
620        if self.signal.load(Ordering::Relaxed) {
621            log::debug!("Shutdown signal detected, skipping unsubscribe");
622            return Ok(());
623        }
624
625        for topic in &topics {
626            if let Some((channel, symbol)) = topic.split_once(':') {
627                if let Some(mut entry) = self.subscriptions.get_mut(channel) {
628                    entry.remove(&Ustr::from(symbol));
629                    if entry.is_empty() {
630                        drop(entry);
631                        self.subscriptions.remove(channel);
632                    }
633                }
634            } else {
635                self.subscriptions.remove(topic);
636            }
637        }
638
639        let message = BitmexSubscription {
640            op: BitmexWsOperation::Unsubscribe,
641            args: topics.clone(),
642        };
643
644        {
645            let inner_guard = self.inner.read().await;
646            if let Some(inner) = &*inner_guard {
647                let msg_json = match serde_json::to_string(&message) {
648                    Ok(json) => json,
649                    Err(e) => {
650                        tracing::error!(error = %e, "Failed to serialize unsubscribe message");
651                        return Ok(());
652                    }
653                };
654                if let Err(e) = inner.send_text(msg_json, None).await {
655                    log::debug!("Error sending unsubscribe message: {e}");
656                }
657            } else {
658                log::debug!("Cannot send unsubscribe message: not connected");
659            }
660        }
661
662        Ok(())
663    }
664
    /// Get the current number of tracked subscription channels.
    ///
    /// This is the number of keys in the subscription registry, i.e. distinct channel
    /// names. A channel subscribed for several symbols is counted once.
    #[must_use]
    pub fn subscription_count(&self) -> usize {
        self.subscriptions.len()
    }
670
671    /// Get active subscriptions for a specific instrument.
672    #[must_use]
673    pub fn get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
674        let symbol = instrument_id.symbol.inner();
675        let mut channels = Vec::with_capacity(self.subscriptions.len());
676
677        for entry in self.subscriptions.iter() {
678            let (channel, symbols) = entry.pair();
679            if symbols.contains(&symbol) {
680                // Return the full topic string (e.g., "orderBookL2:XBTUSD")
681                channels.push(format!("{channel}:{symbol}"));
682            } else if symbols.is_empty() && (channel == "execution" || channel == "order") {
683                // These are account-level subscriptions without symbols
684                channels.push(channel.clone());
685            }
686        }
687
688        channels
689    }
690
    /// Subscribe to instrument updates for all instruments on the venue.
    ///
    /// This is a no-op that only emits a debug log: `connect` already subscribes to the
    /// `instrument` channel, so no message is sent here.
    ///
    /// # Errors
    ///
    /// The current implementation never fails and always returns `Ok(())`.
    pub async fn subscribe_instruments(&self) -> Result<(), BitmexWsError> {
        // Already subscribed automatically on connection
        log::debug!("Already subscribed to all instruments on connection, skipping");
        Ok(())
    }
701
    /// Subscribe to instrument updates (mark/index prices) for the specified instrument.
    ///
    /// This is a no-op that only emits a debug log: `connect` already subscribes to the
    /// `instrument` channel without a symbol, so no per-instrument message is sent here.
    ///
    /// # Errors
    ///
    /// The current implementation never fails and always returns `Ok(())`.
    pub async fn subscribe_instrument(
        &self,
        instrument_id: InstrumentId,
    ) -> Result<(), BitmexWsError> {
        // Already subscribed to all instruments on connection
        log::debug!(
            "Already subscribed to all instruments on connection (includes {instrument_id}), skipping"
        );
        Ok(())
    }
717
718    /// Subscribe to order book updates for the specified instrument.
719    ///
720    /// # Errors
721    ///
722    /// Returns an error if the WebSocket is not connected or if the subscription fails.
723    pub async fn subscribe_book(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
724        let topic = BitmexWsTopic::OrderBookL2;
725        let symbol = instrument_id.symbol.as_str();
726        self.subscribe(vec![format!("{topic}:{symbol}")]).await
727    }
728
729    /// Subscribe to order book L2 (25 levels) updates for the specified instrument.
730    ///
731    /// # Errors
732    ///
733    /// Returns an error if the WebSocket is not connected or if the subscription fails.
734    pub async fn subscribe_book_25(
735        &self,
736        instrument_id: InstrumentId,
737    ) -> Result<(), BitmexWsError> {
738        let topic = BitmexWsTopic::OrderBookL2_25;
739        let symbol = instrument_id.symbol.as_str();
740        self.subscribe(vec![format!("{topic}:{symbol}")]).await
741    }
742
743    /// Subscribe to order book depth 10 updates for the specified instrument.
744    ///
745    /// # Errors
746    ///
747    /// Returns an error if the WebSocket is not connected or if the subscription fails.
748    pub async fn subscribe_book_depth10(
749        &self,
750        instrument_id: InstrumentId,
751    ) -> Result<(), BitmexWsError> {
752        let topic = BitmexWsTopic::OrderBook10;
753        let symbol = instrument_id.symbol.as_str();
754        self.subscribe(vec![format!("{topic}:{symbol}")]).await
755    }
756
757    /// Subscribe to quote updates for the specified instrument.
758    ///
759    /// Note: Index symbols (starting with '.') do not have quotes and will be silently ignored.
760    ///
761    /// # Errors
762    ///
763    /// Returns an error if the WebSocket is not connected or if the subscription fails.
764    pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
765        let symbol = instrument_id.symbol.inner();
766
767        // Index symbols don't have quotes (bid/ask), only a single price
768        if is_index_symbol(&instrument_id.symbol.inner()) {
769            tracing::warn!("Ignoring quote subscription for index symbol: {symbol}");
770            return Ok(());
771        }
772
773        let topic = BitmexWsTopic::Quote;
774        self.subscribe(vec![format!("{topic}:{symbol}")]).await
775    }
776
777    /// Subscribe to trade updates for the specified instrument.
778    ///
779    /// Note: Index symbols (starting with '.') do not have trades and will be silently ignored.
780    ///
781    /// # Errors
782    ///
783    /// Returns an error if the WebSocket is not connected or if the subscription fails.
784    pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
785        let symbol = instrument_id.symbol.inner();
786
787        // Index symbols don't have trades
788        if is_index_symbol(&symbol) {
789            tracing::warn!("Ignoring trade subscription for index symbol: {symbol}");
790            return Ok(());
791        }
792
793        let topic = BitmexWsTopic::Trade;
794        self.subscribe(vec![format!("{topic}:{symbol}")]).await
795    }
796
797    /// Subscribe to mark price updates for the specified instrument.
798    ///
799    /// # Errors
800    ///
801    /// Returns an error if the WebSocket is not connected or if the subscription fails.
802    pub async fn subscribe_mark_prices(
803        &self,
804        instrument_id: InstrumentId,
805    ) -> Result<(), BitmexWsError> {
806        self.subscribe_instrument(instrument_id).await
807    }
808
809    /// Subscribe to index price updates for the specified instrument.
810    ///
811    /// # Errors
812    ///
813    /// Returns an error if the WebSocket is not connected or if the subscription fails.
814    pub async fn subscribe_index_prices(
815        &self,
816        instrument_id: InstrumentId,
817    ) -> Result<(), BitmexWsError> {
818        self.subscribe_instrument(instrument_id).await
819    }
820
821    /// Subscribe to funding rate updates for the specified instrument.
822    ///
823    /// # Errors
824    ///
825    /// Returns an error if the WebSocket is not connected or if the subscription fails.
826    pub async fn subscribe_funding_rates(
827        &self,
828        instrument_id: InstrumentId,
829    ) -> Result<(), BitmexWsError> {
830        let topic = BitmexWsTopic::Funding;
831        let symbol = instrument_id.symbol.as_str();
832        self.subscribe(vec![format!("{topic}:{symbol}")]).await
833    }
834
835    /// Subscribe to bar updates for the specified bar type.
836    ///
837    /// # Errors
838    ///
839    /// Returns an error if the WebSocket is not connected or if the subscription fails.
840    pub async fn subscribe_bars(&self, bar_type: BarType) -> Result<(), BitmexWsError> {
841        let topic = topic_from_bar_spec(bar_type.spec());
842        let symbol = bar_type.instrument_id().symbol.to_string();
843        self.subscribe(vec![format!("{topic}:{symbol}")]).await
844    }
845
846    /// Unsubscribe from instrument updates for all instruments on the venue.
847    ///
848    /// # Errors
849    ///
850    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
851    pub async fn unsubscribe_instruments(&self) -> Result<(), BitmexWsError> {
852        // No-op: instruments are required for proper operation
853        log::debug!(
854            "Instruments subscription maintained for proper operation, skipping unsubscribe"
855        );
856        Ok(())
857    }
858
859    /// Unsubscribe from instrument updates (mark/index prices) for the specified instrument.
860    ///
861    /// # Errors
862    ///
863    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
864    pub async fn unsubscribe_instrument(
865        &self,
866        instrument_id: InstrumentId,
867    ) -> Result<(), BitmexWsError> {
868        // No-op: instruments are required for proper operation
869        log::debug!(
870            "Instruments subscription maintained for proper operation (includes {instrument_id}), skipping unsubscribe"
871        );
872        Ok(())
873    }
874
875    /// Unsubscribe from order book updates for the specified instrument.
876    ///
877    /// # Errors
878    ///
879    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
880    pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
881        let topic = BitmexWsTopic::OrderBookL2;
882        let symbol = instrument_id.symbol.as_str();
883        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
884    }
885
886    /// Unsubscribe from order book L2 (25 levels) updates for the specified instrument.
887    ///
888    /// # Errors
889    ///
890    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
891    pub async fn unsubscribe_book_25(
892        &self,
893        instrument_id: InstrumentId,
894    ) -> Result<(), BitmexWsError> {
895        let topic = BitmexWsTopic::OrderBookL2_25;
896        let symbol = instrument_id.symbol.as_str();
897        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
898    }
899
900    /// Unsubscribe from order book depth 10 updates for the specified instrument.
901    ///
902    /// # Errors
903    ///
904    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
905    pub async fn unsubscribe_book_depth10(
906        &self,
907        instrument_id: InstrumentId,
908    ) -> Result<(), BitmexWsError> {
909        let topic = BitmexWsTopic::OrderBook10;
910        let symbol = instrument_id.symbol.as_str();
911        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
912    }
913
914    /// Unsubscribe from quote updates for the specified instrument.
915    ///
916    /// # Errors
917    ///
918    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
919    pub async fn unsubscribe_quotes(
920        &self,
921        instrument_id: InstrumentId,
922    ) -> Result<(), BitmexWsError> {
923        let symbol = instrument_id.symbol.inner();
924
925        // Index symbols don't have quotes
926        if is_index_symbol(&symbol) {
927            return Ok(());
928        }
929
930        let topic = BitmexWsTopic::Quote;
931        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
932    }
933
934    /// Unsubscribe from trade updates for the specified instrument.
935    ///
936    /// # Errors
937    ///
938    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
939    pub async fn unsubscribe_trades(
940        &self,
941        instrument_id: InstrumentId,
942    ) -> Result<(), BitmexWsError> {
943        let symbol = instrument_id.symbol.inner();
944
945        // Index symbols don't have trades
946        if is_index_symbol(&symbol) {
947            return Ok(());
948        }
949
950        let topic = BitmexWsTopic::Trade;
951        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
952    }
953
954    /// Unsubscribe from mark price updates for the specified instrument.
955    ///
956    /// # Errors
957    ///
958    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
959    pub async fn unsubscribe_mark_prices(
960        &self,
961        instrument_id: InstrumentId,
962    ) -> Result<(), BitmexWsError> {
963        // No-op: instrument channel shared with index prices
964        log::debug!(
965            "Mark prices for {instrument_id} uses shared instrument channel, skipping unsubscribe"
966        );
967        Ok(())
968    }
969
970    /// Unsubscribe from index price updates for the specified instrument.
971    ///
972    /// # Errors
973    ///
974    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
975    pub async fn unsubscribe_index_prices(
976        &self,
977        instrument_id: InstrumentId,
978    ) -> Result<(), BitmexWsError> {
979        // No-op: instrument channel shared with mark prices
980        log::debug!(
981            "Index prices for {instrument_id} uses shared instrument channel, skipping unsubscribe"
982        );
983        Ok(())
984    }
985
986    /// Unsubscribe from funding rate updates for the specified instrument.
987    ///
988    /// # Errors
989    ///
990    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
991    pub async fn unsubscribe_funding_rates(
992        &self,
993        instrument_id: InstrumentId,
994    ) -> Result<(), BitmexWsError> {
995        // No-op: unsubscribing during shutdown causes race conditions
996        log::debug!(
997            "Funding rates for {instrument_id}, skipping unsubscribe to avoid shutdown race"
998        );
999        Ok(())
1000    }
1001
1002    /// Unsubscribe from bar updates for the specified bar type.
1003    ///
1004    /// # Errors
1005    ///
1006    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1007    pub async fn unsubscribe_bars(&self, bar_type: BarType) -> Result<(), BitmexWsError> {
1008        let topic = topic_from_bar_spec(bar_type.spec());
1009        let symbol = bar_type.instrument_id().symbol.to_string();
1010        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1011    }
1012
1013    /// Subscribe to order updates for the authenticated account.
1014    ///
1015    /// # Errors
1016    ///
1017    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1018    pub async fn subscribe_orders(&self) -> Result<(), BitmexWsError> {
1019        if self.credential.is_none() {
1020            return Err(BitmexWsError::MissingCredentials);
1021        }
1022        self.subscribe(vec![BitmexWsAuthChannel::Order.to_string()])
1023            .await
1024    }
1025
1026    /// Subscribe to execution updates for the authenticated account.
1027    ///
1028    /// # Errors
1029    ///
1030    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1031    pub async fn subscribe_executions(&self) -> Result<(), BitmexWsError> {
1032        if self.credential.is_none() {
1033            return Err(BitmexWsError::MissingCredentials);
1034        }
1035        self.subscribe(vec![BitmexWsAuthChannel::Execution.to_string()])
1036            .await
1037    }
1038
1039    /// Subscribe to position updates for the authenticated account.
1040    ///
1041    /// # Errors
1042    ///
1043    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1044    pub async fn subscribe_positions(&self) -> Result<(), BitmexWsError> {
1045        if self.credential.is_none() {
1046            return Err(BitmexWsError::MissingCredentials);
1047        }
1048        self.subscribe(vec![BitmexWsAuthChannel::Position.to_string()])
1049            .await
1050    }
1051
1052    /// Subscribe to margin updates for the authenticated account.
1053    ///
1054    /// # Errors
1055    ///
1056    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1057    pub async fn subscribe_margin(&self) -> Result<(), BitmexWsError> {
1058        if self.credential.is_none() {
1059            return Err(BitmexWsError::MissingCredentials);
1060        }
1061        self.subscribe(vec![BitmexWsAuthChannel::Margin.to_string()])
1062            .await
1063    }
1064
1065    /// Subscribe to wallet updates for the authenticated account.
1066    ///
1067    /// # Errors
1068    ///
1069    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1070    pub async fn subscribe_wallet(&self) -> Result<(), BitmexWsError> {
1071        if self.credential.is_none() {
1072            return Err(BitmexWsError::MissingCredentials);
1073        }
1074        self.subscribe(vec![BitmexWsAuthChannel::Wallet.to_string()])
1075            .await
1076    }
1077
1078    /// Unsubscribe from order updates for the authenticated account.
1079    ///
1080    /// # Errors
1081    ///
1082    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1083    pub async fn unsubscribe_orders(&self) -> Result<(), BitmexWsError> {
1084        self.unsubscribe(vec![BitmexWsAuthChannel::Order.to_string()])
1085            .await
1086    }
1087
1088    /// Unsubscribe from execution updates for the authenticated account.
1089    ///
1090    /// # Errors
1091    ///
1092    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1093    pub async fn unsubscribe_executions(&self) -> Result<(), BitmexWsError> {
1094        self.unsubscribe(vec![BitmexWsAuthChannel::Execution.to_string()])
1095            .await
1096    }
1097
1098    /// Unsubscribe from position updates for the authenticated account.
1099    ///
1100    /// # Errors
1101    ///
1102    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1103    pub async fn unsubscribe_positions(&self) -> Result<(), BitmexWsError> {
1104        self.unsubscribe(vec![BitmexWsAuthChannel::Position.to_string()])
1105            .await
1106    }
1107
1108    /// Unsubscribe from margin updates for the authenticated account.
1109    ///
1110    /// # Errors
1111    ///
1112    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1113    pub async fn unsubscribe_margin(&self) -> Result<(), BitmexWsError> {
1114        self.unsubscribe(vec![BitmexWsAuthChannel::Margin.to_string()])
1115            .await
1116    }
1117
1118    /// Unsubscribe from wallet updates for the authenticated account.
1119    ///
1120    /// # Errors
1121    ///
1122    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1123    pub async fn unsubscribe_wallet(&self) -> Result<(), BitmexWsError> {
1124        self.unsubscribe(vec![BitmexWsAuthChannel::Wallet.to_string()])
1125            .await
1126    }
1127}
1128
1129struct BitmexFeedHandler {
1130    receiver: tokio::sync::mpsc::UnboundedReceiver<Message>,
1131    signal: Arc<AtomicBool>,
1132}
1133
1134impl BitmexFeedHandler {
1135    /// Creates a new [`BitmexFeedHandler`] instance.
1136    pub fn new(
1137        receiver: tokio::sync::mpsc::UnboundedReceiver<Message>,
1138        signal: Arc<AtomicBool>,
1139    ) -> Self {
1140        Self { receiver, signal }
1141    }
1142
1143    /// Get the next message from the WebSocket stream.
1144    async fn next(&mut self) -> Option<BitmexWsMessage> {
1145        loop {
1146            tokio::select! {
1147                msg = self.receiver.recv() => match msg {
1148                    Some(msg) => match msg {
1149                        Message::Text(text) => {
1150                            if text == RECONNECTED {
1151                                tracing::info!("Received WebSocket reconnection signal");
1152                                return Some(BitmexWsMessage::Reconnected);
1153                            }
1154
1155                            tracing::trace!("Raw websocket message: {text}");
1156
1157                            match serde_json::from_str(&text) {
1158                                Ok(msg) => match &msg {
1159                                    BitmexWsMessage::Welcome {
1160                                        version,
1161                                        heartbeat_enabled,
1162                                        limit,
1163                                        ..
1164                                    } => {
1165                                        tracing::info!(
1166                                            version = version,
1167                                            heartbeat = heartbeat_enabled,
1168                                            rate_limit = limit.remaining,
1169                                            "Welcome to the BitMEX Realtime API:",
1170                                        );
1171                                    }
1172                                    BitmexWsMessage::Subscription {
1173                                        success: _,
1174                                        subscribe: _,
1175                                        error,
1176                                    } => {
1177                                        if let Some(error) = error {
1178                                            tracing::error!("Subscription error: {error}");
1179                                        }
1180                                    }
1181                                    BitmexWsMessage::Error { status, error, .. } => {
1182                                        tracing::error!(
1183                                            status = status,
1184                                            error = error,
1185                                            "Received error from BitMEX"
1186                                        );
1187                                    }
1188                                    _ => return Some(msg),
1189                                },
1190                                Err(e) => {
1191                                    tracing::error!("Failed to parse WebSocket message: {e}: {text}");
1192                                }
1193                            }
1194                        }
1195                        Message::Binary(msg) => {
1196                            tracing::debug!("Raw binary: {msg:?}");
1197                        }
1198                        Message::Close(_) => {
1199                            tracing::debug!("Received close message");
1200                            return None;
1201                        }
1202                        msg => match msg {
1203                            Message::Ping(data) => {
1204                                tracing::trace!("Received ping frame with {} bytes", data.len());
1205                            }
1206                            Message::Pong(data) => {
1207                                tracing::trace!("Received pong frame with {} bytes", data.len());
1208                            }
1209                            Message::Frame(frame) => {
1210                                tracing::debug!("Received raw frame: {frame:?}");
1211                            }
1212                            _ => {
1213                                tracing::warn!("Unexpected message type: {msg:?}");
1214                            }
1215                        },
1216                    }
1217                    None => {
1218                        tracing::info!("WebSocket stream closed");
1219                        return None;
1220                    }
1221                },
1222                _ = tokio::time::sleep(Duration::from_millis(1)) => {
1223                    if self.signal.load(std::sync::atomic::Ordering::Relaxed) {
1224                        tracing::debug!("Stop signal received");
1225                        return None;
1226                    }
1227                }
1228            }
1229        }
1230    }
1231}
1232
1233struct BitmexWsMessageHandler {
1234    handler: BitmexFeedHandler,
1235    tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
1236    instruments_cache: Arc<AHashMap<Ustr, InstrumentAny>>,
1237    #[allow(dead_code)] // May be needed for future account-specific processing
1238    account_id: AccountId,
1239}
1240
1241impl BitmexWsMessageHandler {
1242    /// Creates a new [`BitmexWsMessageHandler`] instance.
1243    pub fn new(
1244        receiver: tokio::sync::mpsc::UnboundedReceiver<Message>,
1245        signal: Arc<AtomicBool>,
1246        tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
1247        instruments_cache: Arc<AHashMap<Ustr, InstrumentAny>>,
1248        account_id: AccountId,
1249    ) -> Self {
1250        let handler = BitmexFeedHandler::new(receiver, signal);
1251        Self {
1252            handler,
1253            tx,
1254            instruments_cache,
1255            account_id,
1256        }
1257    }
1258
1259    // Run is now handled inline in the connect() method where we have access to reconnection resources
1260
1261    /// Get price precision for a symbol from the instruments cache.
1262    ///
1263    /// # Panics
1264    ///
1265    /// Panics if the instrument is not found in the cache.
1266    #[inline]
1267    fn get_price_precision(&self, symbol: &Ustr) -> u8 {
1268        self.instruments_cache
1269            .get(symbol).map_or_else(|| panic!("Instrument '{symbol}' not found in cache; ensure all instruments are loaded before starting websocket"), Instrument::price_precision)
1270    }
1271
1272    async fn next(&mut self) -> Option<NautilusWsMessage> {
1273        let clock = get_atomic_clock_realtime();
1274        let mut quote_cache = QuoteCache::new();
1275
1276        while let Some(msg) = self.handler.next().await {
1277            match msg {
1278                BitmexWsMessage::Reconnected => {
1279                    // Return reconnection signal to outer loop
1280                    return Some(NautilusWsMessage::Reconnected);
1281                }
1282                BitmexWsMessage::Table(table_msg) => {
1283                    let ts_init = clock.get_time_ns();
1284
1285                    return Some(match table_msg {
1286                        BitmexTableMessage::OrderBookL2 { action, data } => {
1287                            if data.is_empty() {
1288                                continue;
1289                            }
1290                            let price_precision = self.get_price_precision(&data[0].symbol);
1291                            let data = parse_book_msg_vec(data, action, price_precision, ts_init);
1292
1293                            NautilusWsMessage::Data(data)
1294                        }
1295                        BitmexTableMessage::OrderBookL2_25 { action, data } => {
1296                            if data.is_empty() {
1297                                continue;
1298                            }
1299                            let price_precision = self.get_price_precision(&data[0].symbol);
1300                            let data = parse_book_msg_vec(data, action, price_precision, ts_init);
1301
1302                            NautilusWsMessage::Data(data)
1303                        }
1304                        BitmexTableMessage::OrderBook10 { data, .. } => {
1305                            if data.is_empty() {
1306                                continue;
1307                            }
1308                            let price_precision = self.get_price_precision(&data[0].symbol);
1309                            let data = parse_book10_msg_vec(data, price_precision, ts_init);
1310
1311                            NautilusWsMessage::Data(data)
1312                        }
1313                        BitmexTableMessage::Quote { mut data, .. } => {
1314                            // Index symbols may return empty quote data
1315                            if data.is_empty() {
1316                                continue;
1317                            }
1318
1319                            let msg = data.remove(0);
1320                            let price_precision = self.get_price_precision(&msg.symbol);
1321
1322                            if let Some(quote) = quote_cache.process(&msg, price_precision, ts_init)
1323                            {
1324                                NautilusWsMessage::Data(vec![Data::Quote(quote)])
1325                            } else {
1326                                continue;
1327                            }
1328                        }
1329                        BitmexTableMessage::Trade { data, .. } => {
1330                            if data.is_empty() {
1331                                continue;
1332                            }
1333                            let price_precision = self.get_price_precision(&data[0].symbol);
1334                            let data = parse_trade_msg_vec(data, price_precision, ts_init);
1335
1336                            NautilusWsMessage::Data(data)
1337                        }
1338                        BitmexTableMessage::TradeBin1m { action, data } => {
1339                            if action == BitmexAction::Partial || data.is_empty() {
1340                                continue;
1341                            }
1342                            let price_precision = self.get_price_precision(&data[0].symbol);
1343                            let data = parse_trade_bin_msg_vec(
1344                                data,
1345                                BitmexWsTopic::TradeBin1m,
1346                                price_precision,
1347                                ts_init,
1348                            );
1349
1350                            NautilusWsMessage::Data(data)
1351                        }
1352                        BitmexTableMessage::TradeBin5m { action, data } => {
1353                            if action == BitmexAction::Partial || data.is_empty() {
1354                                continue;
1355                            }
1356                            let price_precision = self.get_price_precision(&data[0].symbol);
1357                            let data = parse_trade_bin_msg_vec(
1358                                data,
1359                                BitmexWsTopic::TradeBin5m,
1360                                price_precision,
1361                                ts_init,
1362                            );
1363
1364                            NautilusWsMessage::Data(data)
1365                        }
1366                        BitmexTableMessage::TradeBin1h { action, data } => {
1367                            if action == BitmexAction::Partial || data.is_empty() {
1368                                continue;
1369                            }
1370                            let price_precision = self.get_price_precision(&data[0].symbol);
1371                            let data = parse_trade_bin_msg_vec(
1372                                data,
1373                                BitmexWsTopic::TradeBin1h,
1374                                price_precision,
1375                                ts_init,
1376                            );
1377
1378                            NautilusWsMessage::Data(data)
1379                        }
1380                        BitmexTableMessage::TradeBin1d { action, data } => {
1381                            if action == BitmexAction::Partial || data.is_empty() {
1382                                continue;
1383                            }
1384                            let price_precision = self.get_price_precision(&data[0].symbol);
1385                            let data = parse_trade_bin_msg_vec(
1386                                data,
1387                                BitmexWsTopic::TradeBin1d,
1388                                price_precision,
1389                                ts_init,
1390                            );
1391
1392                            NautilusWsMessage::Data(data)
1393                        }
1394                        // Execution messages
1395                        // Note: BitMEX may send duplicate order status updates for the same order
1396                        // (e.g., immediate response + stream update). This is expected behavior.
1397                        BitmexTableMessage::Order { data, .. } => {
1398                            // Process all orders in the message
1399                            let mut reports = Vec::with_capacity(data.len());
1400
1401                            for order_data in data {
1402                                match order_data {
1403                                    OrderData::Full(order_msg) => {
1404                                        let price_precision =
1405                                            self.get_price_precision(&order_msg.symbol);
1406                                        match parse_order_msg(&order_msg, price_precision) {
1407                                            Ok(report) => reports.push(report),
1408                                            Err(e) => {
1409                                                tracing::error!(
1410                                                    error = %e,
1411                                                    symbol = %order_msg.symbol,
1412                                                    order_id = %order_msg.order_id,
1413                                                    time_in_force = ?order_msg.time_in_force,
1414                                                    "Failed to parse full order message - potential data loss"
1415                                                );
1416                                                // TODO: Add metric counter for parse failures
1417                                                continue;
1418                                            }
1419                                        }
1420                                    }
1421                                    OrderData::Update(msg) => {
1422                                        let price_precision = self.get_price_precision(&msg.symbol);
1423
1424                                        if let Some(event) = parse_order_update_msg(
1425                                            &msg,
1426                                            price_precision,
1427                                            self.account_id,
1428                                        ) {
1429                                            return Some(NautilusWsMessage::OrderUpdated(event));
1430                                        } else {
1431                                            tracing::warn!(
1432                                                order_id = %msg.order_id,
1433                                                price = ?msg.price,
1434                                                "Skipped order update message (insufficient data)"
1435                                            );
1436                                        }
1437                                    }
1438                                }
1439                            }
1440
1441                            if reports.is_empty() {
1442                                continue;
1443                            }
1444
1445                            NautilusWsMessage::OrderStatusReports(reports)
1446                        }
1447                        BitmexTableMessage::Execution { data, .. } => {
1448                            let mut fills = Vec::with_capacity(data.len());
1449
1450                            for exec_msg in data {
1451                                // Skip if symbol is missing (shouldn't happen for valid trades)
1452                                let Some(symbol) = &exec_msg.symbol else {
1453                                    // Only log as warn for Trade executions, debug for others to reduce noise
1454                                    if exec_msg.exec_type == Some(BitmexExecType::Trade) {
1455                                        tracing::warn!(
1456                                            "Execution message missing symbol: {:?}",
1457                                            exec_msg.exec_id
1458                                        );
1459                                    } else {
1460                                        tracing::debug!(
1461                                            "Execution message missing symbol: {:?} (exec_type: {:?})",
1462                                            exec_msg.exec_id,
1463                                            exec_msg.exec_type
1464                                        );
1465                                    }
1466                                    continue;
1467                                };
1468                                let price_precision = self.get_price_precision(symbol);
1469
1470                                if let Some(fill) = parse_execution_msg(exec_msg, price_precision) {
1471                                    fills.push(fill);
1472                                }
1473                            }
1474
1475                            if fills.is_empty() {
1476                                continue;
1477                            }
1478                            NautilusWsMessage::FillReports(fills)
1479                        }
1480                        BitmexTableMessage::Position { data, .. } => {
1481                            if let Some(pos_msg) = data.into_iter().next() {
1482                                let report = parse_position_msg(pos_msg);
1483                                NautilusWsMessage::PositionStatusReport(report)
1484                            } else {
1485                                continue;
1486                            }
1487                        }
1488                        BitmexTableMessage::Wallet { data, .. } => {
1489                            if let Some(wallet_msg) = data.into_iter().next() {
1490                                let account_state = parse_wallet_msg(wallet_msg, ts_init);
1491                                NautilusWsMessage::AccountState(account_state)
1492                            } else {
1493                                continue;
1494                            }
1495                        }
1496                        BitmexTableMessage::Margin { .. } => {
1497                            // TODO: Implement proper margin parsing with instrument_id
1498                            // For now, we'll skip margin messages as they need an instrument_id
1499                            // which requires more context about the position
1500                            continue;
1501                        }
1502                        BitmexTableMessage::Instrument { data, .. } => {
1503                            let ts_init = clock.get_time_ns();
1504                            let mut data_msgs = Vec::with_capacity(data.len());
1505
1506                            for msg in data {
1507                                let parsed =
1508                                    parse_instrument_msg(msg, &self.instruments_cache, ts_init);
1509                                data_msgs.extend(parsed);
1510                            }
1511
1512                            if data_msgs.is_empty() {
1513                                continue;
1514                            }
1515                            NautilusWsMessage::Data(data_msgs)
1516                        }
1517                        BitmexTableMessage::Funding { data, .. } => {
1518                            let ts_init = clock.get_time_ns();
1519                            let mut funding_updates = Vec::with_capacity(data.len());
1520
1521                            for msg in data {
1522                                if let Some(parsed) = parse_funding_msg(msg, ts_init) {
1523                                    funding_updates.push(parsed);
1524                                }
1525                            }
1526
1527                            if !funding_updates.is_empty() {
1528                                NautilusWsMessage::FundingRateUpdates(funding_updates)
1529                            } else {
1530                                continue;
1531                            }
1532                        }
1533                        _ => {
1534                            // Other message types not yet implemented
1535                            tracing::warn!("Unhandled table message type: {table_msg:?}");
1536                            continue;
1537                        }
1538                    });
1539                }
1540                _ => {
1541                    // Other BitmexWsMessage types (Welcome, Subscription, Error) are
1542                    // already handled in BitmexFeedHandler::next()
1543                    continue;
1544                }
1545            }
1546        }
1547
1548        None
1549    }
1550}
1551
1552////////////////////////////////////////////////////////////////////////////////
1553// Tests
1554////////////////////////////////////////////////////////////////////////////////
1555
1556#[cfg(test)]
1557mod tests {
1558    use ahash::AHashSet;
1559    use rstest::rstest;
1560    use ustr::Ustr;
1561
1562    use super::*;
1563
1564    #[rstest]
1565    fn test_reconnect_topics_restoration_logic() {
1566        // Create real client with credentials
1567        let client = BitmexWebSocketClient::new(
1568            Some("ws://test.com".to_string()),
1569            Some("test_key".to_string()),
1570            Some("test_secret".to_string()),
1571            Some(AccountId::new("BITMEX-TEST")),
1572            None,
1573        )
1574        .unwrap();
1575
1576        // Populate subscriptions like they would be during normal operation
1577        client.subscriptions.insert("trade".to_string(), {
1578            let mut set = AHashSet::new();
1579            set.insert(Ustr::from("XBTUSD"));
1580            set.insert(Ustr::from("ETHUSD"));
1581            set
1582        });
1583
1584        client.subscriptions.insert("orderBookL2".to_string(), {
1585            let mut set = AHashSet::new();
1586            set.insert(Ustr::from("XBTUSD"));
1587            set
1588        });
1589
1590        // Private channels (no symbols)
1591        client
1592            .subscriptions
1593            .insert("order".to_string(), AHashSet::new());
1594        client
1595            .subscriptions
1596            .insert("position".to_string(), AHashSet::new());
1597
1598        // Test the actual reconnection topic building logic from lines 258-268
1599        let mut topics_to_restore = Vec::new();
1600        for entry in client.subscriptions.iter() {
1601            let (channel, symbols) = entry.pair();
1602            if symbols.is_empty() {
1603                topics_to_restore.push(channel.clone());
1604            } else {
1605                for symbol in symbols.iter() {
1606                    topics_to_restore.push(format!("{channel}:{symbol}"));
1607                }
1608            }
1609        }
1610
1611        // Verify it builds the correct restoration topics
1612        assert!(topics_to_restore.contains(&"trade:XBTUSD".to_string()));
1613        assert!(topics_to_restore.contains(&"trade:ETHUSD".to_string()));
1614        assert!(topics_to_restore.contains(&"orderBookL2:XBTUSD".to_string()));
1615        assert!(topics_to_restore.contains(&"order".to_string()));
1616        assert!(topics_to_restore.contains(&"position".to_string()));
1617        assert_eq!(topics_to_restore.len(), 5);
1618    }
1619
1620    #[rstest]
1621    fn test_reconnect_auth_message_building() {
1622        // Test with credentials
1623        let client_with_creds = BitmexWebSocketClient::new(
1624            Some("ws://test.com".to_string()),
1625            Some("test_key".to_string()),
1626            Some("test_secret".to_string()),
1627            Some(AccountId::new("BITMEX-TEST")),
1628            None,
1629        )
1630        .unwrap();
1631
1632        // Test the actual auth message building logic from lines 220-228
1633        if let Some(cred) = &client_with_creds.credential {
1634            let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
1635            let signature = cred.sign("GET", "/realtime", expires, "");
1636
1637            let auth_message = BitmexAuthentication {
1638                op: BitmexWsAuthAction::AuthKeyExpires,
1639                args: (cred.api_key.to_string(), expires, signature),
1640            };
1641
1642            // Verify auth message structure
1643            assert_eq!(auth_message.op, BitmexWsAuthAction::AuthKeyExpires);
1644            assert_eq!(auth_message.args.0, "test_key");
1645            assert!(auth_message.args.1 > 0); // expires should be positive
1646            assert!(!auth_message.args.2.is_empty()); // signature should exist
1647        } else {
1648            panic!("Client should have credentials");
1649        }
1650
1651        // Test without credentials
1652        let client_no_creds = BitmexWebSocketClient::new(
1653            Some("ws://test.com".to_string()),
1654            None,
1655            None,
1656            Some(AccountId::new("BITMEX-TEST")),
1657            None,
1658        )
1659        .unwrap();
1660
1661        assert!(client_no_creds.credential.is_none());
1662    }
1663
1664    #[rstest]
1665    fn test_subscription_state_after_unsubscribe() {
1666        let client = BitmexWebSocketClient::new(
1667            Some("ws://test.com".to_string()),
1668            Some("test_key".to_string()),
1669            Some("test_secret".to_string()),
1670            Some(AccountId::new("BITMEX-TEST")),
1671            None,
1672        )
1673        .unwrap();
1674
1675        // Set up initial subscriptions
1676        client.subscriptions.insert("trade".to_string(), {
1677            let mut set = AHashSet::new();
1678            set.insert(Ustr::from("XBTUSD"));
1679            set.insert(Ustr::from("ETHUSD"));
1680            set
1681        });
1682
1683        client.subscriptions.insert("orderBookL2".to_string(), {
1684            let mut set = AHashSet::new();
1685            set.insert(Ustr::from("XBTUSD"));
1686            set
1687        });
1688
1689        // Simulate unsubscribe logic (like from unsubscribe() method lines 586-599)
1690        let topic = "trade:ETHUSD";
1691        if let Some((channel, symbol)) = topic.split_once(':')
1692            && let Some(mut entry) = client.subscriptions.get_mut(channel)
1693        {
1694            entry.remove(&Ustr::from(symbol));
1695            if entry.is_empty() {
1696                drop(entry);
1697                client.subscriptions.remove(channel);
1698            }
1699        }
1700
1701        // Build restoration topics after unsubscribe
1702        let mut topics_to_restore = Vec::new();
1703        for entry in client.subscriptions.iter() {
1704            let (channel, symbols) = entry.pair();
1705            if symbols.is_empty() {
1706                topics_to_restore.push(channel.clone());
1707            } else {
1708                for symbol in symbols.iter() {
1709                    topics_to_restore.push(format!("{channel}:{symbol}"));
1710                }
1711            }
1712        }
1713
1714        // Should have XBTUSD trade but not ETHUSD trade
1715        assert!(topics_to_restore.contains(&"trade:XBTUSD".to_string()));
1716        assert!(!topics_to_restore.contains(&"trade:ETHUSD".to_string()));
1717        assert!(topics_to_restore.contains(&"orderBookL2:XBTUSD".to_string()));
1718        assert_eq!(topics_to_restore.len(), 2);
1719    }
1720}