nautilus_bitmex/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides the WebSocket client integration for the [BitMEX](https://bitmex.com) WebSocket API.
17//!
18//! This module defines and implements a [`BitmexWebSocketClient`] for
19//! connecting to BitMEX WebSocket streams. It handles authentication (when credentials
20//! are provided), manages subscriptions to market data and account update channels,
21//! and parses incoming messages into structured Nautilus domain objects.
22
23use std::{
24    collections::HashSet,
25    sync::{
26        Arc,
27        atomic::{AtomicBool, Ordering},
28    },
29};
30
31use ahash::{AHashMap, AHashSet};
32use dashmap::DashMap;
33use futures_util::Stream;
34use nautilus_common::runtime::get_runtime;
35use nautilus_core::{
36    consts::NAUTILUS_USER_AGENT, env::get_env_var, time::get_atomic_clock_realtime,
37};
38use nautilus_model::{
39    data::{Data, bar::BarType},
40    enums::{OrderStatus, OrderType},
41    identifiers::{AccountId, ClientOrderId, InstrumentId},
42    instruments::{Instrument, InstrumentAny},
43};
44use nautilus_network::{
45    RECONNECTED,
46    websocket::{PingHandler, WebSocketClient, WebSocketConfig, channel_message_handler},
47};
48use reqwest::header::USER_AGENT;
49use tokio::{sync::RwLock, time::Duration};
50use tokio_tungstenite::tungstenite::Message;
51use ustr::Ustr;
52
53use super::{
54    cache::QuoteCache,
55    enums::{
56        BitmexAction, BitmexWsAuthAction, BitmexWsAuthChannel, BitmexWsOperation, BitmexWsTopic,
57    },
58    error::BitmexWsError,
59    messages::{
60        BitmexAuthentication, BitmexHttpRequest, BitmexSubscription, BitmexTableMessage,
61        BitmexWsMessage, NautilusWsMessage, OrderData,
62    },
63    parse::{
64        is_index_symbol, parse_book_msg_vec, parse_book10_msg_vec, parse_order_update_msg,
65        parse_trade_bin_msg_vec, parse_trade_msg_vec, parse_wallet_msg, topic_from_bar_spec,
66    },
67};
68use crate::{
69    common::{consts::BITMEX_WS_URL, credential::Credential, enums::BitmexExecType},
70    websocket::{
71        auth::{AUTHENTICATION_TIMEOUT_SECS, AuthResultReceiver, AuthTracker},
72        parse::{
73            parse_execution_msg, parse_funding_msg, parse_instrument_msg, parse_order_msg,
74            parse_position_msg,
75        },
76        subscription::SubscriptionState,
77    },
78};
79
/// Provides a WebSocket client for connecting to the [BitMEX](https://bitmex.com) real-time API.
///
/// Key runtime patterns:
/// - Authentication handshakes are managed by the internal auth tracker, ensuring resubscriptions
///   occur only after BitMEX acknowledges `authKey` messages.
/// - The subscription state maintains pending and confirmed topics so reconnection replay is
///   deterministic and per-topic errors are surfaced.
///
/// The type is `Clone`; clones share the `Arc`-wrapped state (connection, receiver, stop signal,
/// task handle and caches).
#[derive(Clone, Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
)]
pub struct BitmexWebSocketClient {
    /// WebSocket endpoint URL used when connecting.
    url: String,
    /// API credential; when present, `connect` authenticates the session.
    credential: Option<Credential>,
    /// Heartbeat setting forwarded unchanged to the underlying `WebSocketConfig`
    /// (units are defined by that config - presumably seconds, TODO confirm).
    heartbeat: Option<u64>,
    /// Underlying network client; `None` until a connection has been established.
    inner: Arc<RwLock<Option<WebSocketClient>>>,
    /// Receiver of parsed messages; set by `connect` and taken by `stream`.
    rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
    /// Stop flag; set to `true` by `close`.
    signal: Arc<AtomicBool>,
    /// Handle of the message-processing task spawned by `connect`.
    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
    /// Account ID handed to the message handler (defaults to `BITMEX-master`).
    account_id: AccountId,
    /// Tracks in-flight authentication requests and their results.
    auth_tracker: AuthTracker,
    /// Pending and confirmed subscription topics.
    subscriptions: SubscriptionState,
    /// Instruments keyed by symbol; replaced wholesale by `initialize_instruments_cache`.
    instruments_cache: Arc<AHashMap<Ustr, InstrumentAny>>,
    /// Order type per client order ID, shared with the message handler.
    order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
    /// Symbol per client order ID, shared with the message handler.
    order_symbol_cache: Arc<DashMap<ClientOrderId, Ustr>>,
}
107
108impl BitmexWebSocketClient {
109    /// Creates a new [`BitmexWebSocketClient`] instance.
110    ///
111    /// # Errors
112    ///
113    /// Returns an error if only one of `api_key` or `api_secret` is provided (both or neither required).
114    pub fn new(
115        url: Option<String>,
116        api_key: Option<String>,
117        api_secret: Option<String>,
118        account_id: Option<AccountId>,
119        heartbeat: Option<u64>,
120    ) -> anyhow::Result<Self> {
121        let credential = match (api_key, api_secret) {
122            (Some(key), Some(secret)) => Some(Credential::new(key, secret)),
123            (None, None) => None,
124            _ => anyhow::bail!("Both `api_key` and `api_secret` must be provided together"),
125        };
126
127        let account_id = account_id.unwrap_or(AccountId::from("BITMEX-master"));
128
129        Ok(Self {
130            url: url.unwrap_or(BITMEX_WS_URL.to_string()),
131            credential,
132            heartbeat,
133            inner: Arc::new(RwLock::new(None)),
134            rx: None,
135            signal: Arc::new(AtomicBool::new(false)),
136            task_handle: None,
137            account_id,
138            auth_tracker: AuthTracker::new(),
139            subscriptions: SubscriptionState::new(),
140            instruments_cache: Arc::new(AHashMap::new()),
141            order_type_cache: Arc::new(DashMap::new()),
142            order_symbol_cache: Arc::new(DashMap::new()),
143        })
144    }
145
146    /// Creates a new authenticated [`BitmexWebSocketClient`] using environment variables.
147    ///
148    /// # Errors
149    ///
150    /// Returns an error if environment variables are not set or credentials are invalid.
151    pub fn from_env() -> anyhow::Result<Self> {
152        let url = get_env_var("BITMEX_WS_URL")?;
153        let api_key = get_env_var("BITMEX_API_KEY")?;
154        let api_secret = get_env_var("BITMEX_API_SECRET")?;
155
156        Self::new(Some(url), Some(api_key), Some(api_secret), None, None)
157    }
158
    /// Returns the WebSocket URL being used by the client.
    ///
    /// This is the value passed to `new`, or the default endpoint when none was given.
    #[must_use]
    pub const fn url(&self) -> &str {
        self.url.as_str()
    }
164
165    /// Returns the public API key being used by the client.
166    #[must_use]
167    pub fn api_key(&self) -> Option<&str> {
168        self.credential.as_ref().map(|c| c.api_key.as_str())
169    }
170
171    /// Returns a value indicating whether the client is active.
172    #[must_use]
173    pub fn is_active(&self) -> bool {
174        match self.inner.try_read() {
175            Ok(guard) => match &*guard {
176                Some(inner) => inner.is_active(),
177                None => false,
178            },
179            Err(_) => false,
180        }
181    }
182
183    /// Returns a value indicating whether the client is closed.
184    #[must_use]
185    pub fn is_closed(&self) -> bool {
186        match self.inner.try_read() {
187            Ok(guard) => match &*guard {
188                Some(inner) => inner.is_closed(),
189                None => true,
190            },
191            Err(_) => true,
192        }
193    }
194
    /// Sets the account ID.
    ///
    /// `connect` copies the account ID into the message-processing task when it is
    /// spawned, so set this before connecting.
    pub fn set_account_id(&mut self, account_id: AccountId) {
        self.account_id = account_id;
    }
199
200    /// Initialize the instruments cache with the given `instruments`.
201    pub fn initialize_instruments_cache(&mut self, instruments: Vec<InstrumentAny>) {
202        let mut instruments_cache: AHashMap<Ustr, InstrumentAny> = AHashMap::new();
203        for inst in instruments {
204            instruments_cache.insert(inst.symbol().inner(), inst.clone());
205        }
206
207        self.instruments_cache = Arc::new(instruments_cache);
208    }
209
210    /// Connect to the BitMEX WebSocket server.
211    ///
212    /// # Errors
213    ///
214    /// Returns an error if the WebSocket connection fails or authentication fails (if credentials provided).
215    ///
216    /// # Panics
217    ///
218    /// Panics if subscription or authentication messages fail to serialize to JSON.
219    pub async fn connect(&mut self) -> Result<(), BitmexWsError> {
220        let reader = self.connect_inner().await?;
221
222        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
223        self.rx = Some(Arc::new(rx));
224        let signal = self.signal.clone();
225
226        let account_id = self.account_id;
227        let inner_client = self.inner.clone();
228        let credential = self.credential.clone();
229        let auth_tracker = self.auth_tracker.clone();
230        let subscriptions = self.subscriptions.clone();
231        let instruments_cache = self.instruments_cache.clone();
232        let order_type_cache = self.order_type_cache.clone();
233        let order_symbol_cache = self.order_symbol_cache.clone();
234
235        let stream_handle = get_runtime().spawn(async move {
236            let mut handler = BitmexWsMessageHandler::new(
237                reader,
238                signal,
239                tx,
240                account_id,
241                auth_tracker.clone(),
242                subscriptions.clone(),
243                instruments_cache,
244                order_type_cache,
245                order_symbol_cache,
246            );
247
248            // Run message processing with reconnection handling
249            loop {
250                match handler.next().await {
251                    Some(NautilusWsMessage::Reconnected) => {
252                        log::info!("Reconnecting WebSocket");
253
254                        let has_client = {
255                            let guard = inner_client.read().await;
256                            guard.is_some()
257                        };
258
259                        if !has_client {
260                            log::warn!("Reconnection signaled but WebSocket client unavailable");
261                            continue;
262                        }
263
264                        let confirmed = subscriptions.confirmed();
265                        let pending = subscriptions.pending();
266                        let mut restore_set: HashSet<String> = HashSet::new();
267
268                        let mut collect_topics = |map: &DashMap<String, AHashSet<Ustr>>| {
269                            for entry in map.iter() {
270                                let (channel, symbols) = entry.pair();
271
272                                if channel == BitmexWsTopic::Instrument.as_ref() {
273                                    continue;
274                                }
275
276                                if symbols.is_empty() {
277                                    restore_set.insert(channel.clone());
278                                } else {
279                                    for symbol in symbols.iter() {
280                                        restore_set.insert(format!("{channel}:{symbol}"));
281                                    }
282                                }
283                            }
284                        };
285
286                        collect_topics(&confirmed);
287                        collect_topics(&pending);
288
289                        let mut topics_to_restore: Vec<String> = restore_set.into_iter().collect();
290                        topics_to_restore.sort();
291
292                        let auth_rx_opt = if let Some(cred) = &credential {
293                            match BitmexWebSocketClient::issue_authentication_request(
294                                &inner_client,
295                                cred,
296                                &auth_tracker,
297                            )
298                            .await
299                            {
300                                Ok(rx) => Some(rx),
301                                Err(e) => {
302                                    log::error!(
303                                        "Failed to send re-authentication request after reconnection: {e}"
304                                    );
305                                    continue;
306                                }
307                            }
308                        } else {
309                            None
310                        };
311
312                        let inner_for_task = inner_client.clone();
313                        let state_for_task = subscriptions.clone();
314                        let auth_tracker_for_task = auth_tracker.clone();
315                        let auth_rx_for_task = auth_rx_opt;
316                        let topics_to_restore_clone = topics_to_restore.clone();
317                        get_runtime().spawn(async move {
318                            if let Some(rx) = auth_rx_for_task {
319                                if let Err(e) = auth_tracker_for_task
320                                    .wait_for_result(
321                                        Duration::from_secs(AUTHENTICATION_TIMEOUT_SECS),
322                                        rx,
323                                    )
324                                    .await
325                                {
326                                    log::error!("Authentication after reconnection failed: {e}");
327                                    return;
328                                }
329                                log::info!("Re-authenticated after reconnection");
330                            }
331
332                            let mut all_topics =
333                                Vec::with_capacity(1 + topics_to_restore_clone.len());
334                            all_topics.push(BitmexWsTopic::Instrument.as_ref().to_string());
335                            all_topics.extend(topics_to_restore_clone.iter().cloned());
336
337                            for topic in &all_topics {
338                                state_for_task.mark_subscribe(topic.as_str());
339                            }
340
341                            if let Err(e) = BitmexWebSocketClient::send_topics(
342                                &inner_for_task,
343                                BitmexWsOperation::Subscribe,
344                                all_topics.clone(),
345                            )
346                            .await
347                            {
348                                log::error!(
349                                    "Failed to restore subscriptions after reconnection: {e}"
350                                );
351                                // Leave topics pending so the next reconnect attempt retries them.
352                            } else {
353                                log::info!(
354                                    "Restored {} subscriptions after reconnection",
355                                    all_topics.len()
356                                );
357                            }
358                        });
359                    }
360                    Some(msg) => {
361                        if let Err(e) = handler.tx.send(msg) {
362                            tracing::error!("Error sending message: {e}");
363                            break;
364                        }
365                    }
366                    None => {
367                        // Stream ended - check if it's a stop signal
368                        if handler.handler.signal.load(Ordering::Relaxed) {
369                            tracing::debug!("Stop signal received, ending message processing");
370                            break;
371                        }
372                        // Otherwise it's an unexpected stream end
373                        tracing::warn!("WebSocket stream ended unexpectedly");
374                        break;
375                    }
376                }
377            }
378        });
379
380        self.task_handle = Some(Arc::new(stream_handle));
381
382        if self.credential.is_some() {
383            self.authenticate().await?;
384        }
385
386        {
387            let inner_guard = self.inner.read().await;
388            if let Some(inner) = &*inner_guard {
389                self.subscriptions
390                    .mark_subscribe(BitmexWsTopic::Instrument.as_ref());
391
392                let subscribe_msg = BitmexSubscription {
393                    op: BitmexWsOperation::Subscribe,
394                    args: vec![Ustr::from(BitmexWsTopic::Instrument.as_ref())],
395                };
396
397                match serde_json::to_string(&subscribe_msg) {
398                    Ok(subscribe_json) => {
399                        if let Err(e) = inner.send_text(subscribe_json, None).await {
400                            log::error!("Failed to subscribe to instruments: {e}");
401                        } else {
402                            log::debug!("Subscribed to all instruments");
403                        }
404                    }
405                    Err(e) => {
406                        tracing::error!(error = %e, "Failed to serialize resubscribe message");
407                    }
408                }
409            }
410        }
411
412        Ok(())
413    }
414
415    /// Connect to the WebSocket and return a message receiver.
416    ///
417    /// # Errors
418    ///
419    /// Returns an error if the WebSocket connection fails or if authentication fails (when credentials are provided).
420    async fn connect_inner(
421        &mut self,
422    ) -> Result<tokio::sync::mpsc::UnboundedReceiver<Message>, BitmexWsError> {
423        let (message_handler, rx) = channel_message_handler();
424
425        let inner_for_ping = self.inner.clone();
426        let ping_handler: PingHandler = Arc::new(move |payload: Vec<u8>| {
427            let inner = inner_for_ping.clone();
428
429            get_runtime().spawn(async move {
430                let len = payload.len();
431                let guard = inner.read().await;
432
433                if let Some(client) = guard.as_ref() {
434                    if let Err(err) = client.send_pong(payload).await {
435                        tracing::warn!(error = %err, "Failed to send pong frame");
436                    } else {
437                        tracing::trace!("Sent pong frame ({len} bytes)");
438                    }
439                } else {
440                    tracing::debug!("Ping received with no active websocket client");
441                }
442            });
443        });
444
445        let config = WebSocketConfig {
446            url: self.url.clone(),
447            headers: vec![(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())],
448            heartbeat: self.heartbeat,
449            heartbeat_msg: None,
450            message_handler: Some(message_handler),
451            ping_handler: Some(ping_handler),
452            reconnect_timeout_ms: Some(5_000),
453            reconnect_delay_initial_ms: None, // Use default
454            reconnect_delay_max_ms: None,     // Use default
455            reconnect_backoff_factor: None,   // Use default
456            reconnect_jitter_ms: None,        // Use default
457        };
458
459        let keyed_quotas = vec![];
460        let client = WebSocketClient::connect(
461            config,
462            None, // post_reconnection
463            keyed_quotas,
464            None, // default_quota
465        )
466        .await
467        .map_err(|e| BitmexWsError::ClientError(e.to_string()))?;
468
469        {
470            let mut inner_guard = self.inner.write().await;
471            *inner_guard = Some(client);
472        }
473
474        Ok(rx)
475    }
476
477    async fn issue_authentication_request(
478        inner: &Arc<RwLock<Option<WebSocketClient>>>,
479        credential: &Credential,
480        tracker: &AuthTracker,
481    ) -> Result<AuthResultReceiver, BitmexWsError> {
482        let receiver = tracker.begin();
483
484        let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
485        let signature = credential.sign("GET", "/realtime", expires, "");
486
487        let auth_message = BitmexAuthentication {
488            op: BitmexWsAuthAction::AuthKeyExpires,
489            args: (credential.api_key.to_string(), expires, signature),
490        };
491
492        let auth_json = serde_json::to_string(&auth_message).map_err(|e| {
493            let msg = format!("Failed to serialize auth message: {e}");
494            tracker.fail(msg.clone());
495            BitmexWsError::AuthenticationError(msg)
496        })?;
497
498        {
499            let inner_guard = inner.read().await;
500            let client = inner_guard.as_ref().ok_or_else(|| {
501                tracker.fail("Cannot authenticate: not connected");
502                BitmexWsError::AuthenticationError("Cannot authenticate: not connected".to_string())
503            })?;
504
505            client.send_text(auth_json, None).await.map_err(|e| {
506                let error = e.to_string();
507                tracker.fail(error.clone());
508                BitmexWsError::AuthenticationError(error)
509            })?;
510        }
511
512        Ok(receiver)
513    }
514
515    /// Authenticate the WebSocket connection using the provided credentials.
516    ///
517    /// # Errors
518    ///
519    /// Returns an error if the WebSocket is not connected, if authentication fails,
520    /// or if credentials are not available.
521    async fn authenticate(&self) -> Result<(), BitmexWsError> {
522        let credential = match &self.credential {
523            Some(credential) => credential,
524            None => {
525                return Err(BitmexWsError::AuthenticationError(
526                    "API credentials not available to authenticate".to_string(),
527                ));
528            }
529        };
530
531        let rx =
532            Self::issue_authentication_request(&self.inner, credential, &self.auth_tracker).await?;
533        self.auth_tracker
534            .wait_for_result(Duration::from_secs(AUTHENTICATION_TIMEOUT_SECS), rx)
535            .await
536    }
537
538    /// Wait until the WebSocket connection is active.
539    ///
540    /// # Errors
541    ///
542    /// Returns an error if the connection times out.
543    pub async fn wait_until_active(&self, timeout_secs: f64) -> Result<(), BitmexWsError> {
544        let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
545
546        tokio::time::timeout(timeout, async {
547            while !self.is_active() {
548                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
549            }
550        })
551        .await
552        .map_err(|_| {
553            BitmexWsError::ClientError(format!(
554                "WebSocket connection timeout after {timeout_secs} seconds"
555            ))
556        })?;
557
558        Ok(())
559    }
560
561    /// Provides the internal stream as a channel-based stream.
562    ///
563    /// # Panics
564    ///
565    /// This function panics:
566    /// - If the websocket is not connected.
567    /// - If `stream` has already been called somewhere else (stream receiver is then taken).
568    pub fn stream(&mut self) -> impl Stream<Item = NautilusWsMessage> + use<> {
569        let rx = self
570            .rx
571            .take()
572            .expect("Stream receiver already taken or not connected");
573        let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
574        async_stream::stream! {
575            while let Some(msg) = rx.recv().await {
576                yield msg;
577            }
578        }
579    }
580
581    /// Closes the client.
582    ///
583    /// # Errors
584    ///
585    /// Returns an error if the WebSocket is not connected or if closing fails.
586    ///
587    /// # Panics
588    ///
589    /// Panics if the task handle cannot be unwrapped (should never happen in normal usage).
590    pub async fn close(&mut self) -> Result<(), BitmexWsError> {
591        log::debug!("Starting close process");
592
593        self.signal.store(true, Ordering::Relaxed);
594
595        {
596            let inner_guard = self.inner.read().await;
597            if let Some(inner) = &*inner_guard {
598                log::debug!("Disconnecting websocket");
599
600                match tokio::time::timeout(Duration::from_secs(3), inner.disconnect()).await {
601                    Ok(()) => log::debug!("Websocket disconnected successfully"),
602                    Err(_) => {
603                        log::warn!(
604                            "Timeout waiting for websocket disconnect, continuing with cleanup"
605                        );
606                    }
607                }
608            } else {
609                log::debug!("No active connection to disconnect");
610            }
611        }
612
613        // Clean up task handle with timeout
614        if let Some(task_handle) = self.task_handle.take() {
615            match Arc::try_unwrap(task_handle) {
616                Ok(handle) => {
617                    log::debug!("Waiting for task handle to complete");
618                    match tokio::time::timeout(Duration::from_secs(2), handle).await {
619                        Ok(Ok(())) => log::debug!("Task handle completed successfully"),
620                        Ok(Err(e)) => log::error!("Task handle encountered an error: {e:?}"),
621                        Err(_) => {
622                            log::warn!(
623                                "Timeout waiting for task handle, task may still be running"
624                            );
625                            // The task will be dropped and should clean up automatically
626                        }
627                    }
628                }
629                Err(arc_handle) => {
630                    log::debug!(
631                        "Cannot take ownership of task handle - other references exist, aborting task"
632                    );
633                    arc_handle.abort();
634                }
635            }
636        } else {
637            log::debug!("No task handle to await");
638        }
639
640        log::debug!("Closed");
641
642        Ok(())
643    }
644
645    async fn send_topics(
646        inner: &Arc<RwLock<Option<WebSocketClient>>>,
647        op: BitmexWsOperation,
648        topics: Vec<String>,
649    ) -> Result<(), BitmexWsError> {
650        if topics.is_empty() {
651            return Ok(());
652        }
653
654        let message = BitmexSubscription {
655            op,
656            args: topics
657                .iter()
658                .map(|topic| Ustr::from(topic.as_str()))
659                .collect(),
660        };
661
662        let op_name = message.op.as_ref().to_string();
663        let payload = serde_json::to_string(&message).map_err(|e| {
664            BitmexWsError::SubscriptionError(format!("Failed to serialize {op_name} message: {e}"))
665        })?;
666
667        let inner_guard = inner.read().await;
668        if let Some(client) = &*inner_guard {
669            client
670                .send_text(payload, None)
671                .await
672                .map_err(|e| BitmexWsError::SubscriptionError(e.to_string()))?;
673        } else {
674            log::error!("Cannot send {op_name} message: not connected");
675        }
676
677        Ok(())
678    }
679
680    /// Subscribe to the specified topics.
681    ///
682    /// # Errors
683    ///
684    /// Returns an error if the WebSocket is not connected or if sending the subscription message fails.
685    ///
686    /// # Panics
687    ///
688    /// Panics if serialization of WebSocket messages fails (should never happen).
689    pub async fn subscribe(&self, topics: Vec<String>) -> Result<(), BitmexWsError> {
690        log::debug!("Subscribing to topics: {topics:?}");
691
692        for topic in &topics {
693            self.subscriptions.mark_subscribe(topic.as_str());
694        }
695
696        Self::send_topics(&self.inner, BitmexWsOperation::Subscribe, topics).await
697    }
698
699    /// Unsubscribe from the specified topics.
700    ///
701    /// # Errors
702    ///
703    /// Returns an error if the WebSocket is not connected or if sending the unsubscription message fails.
704    async fn unsubscribe(&self, topics: Vec<String>) -> Result<(), BitmexWsError> {
705        log::debug!("Attempting to unsubscribe from topics: {topics:?}");
706
707        if self.signal.load(Ordering::Relaxed) {
708            log::debug!("Shutdown signal detected, skipping unsubscribe");
709            return Ok(());
710        }
711
712        for topic in &topics {
713            self.subscriptions.mark_unsubscribe(topic.as_str());
714        }
715
716        let result = Self::send_topics(&self.inner, BitmexWsOperation::Unsubscribe, topics).await;
717        if let Err(e) = result {
718            tracing::debug!(error = %e, "Failed to send unsubscribe message");
719        }
720        Ok(())
721    }
722
    /// Get the current number of active subscriptions.
    ///
    /// This is the length reported by the internal subscription state.
    #[must_use]
    pub fn subscription_count(&self) -> usize {
        self.subscriptions.len()
    }
728
729    pub fn get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
730        let symbol = instrument_id.symbol.inner();
731        let confirmed = self.subscriptions.confirmed();
732        let mut channels = Vec::with_capacity(confirmed.len());
733
734        for entry in confirmed.iter() {
735            let (channel, symbols) = entry.pair();
736            if symbols.contains(&symbol) {
737                // Return the full topic string (e.g., "orderBookL2:XBTUSD")
738                channels.push(format!("{channel}:{symbol}"));
739            } else if symbols.is_empty()
740                && (channel == BitmexWsAuthChannel::Execution.as_ref()
741                    || channel == BitmexWsAuthChannel::Order.as_ref())
742            {
743                // These are account-level subscriptions without symbols
744                channels.push(channel.clone());
745            }
746        }
747
748        channels
749    }
750
    /// Subscribe to instrument updates for all instruments on the venue.
    ///
    /// This is a no-op: `connect` already subscribes to the instrument channel, so
    /// this method only logs a debug message.
    ///
    /// # Errors
    ///
    /// The current implementation never fails and always returns `Ok(())`.
    pub async fn subscribe_instruments(&self) -> Result<(), BitmexWsError> {
        // Already subscribed automatically on connection
        log::debug!("Already subscribed to all instruments on connection, skipping");
        Ok(())
    }
761
    /// Subscribe to instrument updates (mark/index prices) for the specified instrument.
    ///
    /// This is a no-op: `connect` already subscribes to the instrument channel for all
    /// instruments, so this method only logs a debug message naming `instrument_id`.
    ///
    /// # Errors
    ///
    /// The current implementation never fails and always returns `Ok(())`.
    pub async fn subscribe_instrument(
        &self,
        instrument_id: InstrumentId,
    ) -> Result<(), BitmexWsError> {
        // Already subscribed to all instruments on connection
        log::debug!(
            "Already subscribed to all instruments on connection (includes {instrument_id}), skipping"
        );
        Ok(())
    }
777
778    /// Subscribe to order book updates for the specified instrument.
779    ///
780    /// # Errors
781    ///
782    /// Returns an error if the WebSocket is not connected or if the subscription fails.
783    pub async fn subscribe_book(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
784        let topic = BitmexWsTopic::OrderBookL2;
785        let symbol = instrument_id.symbol.as_str();
786        self.subscribe(vec![format!("{topic}:{symbol}")]).await
787    }
788
789    /// Subscribe to order book L2 (25 levels) updates for the specified instrument.
790    ///
791    /// # Errors
792    ///
793    /// Returns an error if the WebSocket is not connected or if the subscription fails.
794    pub async fn subscribe_book_25(
795        &self,
796        instrument_id: InstrumentId,
797    ) -> Result<(), BitmexWsError> {
798        let topic = BitmexWsTopic::OrderBookL2_25;
799        let symbol = instrument_id.symbol.as_str();
800        self.subscribe(vec![format!("{topic}:{symbol}")]).await
801    }
802
803    /// Subscribe to order book depth 10 updates for the specified instrument.
804    ///
805    /// # Errors
806    ///
807    /// Returns an error if the WebSocket is not connected or if the subscription fails.
808    pub async fn subscribe_book_depth10(
809        &self,
810        instrument_id: InstrumentId,
811    ) -> Result<(), BitmexWsError> {
812        let topic = BitmexWsTopic::OrderBook10;
813        let symbol = instrument_id.symbol.as_str();
814        self.subscribe(vec![format!("{topic}:{symbol}")]).await
815    }
816
817    /// Subscribe to quote updates for the specified instrument.
818    ///
819    /// Note: Index symbols (starting with '.') do not have quotes and will be silently ignored.
820    ///
821    /// # Errors
822    ///
823    /// Returns an error if the WebSocket is not connected or if the subscription fails.
824    pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
825        let symbol = instrument_id.symbol.inner();
826
827        // Index symbols don't have quotes (bid/ask), only a single price
828        if is_index_symbol(&instrument_id.symbol.inner()) {
829            tracing::warn!("Ignoring quote subscription for index symbol: {symbol}");
830            return Ok(());
831        }
832
833        let topic = BitmexWsTopic::Quote;
834        self.subscribe(vec![format!("{topic}:{symbol}")]).await
835    }
836
837    /// Subscribe to trade updates for the specified instrument.
838    ///
839    /// Note: Index symbols (starting with '.') do not have trades and will be silently ignored.
840    ///
841    /// # Errors
842    ///
843    /// Returns an error if the WebSocket is not connected or if the subscription fails.
844    pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
845        let symbol = instrument_id.symbol.inner();
846
847        // Index symbols don't have trades
848        if is_index_symbol(&symbol) {
849            tracing::warn!("Ignoring trade subscription for index symbol: {symbol}");
850            return Ok(());
851        }
852
853        let topic = BitmexWsTopic::Trade;
854        self.subscribe(vec![format!("{topic}:{symbol}")]).await
855    }
856
    /// Subscribe to mark price updates for the specified instrument.
    ///
    /// Delegates directly to `subscribe_instrument` with the same `instrument_id`.
    ///
    /// # Errors
    ///
    /// Returns whatever `subscribe_instrument` returns.
    pub async fn subscribe_mark_prices(
        &self,
        instrument_id: InstrumentId,
    ) -> Result<(), BitmexWsError> {
        self.subscribe_instrument(instrument_id).await
    }
868
    /// Subscribe to index price updates for the specified instrument.
    ///
    /// Delegates directly to `subscribe_instrument` with the same `instrument_id`.
    ///
    /// # Errors
    ///
    /// Returns whatever `subscribe_instrument` returns.
    pub async fn subscribe_index_prices(
        &self,
        instrument_id: InstrumentId,
    ) -> Result<(), BitmexWsError> {
        self.subscribe_instrument(instrument_id).await
    }
880
881    /// Subscribe to funding rate updates for the specified instrument.
882    ///
883    /// # Errors
884    ///
885    /// Returns an error if the WebSocket is not connected or if the subscription fails.
886    pub async fn subscribe_funding_rates(
887        &self,
888        instrument_id: InstrumentId,
889    ) -> Result<(), BitmexWsError> {
890        let topic = BitmexWsTopic::Funding;
891        let symbol = instrument_id.symbol.as_str();
892        self.subscribe(vec![format!("{topic}:{symbol}")]).await
893    }
894
895    /// Subscribe to bar updates for the specified bar type.
896    ///
897    /// # Errors
898    ///
899    /// Returns an error if the WebSocket is not connected or if the subscription fails.
900    pub async fn subscribe_bars(&self, bar_type: BarType) -> Result<(), BitmexWsError> {
901        let topic = topic_from_bar_spec(bar_type.spec());
902        let symbol = bar_type.instrument_id().symbol.to_string();
903        self.subscribe(vec![format!("{topic}:{symbol}")]).await
904    }
905
    /// Unsubscribe from instrument updates for all instruments on the venue.
    ///
    /// This is a deliberate no-op that only emits a debug log: the instruments subscription
    /// is kept because it is required for proper operation.
    ///
    /// # Errors
    ///
    /// Currently infallible: this method always returns `Ok(())`.
    pub async fn unsubscribe_instruments(&self) -> Result<(), BitmexWsError> {
        // No-op: instruments are required for proper operation
        log::debug!(
            "Instruments subscription maintained for proper operation, skipping unsubscribe"
        );
        Ok(())
    }
918
    /// Unsubscribe from instrument updates (mark/index prices) for the specified instrument.
    ///
    /// This is a deliberate no-op that only emits a debug log naming `instrument_id`: the
    /// instruments subscription is kept because it is required for proper operation.
    ///
    /// # Errors
    ///
    /// Currently infallible: this method always returns `Ok(())`.
    pub async fn unsubscribe_instrument(
        &self,
        instrument_id: InstrumentId,
    ) -> Result<(), BitmexWsError> {
        // No-op: instruments are required for proper operation
        log::debug!(
            "Instruments subscription maintained for proper operation (includes {instrument_id}), skipping unsubscribe"
        );
        Ok(())
    }
934
935    /// Unsubscribe from order book updates for the specified instrument.
936    ///
937    /// # Errors
938    ///
939    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
940    pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
941        let topic = BitmexWsTopic::OrderBookL2;
942        let symbol = instrument_id.symbol.as_str();
943        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
944    }
945
946    /// Unsubscribe from order book L2 (25 levels) updates for the specified instrument.
947    ///
948    /// # Errors
949    ///
950    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
951    pub async fn unsubscribe_book_25(
952        &self,
953        instrument_id: InstrumentId,
954    ) -> Result<(), BitmexWsError> {
955        let topic = BitmexWsTopic::OrderBookL2_25;
956        let symbol = instrument_id.symbol.as_str();
957        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
958    }
959
960    /// Unsubscribe from order book depth 10 updates for the specified instrument.
961    ///
962    /// # Errors
963    ///
964    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
965    pub async fn unsubscribe_book_depth10(
966        &self,
967        instrument_id: InstrumentId,
968    ) -> Result<(), BitmexWsError> {
969        let topic = BitmexWsTopic::OrderBook10;
970        let symbol = instrument_id.symbol.as_str();
971        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
972    }
973
974    /// Unsubscribe from quote updates for the specified instrument.
975    ///
976    /// # Errors
977    ///
978    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
979    pub async fn unsubscribe_quotes(
980        &self,
981        instrument_id: InstrumentId,
982    ) -> Result<(), BitmexWsError> {
983        let symbol = instrument_id.symbol.inner();
984
985        // Index symbols don't have quotes
986        if is_index_symbol(&symbol) {
987            return Ok(());
988        }
989
990        let topic = BitmexWsTopic::Quote;
991        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
992    }
993
994    /// Unsubscribe from trade updates for the specified instrument.
995    ///
996    /// # Errors
997    ///
998    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
999    pub async fn unsubscribe_trades(
1000        &self,
1001        instrument_id: InstrumentId,
1002    ) -> Result<(), BitmexWsError> {
1003        let symbol = instrument_id.symbol.inner();
1004
1005        // Index symbols don't have trades
1006        if is_index_symbol(&symbol) {
1007            return Ok(());
1008        }
1009
1010        let topic = BitmexWsTopic::Trade;
1011        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1012    }
1013
    /// Unsubscribe from mark price updates for the specified instrument.
    ///
    /// This is a deliberate no-op that only emits a debug log: mark prices arrive on the
    /// instrument channel, which is shared with index prices and therefore left subscribed.
    ///
    /// # Errors
    ///
    /// Currently infallible: this method always returns `Ok(())`.
    pub async fn unsubscribe_mark_prices(
        &self,
        instrument_id: InstrumentId,
    ) -> Result<(), BitmexWsError> {
        // No-op: instrument channel shared with index prices
        log::debug!(
            "Mark prices for {instrument_id} uses shared instrument channel, skipping unsubscribe"
        );
        Ok(())
    }
1029
    /// Unsubscribe from index price updates for the specified instrument.
    ///
    /// This is a deliberate no-op that only emits a debug log: index prices arrive on the
    /// instrument channel, which is shared with mark prices and therefore left subscribed.
    ///
    /// # Errors
    ///
    /// Currently infallible: this method always returns `Ok(())`.
    pub async fn unsubscribe_index_prices(
        &self,
        instrument_id: InstrumentId,
    ) -> Result<(), BitmexWsError> {
        // No-op: instrument channel shared with mark prices
        log::debug!(
            "Index prices for {instrument_id} uses shared instrument channel, skipping unsubscribe"
        );
        Ok(())
    }
1045
    /// Unsubscribe from funding rate updates for the specified instrument.
    ///
    /// This is a deliberate no-op that only emits a debug log: no unsubscribe request is
    /// sent, because unsubscribing during shutdown causes race conditions. Note that this
    /// means a funding subscription made via `subscribe_funding_rates` is not undone here.
    ///
    /// # Errors
    ///
    /// Currently infallible: this method always returns `Ok(())`.
    pub async fn unsubscribe_funding_rates(
        &self,
        instrument_id: InstrumentId,
    ) -> Result<(), BitmexWsError> {
        // No-op: unsubscribing during shutdown causes race conditions
        log::debug!(
            "Funding rates for {instrument_id}, skipping unsubscribe to avoid shutdown race"
        );
        Ok(())
    }
1061
1062    /// Unsubscribe from bar updates for the specified bar type.
1063    ///
1064    /// # Errors
1065    ///
1066    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1067    pub async fn unsubscribe_bars(&self, bar_type: BarType) -> Result<(), BitmexWsError> {
1068        let topic = topic_from_bar_spec(bar_type.spec());
1069        let symbol = bar_type.instrument_id().symbol.to_string();
1070        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1071    }
1072
1073    /// Subscribe to order updates for the authenticated account.
1074    ///
1075    /// # Errors
1076    ///
1077    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1078    pub async fn subscribe_orders(&self) -> Result<(), BitmexWsError> {
1079        if self.credential.is_none() {
1080            return Err(BitmexWsError::MissingCredentials);
1081        }
1082        self.subscribe(vec![BitmexWsAuthChannel::Order.to_string()])
1083            .await
1084    }
1085
1086    /// Subscribe to execution updates for the authenticated account.
1087    ///
1088    /// # Errors
1089    ///
1090    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1091    pub async fn subscribe_executions(&self) -> Result<(), BitmexWsError> {
1092        if self.credential.is_none() {
1093            return Err(BitmexWsError::MissingCredentials);
1094        }
1095        self.subscribe(vec![BitmexWsAuthChannel::Execution.to_string()])
1096            .await
1097    }
1098
1099    /// Subscribe to position updates for the authenticated account.
1100    ///
1101    /// # Errors
1102    ///
1103    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1104    pub async fn subscribe_positions(&self) -> Result<(), BitmexWsError> {
1105        if self.credential.is_none() {
1106            return Err(BitmexWsError::MissingCredentials);
1107        }
1108        self.subscribe(vec![BitmexWsAuthChannel::Position.to_string()])
1109            .await
1110    }
1111
1112    /// Subscribe to margin updates for the authenticated account.
1113    ///
1114    /// # Errors
1115    ///
1116    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1117    pub async fn subscribe_margin(&self) -> Result<(), BitmexWsError> {
1118        if self.credential.is_none() {
1119            return Err(BitmexWsError::MissingCredentials);
1120        }
1121        self.subscribe(vec![BitmexWsAuthChannel::Margin.to_string()])
1122            .await
1123    }
1124
1125    /// Subscribe to wallet updates for the authenticated account.
1126    ///
1127    /// # Errors
1128    ///
1129    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1130    pub async fn subscribe_wallet(&self) -> Result<(), BitmexWsError> {
1131        if self.credential.is_none() {
1132            return Err(BitmexWsError::MissingCredentials);
1133        }
1134        self.subscribe(vec![BitmexWsAuthChannel::Wallet.to_string()])
1135            .await
1136    }
1137
1138    /// Unsubscribe from order updates for the authenticated account.
1139    ///
1140    /// # Errors
1141    ///
1142    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1143    pub async fn unsubscribe_orders(&self) -> Result<(), BitmexWsError> {
1144        self.unsubscribe(vec![BitmexWsAuthChannel::Order.to_string()])
1145            .await
1146    }
1147
1148    /// Unsubscribe from execution updates for the authenticated account.
1149    ///
1150    /// # Errors
1151    ///
1152    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1153    pub async fn unsubscribe_executions(&self) -> Result<(), BitmexWsError> {
1154        self.unsubscribe(vec![BitmexWsAuthChannel::Execution.to_string()])
1155            .await
1156    }
1157
1158    /// Unsubscribe from position updates for the authenticated account.
1159    ///
1160    /// # Errors
1161    ///
1162    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1163    pub async fn unsubscribe_positions(&self) -> Result<(), BitmexWsError> {
1164        self.unsubscribe(vec![BitmexWsAuthChannel::Position.to_string()])
1165            .await
1166    }
1167
1168    /// Unsubscribe from margin updates for the authenticated account.
1169    ///
1170    /// # Errors
1171    ///
1172    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1173    pub async fn unsubscribe_margin(&self) -> Result<(), BitmexWsError> {
1174        self.unsubscribe(vec![BitmexWsAuthChannel::Margin.to_string()])
1175            .await
1176    }
1177
1178    /// Unsubscribe from wallet updates for the authenticated account.
1179    ///
1180    /// # Errors
1181    ///
1182    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1183    pub async fn unsubscribe_wallet(&self) -> Result<(), BitmexWsError> {
1184        self.unsubscribe(vec![BitmexWsAuthChannel::Wallet.to_string()])
1185            .await
1186    }
1187}
1188
/// Low-level feed handler that reads raw WebSocket [`Message`] values from a channel and
/// turns them into [`BitmexWsMessage`] values.
struct BitmexFeedHandler {
    /// Receiving end of the channel that delivers raw WebSocket messages.
    receiver: tokio::sync::mpsc::UnboundedReceiver<Message>,
    /// Shared stop flag; once it is observed as `true`, `next` returns `None`.
    signal: Arc<AtomicBool>,
}
1193
1194impl BitmexFeedHandler {
1195    /// Creates a new [`BitmexFeedHandler`] instance.
1196    pub fn new(
1197        receiver: tokio::sync::mpsc::UnboundedReceiver<Message>,
1198        signal: Arc<AtomicBool>,
1199    ) -> Self {
1200        Self { receiver, signal }
1201    }
1202
1203    /// Get the next message from the WebSocket stream.
1204    async fn next(&mut self) -> Option<BitmexWsMessage> {
1205        loop {
1206            tokio::select! {
1207                msg = self.receiver.recv() => match msg {
1208                    Some(msg) => match msg {
1209                        Message::Text(text) => {
1210                            if text == RECONNECTED {
1211                                tracing::info!("Received WebSocket reconnection signal");
1212                                return Some(BitmexWsMessage::Reconnected);
1213                            }
1214
1215                            tracing::trace!("Raw websocket message: {text}");
1216
1217                            if Self::is_heartbeat_message(&text) {
1218                                tracing::trace!(
1219                                    "Ignoring heartbeat control message: {text}"
1220                                );
1221                                continue;
1222                            }
1223
1224                            match serde_json::from_str(&text) {
1225                                Ok(msg) => match &msg {
1226                                    BitmexWsMessage::Welcome {
1227                                        version,
1228                                        heartbeat_enabled,
1229                                        limit,
1230                                        ..
1231                                    } => {
1232                                        tracing::info!(
1233                                            version = version,
1234                                            heartbeat = heartbeat_enabled,
1235                                            rate_limit = ?limit.remaining,
1236                                            "Welcome to the BitMEX Realtime API:",
1237                                        );
1238                                    }
1239                                    BitmexWsMessage::Subscription { .. } => return Some(msg),
1240                                    BitmexWsMessage::Error { status, error, .. } => {
1241                                        tracing::error!(
1242                                            status = status,
1243                                            error = error,
1244                                            "Received error from BitMEX"
1245                                        );
1246                                    }
1247                                    _ => return Some(msg),
1248                                },
1249                                Err(e) => {
1250                                    tracing::error!("Failed to parse WebSocket message: {e}: {text}");
1251                                }
1252                            }
1253                        }
1254                        Message::Binary(msg) => {
1255                            tracing::debug!("Raw binary: {msg:?}");
1256                        }
1257                        Message::Close(_) => {
1258                            tracing::debug!("Received close message, waiting for reconnection");
1259                            continue;
1260                        }
1261                        msg => match msg {
1262                            Message::Ping(data) => {
1263                                tracing::trace!("Received ping frame with {} bytes", data.len());
1264                            }
1265                            Message::Pong(data) => {
1266                                tracing::trace!("Received pong frame with {} bytes", data.len());
1267                            }
1268                            Message::Frame(frame) => {
1269                                tracing::debug!("Received raw frame: {frame:?}");
1270                            }
1271                            _ => {
1272                                tracing::warn!("Unexpected message type: {msg:?}");
1273                            }
1274                        },
1275                    }
1276                    None => {
1277                        tracing::info!("WebSocket stream closed");
1278                        return None;
1279                    }
1280                },
1281                _ = tokio::time::sleep(Duration::from_millis(1)) => {
1282                    if self.signal.load(std::sync::atomic::Ordering::Relaxed) {
1283                        tracing::debug!("Stop signal received");
1284                        return None;
1285                    }
1286                }
1287            }
1288        }
1289    }
1290
1291    fn is_heartbeat_message(text: &str) -> bool {
1292        let trimmed = text.trim();
1293
1294        if !trimmed.starts_with('{') || trimmed.len() > 64 {
1295            return false;
1296        }
1297
1298        trimmed.contains("\"op\":\"ping\"") || trimmed.contains("\"op\":\"pong\"")
1299    }
1300}
1301
/// Message handler that pulls [`BitmexWsMessage`] values from a [`BitmexFeedHandler`] and
/// converts them into [`NautilusWsMessage`] values.
struct BitmexWsMessageHandler {
    /// Feed handler that supplies parsed venue messages via its `next` method.
    handler: BitmexFeedHandler,
    /// Sending end of a channel carrying [`NautilusWsMessage`] values.
    tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
    /// Account identifier supplied at construction (currently unused, see the reason below).
    #[allow(
        dead_code,
        reason = "May be needed for future account-specific processing"
    )]
    account_id: AccountId,
    /// Authentication tracker supplied at construction.
    auth_tracker: AuthTracker,
    /// Subscription state supplied at construction.
    subscriptions: SubscriptionState,
    /// Shared instrument map keyed by `Ustr`; `get_instrument` looks entries up by symbol.
    instruments_cache: Arc<AHashMap<Ustr, InstrumentAny>>,
    /// Shared map from client order ID to order type, filled in as order messages are parsed.
    order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
    /// Shared map from client order ID to symbol, cached for execution message routing.
    order_symbol_cache: Arc<DashMap<ClientOrderId, Ustr>>,
    /// Cache used when processing quote messages; cleared when a reconnection is signalled.
    quote_cache: QuoteCache,
}
1317
1318impl BitmexWsMessageHandler {
1319    /// Creates a new [`BitmexWsMessageHandler`] instance.
1320    #[allow(clippy::too_many_arguments)]
1321    pub fn new(
1322        receiver: tokio::sync::mpsc::UnboundedReceiver<Message>,
1323        signal: Arc<AtomicBool>,
1324        tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
1325        account_id: AccountId,
1326        auth_tracker: AuthTracker,
1327        subscriptions: SubscriptionState,
1328        instruments_cache: Arc<AHashMap<Ustr, InstrumentAny>>,
1329        order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
1330        order_symbol_cache: Arc<DashMap<ClientOrderId, Ustr>>,
1331    ) -> Self {
1332        let handler = BitmexFeedHandler::new(receiver, signal);
1333        Self {
1334            handler,
1335            tx,
1336            account_id,
1337            auth_tracker,
1338            subscriptions,
1339            instruments_cache,
1340            order_type_cache,
1341            order_symbol_cache,
1342            quote_cache: QuoteCache::new(),
1343        }
1344    }
1345
1346    // Run is now handled inline in the connect() method where we have access to reconnection resources
1347
1348    #[inline]
1349    fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1350        self.instruments_cache.get(symbol).cloned()
1351    }
1352
1353    async fn next(&mut self) -> Option<NautilusWsMessage> {
1354        let clock = get_atomic_clock_realtime();
1355
1356        while let Some(msg) = self.handler.next().await {
1357            match msg {
1358                BitmexWsMessage::Reconnected => {
1359                    // Return reconnection signal to outer loop
1360                    self.quote_cache.clear();
1361                    return Some(NautilusWsMessage::Reconnected);
1362                }
1363                BitmexWsMessage::Subscription {
1364                    success,
1365                    subscribe,
1366                    request,
1367                    error,
1368                } => {
1369                    self.handle_subscription_message(
1370                        success,
1371                        subscribe.as_ref(),
1372                        request.as_ref(),
1373                        error.as_deref(),
1374                    );
1375                    continue;
1376                }
1377                BitmexWsMessage::Table(table_msg) => {
1378                    let ts_init = clock.get_time_ns();
1379
1380                    return Some(match table_msg {
1381                        BitmexTableMessage::OrderBookL2 { action, data } => {
1382                            if data.is_empty() {
1383                                continue;
1384                            }
1385                            let data = parse_book_msg_vec(
1386                                data,
1387                                action,
1388                                self.instruments_cache.as_ref(),
1389                                ts_init,
1390                            );
1391
1392                            NautilusWsMessage::Data(data)
1393                        }
1394                        BitmexTableMessage::OrderBookL2_25 { action, data } => {
1395                            if data.is_empty() {
1396                                continue;
1397                            }
1398                            let data = parse_book_msg_vec(
1399                                data,
1400                                action,
1401                                self.instruments_cache.as_ref(),
1402                                ts_init,
1403                            );
1404
1405                            NautilusWsMessage::Data(data)
1406                        }
1407                        BitmexTableMessage::OrderBook10 { data, .. } => {
1408                            if data.is_empty() {
1409                                continue;
1410                            }
1411                            let data = parse_book10_msg_vec(
1412                                data,
1413                                self.instruments_cache.as_ref(),
1414                                ts_init,
1415                            );
1416
1417                            NautilusWsMessage::Data(data)
1418                        }
1419                        BitmexTableMessage::Quote { mut data, .. } => {
1420                            // Index symbols may return empty quote data
1421                            if data.is_empty() {
1422                                continue;
1423                            }
1424
1425                            let msg = data.remove(0);
1426                            let Some(instrument) = self.get_instrument(&msg.symbol) else {
1427                                tracing::error!(
1428                                    symbol = %msg.symbol,
1429                                    "Instrument not found for quote message"
1430                                );
1431                                continue;
1432                            };
1433
1434                            if let Some(quote) =
1435                                self.quote_cache.process(&msg, &instrument, ts_init)
1436                            {
1437                                NautilusWsMessage::Data(vec![Data::Quote(quote)])
1438                            } else {
1439                                continue;
1440                            }
1441                        }
1442                        BitmexTableMessage::Trade { data, .. } => {
1443                            if data.is_empty() {
1444                                continue;
1445                            }
1446                            let data =
1447                                parse_trade_msg_vec(data, self.instruments_cache.as_ref(), ts_init);
1448
1449                            NautilusWsMessage::Data(data)
1450                        }
1451                        BitmexTableMessage::TradeBin1m { action, data } => {
1452                            if action == BitmexAction::Partial || data.is_empty() {
1453                                continue;
1454                            }
1455                            let data = parse_trade_bin_msg_vec(
1456                                data,
1457                                BitmexWsTopic::TradeBin1m,
1458                                self.instruments_cache.as_ref(),
1459                                ts_init,
1460                            );
1461
1462                            NautilusWsMessage::Data(data)
1463                        }
1464                        BitmexTableMessage::TradeBin5m { action, data } => {
1465                            if action == BitmexAction::Partial || data.is_empty() {
1466                                continue;
1467                            }
1468                            let data = parse_trade_bin_msg_vec(
1469                                data,
1470                                BitmexWsTopic::TradeBin5m,
1471                                self.instruments_cache.as_ref(),
1472                                ts_init,
1473                            );
1474
1475                            NautilusWsMessage::Data(data)
1476                        }
1477                        BitmexTableMessage::TradeBin1h { action, data } => {
1478                            if action == BitmexAction::Partial || data.is_empty() {
1479                                continue;
1480                            }
1481                            let data = parse_trade_bin_msg_vec(
1482                                data,
1483                                BitmexWsTopic::TradeBin1h,
1484                                self.instruments_cache.as_ref(),
1485                                ts_init,
1486                            );
1487
1488                            NautilusWsMessage::Data(data)
1489                        }
1490                        BitmexTableMessage::TradeBin1d { action, data } => {
1491                            if action == BitmexAction::Partial || data.is_empty() {
1492                                continue;
1493                            }
1494                            let data = parse_trade_bin_msg_vec(
1495                                data,
1496                                BitmexWsTopic::TradeBin1d,
1497                                self.instruments_cache.as_ref(),
1498                                ts_init,
1499                            );
1500
1501                            NautilusWsMessage::Data(data)
1502                        }
1503                        // Execution messages
1504                        // Note: BitMEX may send duplicate order status updates for the same order
1505                        // (e.g., immediate response + stream update). This is expected behavior.
1506                        BitmexTableMessage::Order { data, .. } => {
1507                            // Process all orders in the message
1508                            let mut reports = Vec::with_capacity(data.len());
1509
1510                            for order_data in data {
1511                                match order_data {
1512                                    OrderData::Full(order_msg) => {
1513                                        let Some(instrument) =
1514                                            self.get_instrument(&order_msg.symbol)
1515                                        else {
1516                                            tracing::error!(
1517                                                symbol = %order_msg.symbol,
1518                                                "Instrument not found for order message"
1519                                            );
1520                                            continue;
1521                                        };
1522
1523                                        match parse_order_msg(
1524                                            &order_msg,
1525                                            &instrument,
1526                                            &self.order_type_cache,
1527                                        ) {
1528                                            Ok(report) => {
1529                                                // Cache the order type and symbol AFTER successful parse
1530                                                if let Some(client_order_id) = &order_msg.cl_ord_id
1531                                                {
1532                                                    let client_order_id =
1533                                                        ClientOrderId::new(client_order_id);
1534
1535                                                    if let Some(ord_type) = &order_msg.ord_type {
1536                                                        let order_type: OrderType =
1537                                                            (*ord_type).into();
1538                                                        self.order_type_cache
1539                                                            .insert(client_order_id, order_type);
1540                                                    }
1541
1542                                                    // Cache symbol for execution message routing
1543                                                    self.order_symbol_cache
1544                                                        .insert(client_order_id, order_msg.symbol);
1545                                                }
1546
1547                                                if is_terminal_order_status(report.order_status)
1548                                                    && let Some(client_id) = report.client_order_id
1549                                                {
1550                                                    self.order_type_cache.remove(&client_id);
1551                                                    self.order_symbol_cache.remove(&client_id);
1552                                                }
1553
1554                                                reports.push(report);
1555                                            }
1556                                            Err(e) => {
1557                                                tracing::error!(
1558                                                    error = %e,
1559                                                    symbol = %order_msg.symbol,
1560                                                    order_id = %order_msg.order_id,
1561                                                    time_in_force = ?order_msg.time_in_force,
1562                                                    "Failed to parse full order message - potential data loss"
1563                                                );
1564                                                // TODO: Add metric counter for parse failures
1565                                                continue;
1566                                            }
1567                                        }
1568                                    }
1569                                    OrderData::Update(msg) => {
1570                                        let Some(instrument) = self.get_instrument(&msg.symbol)
1571                                        else {
1572                                            tracing::error!(
1573                                                symbol = %msg.symbol,
1574                                                "Instrument not found for order update"
1575                                            );
1576                                            continue;
1577                                        };
1578
1579                                        // Populate cache for execution message routing (handles edge case where update arrives before full snapshot)
1580                                        if let Some(cl_ord_id) = &msg.cl_ord_id {
1581                                            let client_order_id = ClientOrderId::new(cl_ord_id);
1582                                            self.order_symbol_cache
1583                                                .insert(client_order_id, msg.symbol);
1584                                        }
1585
1586                                        if let Some(event) = parse_order_update_msg(
1587                                            &msg,
1588                                            &instrument,
1589                                            self.account_id,
1590                                        ) {
1591                                            return Some(NautilusWsMessage::OrderUpdated(event));
1592                                        } else {
1593                                            tracing::warn!(
1594                                                order_id = %msg.order_id,
1595                                                price = ?msg.price,
1596                                                "Skipped order update message (insufficient data)"
1597                                            );
1598                                        }
1599                                    }
1600                                }
1601                            }
1602
1603                            if reports.is_empty() {
1604                                continue;
1605                            }
1606
1607                            NautilusWsMessage::OrderStatusReports(reports)
1608                        }
1609                        BitmexTableMessage::Execution { data, .. } => {
1610                            let mut fills = Vec::with_capacity(data.len());
1611
1612                            for exec_msg in data {
1613                                // Try to get symbol, fall back to cache lookup if missing
1614                                let symbol_opt = if let Some(sym) = &exec_msg.symbol {
1615                                    Some(*sym)
1616                                } else if let Some(cl_ord_id) = &exec_msg.cl_ord_id {
1617                                    // Try to look up symbol from order_symbol_cache
1618                                    let client_order_id = ClientOrderId::new(cl_ord_id);
1619                                    self.order_symbol_cache
1620                                        .get(&client_order_id)
1621                                        .map(|r| *r.value())
1622                                } else {
1623                                    None
1624                                };
1625
1626                                let Some(symbol) = symbol_opt else {
1627                                    // Symbol missing - log appropriately based on exec type and whether we had clOrdID
1628                                    if let Some(cl_ord_id) = &exec_msg.cl_ord_id {
1629                                        if exec_msg.exec_type == Some(BitmexExecType::Trade) {
1630                                            tracing::warn!(
1631                                                cl_ord_id = %cl_ord_id,
1632                                                exec_id = ?exec_msg.exec_id,
1633                                                ord_rej_reason = ?exec_msg.ord_rej_reason,
1634                                                text = ?exec_msg.text,
1635                                                "Execution message missing symbol and not found in cache"
1636                                            );
1637                                        } else {
1638                                            tracing::debug!(
1639                                                cl_ord_id = %cl_ord_id,
1640                                                exec_id = ?exec_msg.exec_id,
1641                                                exec_type = ?exec_msg.exec_type,
1642                                                ord_rej_reason = ?exec_msg.ord_rej_reason,
1643                                                text = ?exec_msg.text,
1644                                                "Execution message missing symbol and not found in cache"
1645                                            );
1646                                        }
1647                                    } else {
1648                                        // CancelReject messages without symbol/clOrdID are expected when using
1649                                        // redundant cancel broadcasting - one cancel succeeds, others arrive late
1650                                        // and BitMEX responds with CancelReject but doesn't populate the fields
1651                                        if exec_msg.exec_type == Some(BitmexExecType::CancelReject)
1652                                        {
1653                                            tracing::debug!(
1654                                                exec_id = ?exec_msg.exec_id,
1655                                                order_id = ?exec_msg.order_id,
1656                                                "CancelReject message missing symbol/clOrdID (expected with redundant cancels)"
1657                                            );
1658                                        } else {
1659                                            tracing::warn!(
1660                                                exec_id = ?exec_msg.exec_id,
1661                                                order_id = ?exec_msg.order_id,
1662                                                exec_type = ?exec_msg.exec_type,
1663                                                ord_rej_reason = ?exec_msg.ord_rej_reason,
1664                                                text = ?exec_msg.text,
1665                                                "Execution message missing both symbol and clOrdID, cannot process"
1666                                            );
1667                                        }
1668                                    }
1669                                    continue;
1670                                };
1671
1672                                let Some(instrument) = self.get_instrument(&symbol) else {
1673                                    tracing::error!(
1674                                        symbol = %symbol,
1675                                        exec_id = ?exec_msg.exec_id,
1676                                        "Instrument not found for execution message"
1677                                    );
1678                                    continue;
1679                                };
1680
1681                                if let Some(fill) = parse_execution_msg(exec_msg, &instrument) {
1682                                    fills.push(fill);
1683                                }
1684                            }
1685
1686                            if fills.is_empty() {
1687                                continue;
1688                            }
1689                            NautilusWsMessage::FillReports(fills)
1690                        }
1691                        BitmexTableMessage::Position { data, .. } => {
1692                            if let Some(pos_msg) = data.into_iter().next() {
1693                                let Some(instrument) = self.get_instrument(&pos_msg.symbol) else {
1694                                    tracing::error!(
1695                                        symbol = %pos_msg.symbol,
1696                                        "Instrument not found for position message"
1697                                    );
1698                                    continue;
1699                                };
1700                                let report = parse_position_msg(pos_msg, &instrument);
1701                                NautilusWsMessage::PositionStatusReport(report)
1702                            } else {
1703                                continue;
1704                            }
1705                        }
1706                        BitmexTableMessage::Wallet { data, .. } => {
1707                            if let Some(wallet_msg) = data.into_iter().next() {
1708                                let account_state = parse_wallet_msg(wallet_msg, ts_init);
1709                                NautilusWsMessage::AccountState(account_state)
1710                            } else {
1711                                continue;
1712                            }
1713                        }
1714                        BitmexTableMessage::Margin { .. } => {
1715                            // Skip margin messages - BitMEX uses account-level cross-margin
1716                            // which doesn't map well to Nautilus's per-instrument margin model
1717                            continue;
1718                        }
1719                        BitmexTableMessage::Instrument { data, .. } => {
1720                            let ts_init = clock.get_time_ns();
1721                            let mut data_msgs = Vec::with_capacity(data.len());
1722
1723                            for msg in data {
1724                                let parsed =
1725                                    parse_instrument_msg(msg, &self.instruments_cache, ts_init);
1726                                data_msgs.extend(parsed);
1727                            }
1728
1729                            if data_msgs.is_empty() {
1730                                continue;
1731                            }
1732                            NautilusWsMessage::Data(data_msgs)
1733                        }
1734                        BitmexTableMessage::Funding { data, .. } => {
1735                            let ts_init = clock.get_time_ns();
1736                            let mut funding_updates = Vec::with_capacity(data.len());
1737
1738                            for msg in data {
1739                                if let Some(parsed) = parse_funding_msg(msg, ts_init) {
1740                                    funding_updates.push(parsed);
1741                                }
1742                            }
1743
1744                            if !funding_updates.is_empty() {
1745                                NautilusWsMessage::FundingRateUpdates(funding_updates)
1746                            } else {
1747                                continue;
1748                            }
1749                        }
1750                        _ => {
1751                            // Other message types not yet implemented
1752                            tracing::warn!("Unhandled table message type: {table_msg:?}");
1753                            continue;
1754                        }
1755                    });
1756                }
1757                BitmexWsMessage::Welcome { .. } | BitmexWsMessage::Error { .. } => continue,
1758            }
1759        }
1760
1761        None
1762    }
1763
1764    fn handle_subscription_message(
1765        &self,
1766        success: bool,
1767        subscribe: Option<&String>,
1768        request: Option<&BitmexHttpRequest>,
1769        error: Option<&str>,
1770    ) {
1771        if let Some(req) = request {
1772            if req
1773                .op
1774                .eq_ignore_ascii_case(BitmexWsAuthAction::AuthKeyExpires.as_ref())
1775            {
1776                if success {
1777                    tracing::info!("Authenticated BitMEX WebSocket session");
1778                    self.auth_tracker.succeed();
1779                } else {
1780                    let reason = error.unwrap_or("Authentication rejected").to_string();
1781                    tracing::error!(error = %reason, "Authentication failed");
1782                    self.auth_tracker.fail(reason);
1783                }
1784                return;
1785            }
1786
1787            if req
1788                .op
1789                .eq_ignore_ascii_case(BitmexWsOperation::Subscribe.as_ref())
1790            {
1791                self.handle_subscription_ack(success, request, subscribe, error);
1792                return;
1793            }
1794
1795            if req
1796                .op
1797                .eq_ignore_ascii_case(BitmexWsOperation::Unsubscribe.as_ref())
1798            {
1799                self.handle_unsubscribe_ack(success, request, subscribe, error);
1800                return;
1801            }
1802        }
1803
1804        if subscribe.is_some() {
1805            self.handle_subscription_ack(success, request, subscribe, error);
1806            return;
1807        }
1808
1809        if let Some(error) = error {
1810            tracing::warn!(
1811                success = success,
1812                error = error,
1813                "Unhandled subscription control message"
1814            );
1815        }
1816    }
1817
1818    fn handle_subscription_ack(
1819        &self,
1820        success: bool,
1821        request: Option<&BitmexHttpRequest>,
1822        subscribe: Option<&String>,
1823        error: Option<&str>,
1824    ) {
1825        let topics = Self::topics_from_request(request, subscribe);
1826
1827        if topics.is_empty() {
1828            tracing::debug!("Subscription acknowledgement without topics");
1829            return;
1830        }
1831
1832        for topic in topics {
1833            if success {
1834                self.subscriptions.confirm(topic);
1835                tracing::debug!(topic = topic, "Subscription confirmed");
1836            } else {
1837                self.subscriptions.mark_failure(topic);
1838                let reason = error.unwrap_or("Subscription rejected");
1839                tracing::error!(topic = topic, error = reason, "Subscription failed");
1840            }
1841        }
1842    }
1843
1844    fn handle_unsubscribe_ack(
1845        &self,
1846        success: bool,
1847        request: Option<&BitmexHttpRequest>,
1848        subscribe: Option<&String>,
1849        error: Option<&str>,
1850    ) {
1851        let topics = Self::topics_from_request(request, subscribe);
1852
1853        if topics.is_empty() {
1854            tracing::debug!("Unsubscription acknowledgement without topics");
1855            return;
1856        }
1857
1858        for topic in topics {
1859            if success {
1860                tracing::debug!(topic = topic, "Unsubscription confirmed");
1861                self.subscriptions.clear_pending(topic);
1862            } else {
1863                let reason = error.unwrap_or("Unsubscription rejected");
1864                tracing::error!(topic = topic, error = reason, "Unsubscription failed");
1865                self.subscriptions.confirm(topic);
1866            }
1867        }
1868    }
1869
1870    fn topics_from_request<'a>(
1871        request: Option<&'a BitmexHttpRequest>,
1872        fallback: Option<&'a String>,
1873    ) -> Vec<&'a str> {
1874        if let Some(req) = request
1875            && !req.args.is_empty()
1876        {
1877            return req.args.iter().filter_map(|arg| arg.as_str()).collect();
1878        }
1879
1880        fallback.into_iter().map(|topic| topic.as_str()).collect()
1881    }
1882}
1883
1884fn is_terminal_order_status(status: OrderStatus) -> bool {
1885    matches!(
1886        status,
1887        OrderStatus::Canceled | OrderStatus::Expired | OrderStatus::Rejected | OrderStatus::Filled,
1888    )
1889}
1890
1891////////////////////////////////////////////////////////////////////////////////
1892// Tests
1893////////////////////////////////////////////////////////////////////////////////
1894
1895#[cfg(test)]
1896mod tests {
1897    use ahash::AHashSet;
1898    use rstest::rstest;
1899    use ustr::Ustr;
1900
1901    use super::*;
1902
1903    #[test]
1904    fn test_is_heartbeat_message_detection() {
1905        assert!(BitmexFeedHandler::is_heartbeat_message("{\"op\":\"ping\"}"));
1906        assert!(BitmexFeedHandler::is_heartbeat_message("{\"op\":\"pong\"}"));
1907        assert!(!BitmexFeedHandler::is_heartbeat_message(
1908            "{\"op\":\"subscribe\",\"args\":[\"trade:XBTUSD\"]}"
1909        ));
1910    }
1911
1912    #[rstest]
1913    fn test_reconnect_topics_restoration_logic() {
1914        // Create real client with credentials
1915        let client = BitmexWebSocketClient::new(
1916            Some("ws://test.com".to_string()),
1917            Some("test_key".to_string()),
1918            Some("test_secret".to_string()),
1919            Some(AccountId::new("BITMEX-TEST")),
1920            None,
1921        )
1922        .unwrap();
1923
1924        // Populate subscriptions like they would be during normal operation
1925        let subs = client.subscriptions.confirmed();
1926        subs.insert(BitmexWsTopic::Trade.as_ref().to_string(), {
1927            let mut set = AHashSet::new();
1928            set.insert(Ustr::from("XBTUSD"));
1929            set.insert(Ustr::from("ETHUSD"));
1930            set
1931        });
1932
1933        subs.insert(BitmexWsTopic::OrderBookL2.as_ref().to_string(), {
1934            let mut set = AHashSet::new();
1935            set.insert(Ustr::from("XBTUSD"));
1936            set
1937        });
1938
1939        // Private channels (no symbols)
1940        subs.insert(
1941            BitmexWsAuthChannel::Order.as_ref().to_string(),
1942            AHashSet::new(),
1943        );
1944        subs.insert(
1945            BitmexWsAuthChannel::Position.as_ref().to_string(),
1946            AHashSet::new(),
1947        );
1948
1949        // Test the actual reconnection topic building logic from lines 258-268
1950        let mut topics_to_restore = Vec::new();
1951        for entry in subs.iter() {
1952            let (channel, symbols) = entry.pair();
1953            if symbols.is_empty() {
1954                topics_to_restore.push(channel.clone());
1955            } else {
1956                for symbol in symbols.iter() {
1957                    topics_to_restore.push(format!("{channel}:{symbol}"));
1958                }
1959            }
1960        }
1961
1962        // Verify it builds the correct restoration topics
1963        assert!(topics_to_restore.contains(&format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref())));
1964        assert!(topics_to_restore.contains(&format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref())));
1965        assert!(
1966            topics_to_restore.contains(&format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref()))
1967        );
1968        assert!(topics_to_restore.contains(&BitmexWsAuthChannel::Order.as_ref().to_string()));
1969        assert!(topics_to_restore.contains(&BitmexWsAuthChannel::Position.as_ref().to_string()));
1970        assert_eq!(topics_to_restore.len(), 5);
1971    }
1972
1973    #[rstest]
1974    fn test_reconnect_auth_message_building() {
1975        // Test with credentials
1976        let client_with_creds = BitmexWebSocketClient::new(
1977            Some("ws://test.com".to_string()),
1978            Some("test_key".to_string()),
1979            Some("test_secret".to_string()),
1980            Some(AccountId::new("BITMEX-TEST")),
1981            None,
1982        )
1983        .unwrap();
1984
1985        // Test the actual auth message building logic from lines 220-228
1986        if let Some(cred) = &client_with_creds.credential {
1987            let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
1988            let signature = cred.sign("GET", "/realtime", expires, "");
1989
1990            let auth_message = BitmexAuthentication {
1991                op: BitmexWsAuthAction::AuthKeyExpires,
1992                args: (cred.api_key.to_string(), expires, signature),
1993            };
1994
1995            // Verify auth message structure
1996            assert_eq!(auth_message.op, BitmexWsAuthAction::AuthKeyExpires);
1997            assert_eq!(auth_message.args.0, "test_key");
1998            assert!(auth_message.args.1 > 0); // expires should be positive
1999            assert!(!auth_message.args.2.is_empty()); // signature should exist
2000        } else {
2001            panic!("Client should have credentials");
2002        }
2003
2004        // Test without credentials
2005        let client_no_creds = BitmexWebSocketClient::new(
2006            Some("ws://test.com".to_string()),
2007            None,
2008            None,
2009            Some(AccountId::new("BITMEX-TEST")),
2010            None,
2011        )
2012        .unwrap();
2013
2014        assert!(client_no_creds.credential.is_none());
2015    }
2016
2017    #[rstest]
2018    fn test_subscription_state_after_unsubscribe() {
2019        let client = BitmexWebSocketClient::new(
2020            Some("ws://test.com".to_string()),
2021            Some("test_key".to_string()),
2022            Some("test_secret".to_string()),
2023            Some(AccountId::new("BITMEX-TEST")),
2024            None,
2025        )
2026        .unwrap();
2027
2028        // Set up initial subscriptions
2029        let subs = client.subscriptions.confirmed();
2030        subs.insert(BitmexWsTopic::Trade.as_ref().to_string(), {
2031            let mut set = AHashSet::new();
2032            set.insert(Ustr::from("XBTUSD"));
2033            set.insert(Ustr::from("ETHUSD"));
2034            set
2035        });
2036
2037        subs.insert(BitmexWsTopic::OrderBookL2.as_ref().to_string(), {
2038            let mut set = AHashSet::new();
2039            set.insert(Ustr::from("XBTUSD"));
2040            set
2041        });
2042
2043        // Simulate unsubscribe logic (like from unsubscribe() method lines 586-599)
2044        let topic = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
2045        if let Some((channel, symbol)) = topic.split_once(':')
2046            && let Some(mut entry) = subs.get_mut(channel)
2047        {
2048            entry.remove(&Ustr::from(symbol));
2049            if entry.is_empty() {
2050                drop(entry);
2051                subs.remove(channel);
2052            }
2053        }
2054
2055        // Build restoration topics after unsubscribe
2056        let mut topics_to_restore = Vec::new();
2057        for entry in subs.iter() {
2058            let (channel, symbols) = entry.pair();
2059            if symbols.is_empty() {
2060                topics_to_restore.push(channel.clone());
2061            } else {
2062                for symbol in symbols.iter() {
2063                    topics_to_restore.push(format!("{channel}:{symbol}"));
2064                }
2065            }
2066        }
2067
2068        // Should have XBTUSD trade but not ETHUSD trade
2069        let trade_xbt = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
2070        let trade_eth = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
2071        let book_xbt = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
2072
2073        assert!(topics_to_restore.contains(&trade_xbt));
2074        assert!(!topics_to_restore.contains(&trade_eth));
2075        assert!(topics_to_restore.contains(&book_xbt));
2076        assert_eq!(topics_to_restore.len(), 2);
2077    }
2078}