nautilus_bitmex/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides the WebSocket client integration for the [BitMEX](https://bitmex.com) WebSocket API.
17//!
18//! This module defines and implements a [`BitmexWebSocketClient`] for
19//! connecting to BitMEX WebSocket streams. It handles authentication (when credentials
20//! are provided), manages subscriptions to market data and account update channels,
21//! and parses incoming messages into structured Nautilus domain objects.
22
23use std::{
24    collections::HashSet,
25    sync::{
26        Arc,
27        atomic::{AtomicBool, Ordering},
28    },
29};
30
31use ahash::{AHashMap, AHashSet};
32use dashmap::DashMap;
33use futures_util::Stream;
34use nautilus_common::runtime::get_runtime;
35use nautilus_core::{
36    consts::NAUTILUS_USER_AGENT, env::get_env_var, time::get_atomic_clock_realtime,
37};
38use nautilus_model::{
39    data::{Data, bar::BarType},
40    enums::{OrderStatus, OrderType},
41    identifiers::{AccountId, ClientOrderId, InstrumentId},
42    instruments::{Instrument, InstrumentAny},
43};
44use nautilus_network::{
45    RECONNECTED,
46    websocket::{PingHandler, WebSocketClient, WebSocketConfig, channel_message_handler},
47};
48use reqwest::header::USER_AGENT;
49use tokio::{sync::RwLock, time::Duration};
50use tokio_tungstenite::tungstenite::Message;
51use ustr::Ustr;
52
53use super::{
54    cache::QuoteCache,
55    enums::{
56        BitmexAction, BitmexWsAuthAction, BitmexWsAuthChannel, BitmexWsOperation, BitmexWsTopic,
57    },
58    error::BitmexWsError,
59    messages::{
60        BitmexAuthentication, BitmexHttpRequest, BitmexSubscription, BitmexTableMessage,
61        BitmexWsMessage, NautilusWsMessage, OrderData,
62    },
63    parse::{
64        is_index_symbol, parse_book_msg_vec, parse_book10_msg_vec, parse_order_update_msg,
65        parse_trade_bin_msg_vec, parse_trade_msg_vec, parse_wallet_msg, topic_from_bar_spec,
66    },
67};
68use crate::{
69    common::{consts::BITMEX_WS_URL, credential::Credential, enums::BitmexExecType},
70    websocket::{
71        auth::{AUTHENTICATION_TIMEOUT_SECS, AuthResultReceiver, AuthTracker},
72        parse::{
73            parse_execution_msg, parse_funding_msg, parse_instrument_msg, parse_order_msg,
74            parse_position_msg,
75        },
76        subscription::SubscriptionState,
77    },
78};
79
80/// Provides a WebSocket client for connecting to the [BitMEX](https://bitmex.com) real-time API.
81///
82/// Key runtime patterns:
83/// - Authentication handshakes are managed by the internal auth tracker, ensuring resubscriptions
84///   occur only after BitMEX acknowledges `authKey` messages.
85/// - The subscription state maintains pending and confirmed topics so reconnection replay is
86///   deterministic and per-topic errors are surfaced.
87#[derive(Clone, Debug)]
88#[cfg_attr(
89    feature = "python",
90    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
91)]
92pub struct BitmexWebSocketClient {
93    url: String,
94    credential: Option<Credential>,
95    heartbeat: Option<u64>,
96    inner: Arc<RwLock<Option<WebSocketClient>>>,
97    rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
98    signal: Arc<AtomicBool>,
99    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
100    account_id: AccountId,
101    auth_tracker: AuthTracker,
102    subscriptions: SubscriptionState,
103    instruments_cache: Arc<AHashMap<Ustr, InstrumentAny>>,
104    order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
105    order_symbol_cache: Arc<DashMap<ClientOrderId, Ustr>>,
106}
107
108impl BitmexWebSocketClient {
109    /// Creates a new [`BitmexWebSocketClient`] instance.
110    ///
111    /// # Errors
112    ///
113    /// Returns an error if only one of `api_key` or `api_secret` is provided (both or neither required).
114    pub fn new(
115        url: Option<String>,
116        api_key: Option<String>,
117        api_secret: Option<String>,
118        account_id: Option<AccountId>,
119        heartbeat: Option<u64>,
120    ) -> anyhow::Result<Self> {
121        let credential = match (api_key, api_secret) {
122            (Some(key), Some(secret)) => Some(Credential::new(key, secret)),
123            (None, None) => None,
124            _ => anyhow::bail!("Both `api_key` and `api_secret` must be provided together"),
125        };
126
127        let account_id = account_id.unwrap_or(AccountId::from("BITMEX-master"));
128
129        Ok(Self {
130            url: url.unwrap_or(BITMEX_WS_URL.to_string()),
131            credential,
132            heartbeat,
133            inner: Arc::new(RwLock::new(None)),
134            rx: None,
135            signal: Arc::new(AtomicBool::new(false)),
136            task_handle: None,
137            account_id,
138            auth_tracker: AuthTracker::new(),
139            subscriptions: SubscriptionState::new(),
140            instruments_cache: Arc::new(AHashMap::new()),
141            order_type_cache: Arc::new(DashMap::new()),
142            order_symbol_cache: Arc::new(DashMap::new()),
143        })
144    }
145
146    /// Creates a new authenticated [`BitmexWebSocketClient`] using environment variables.
147    ///
148    /// # Errors
149    ///
150    /// Returns an error if environment variables are not set or credentials are invalid.
151    pub fn from_env() -> anyhow::Result<Self> {
152        let url = get_env_var("BITMEX_WS_URL")?;
153        let api_key = get_env_var("BITMEX_API_KEY")?;
154        let api_secret = get_env_var("BITMEX_API_SECRET")?;
155
156        Self::new(Some(url), Some(api_key), Some(api_secret), None, None)
157    }
158
159    /// Returns the websocket url being used by the client.
160    #[must_use]
161    pub const fn url(&self) -> &str {
162        self.url.as_str()
163    }
164
165    /// Returns the public API key being used by the client.
166    #[must_use]
167    pub fn api_key(&self) -> Option<&str> {
168        self.credential.as_ref().map(|c| c.api_key.as_str())
169    }
170
171    /// Returns a value indicating whether the client is active.
172    #[must_use]
173    pub fn is_active(&self) -> bool {
174        match self.inner.try_read() {
175            Ok(guard) => match &*guard {
176                Some(inner) => inner.is_active(),
177                None => false,
178            },
179            Err(_) => false,
180        }
181    }
182
183    /// Returns a value indicating whether the client is closed.
184    #[must_use]
185    pub fn is_closed(&self) -> bool {
186        match self.inner.try_read() {
187            Ok(guard) => match &*guard {
188                Some(inner) => inner.is_closed(),
189                None => true,
190            },
191            Err(_) => true,
192        }
193    }
194
195    /// Sets the account ID.
196    pub fn set_account_id(&mut self, account_id: AccountId) {
197        self.account_id = account_id;
198    }
199
200    /// Initialize the instruments cache with the given `instruments`.
201    pub fn initialize_instruments_cache(&mut self, instruments: Vec<InstrumentAny>) {
202        let mut instruments_cache: AHashMap<Ustr, InstrumentAny> = AHashMap::new();
203        let mut count = 0;
204
205        log::info!("Initializing BitMEX instrument cache...");
206
207        for inst in instruments {
208            let symbol = inst.symbol().inner();
209            instruments_cache.insert(symbol, inst.clone());
210            log::debug!("Cached instrument: {symbol}");
211            count += 1;
212        }
213
214        self.instruments_cache = Arc::new(instruments_cache);
215
216        log::info!("BitMEX instrument cache initialized with {count} instruments");
217    }
218
219    /// Connect to the BitMEX WebSocket server.
220    ///
221    /// # Errors
222    ///
223    /// Returns an error if the WebSocket connection fails or authentication fails (if credentials provided).
224    ///
225    /// # Panics
226    ///
227    /// Panics if subscription or authentication messages fail to serialize to JSON.
228    pub async fn connect(&mut self) -> Result<(), BitmexWsError> {
229        let reader = self.connect_inner().await?;
230
231        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
232        self.rx = Some(Arc::new(rx));
233        let signal = self.signal.clone();
234
235        let account_id = self.account_id;
236        let inner_client = self.inner.clone();
237        let credential = self.credential.clone();
238        let auth_tracker = self.auth_tracker.clone();
239        let subscriptions = self.subscriptions.clone();
240        let instruments_cache = self.instruments_cache.clone();
241        let order_type_cache = self.order_type_cache.clone();
242        let order_symbol_cache = self.order_symbol_cache.clone();
243
244        let stream_handle = get_runtime().spawn(async move {
245            let mut handler = BitmexWsMessageHandler::new(
246                reader,
247                signal,
248                tx,
249                account_id,
250                auth_tracker.clone(),
251                subscriptions.clone(),
252                instruments_cache,
253                order_type_cache,
254                order_symbol_cache,
255            );
256
257            // Run message processing with reconnection handling
258            loop {
259                match handler.next().await {
260                    Some(NautilusWsMessage::Reconnected) => {
261                        log::info!("Reconnecting WebSocket");
262
263                        let has_client = {
264                            let guard = inner_client.read().await;
265                            guard.is_some()
266                        };
267
268                        if !has_client {
269                            log::warn!("Reconnection signaled but WebSocket client unavailable");
270                            continue;
271                        }
272
273                        let confirmed = subscriptions.confirmed();
274                        let pending = subscriptions.pending();
275                        let mut restore_set: HashSet<String> = HashSet::new();
276
277                        let mut collect_topics = |map: &DashMap<String, AHashSet<Ustr>>| {
278                            for entry in map.iter() {
279                                let (channel, symbols) = entry.pair();
280
281                                if channel == BitmexWsTopic::Instrument.as_ref() {
282                                    continue;
283                                }
284
285                                if symbols.is_empty() {
286                                    restore_set.insert(channel.clone());
287                                } else {
288                                    for symbol in symbols.iter() {
289                                        restore_set.insert(format!("{channel}:{symbol}"));
290                                    }
291                                }
292                            }
293                        };
294
295                        collect_topics(&confirmed);
296                        collect_topics(&pending);
297
298                        let mut topics_to_restore: Vec<String> = restore_set.into_iter().collect();
299                        topics_to_restore.sort();
300
301                        let auth_rx_opt = if let Some(cred) = &credential {
302                            match Self::issue_authentication_request(
303                                &inner_client,
304                                cred,
305                                &auth_tracker,
306                            )
307                            .await
308                            {
309                                Ok(rx) => Some(rx),
310                                Err(e) => {
311                                    log::error!(
312                                        "Failed to send re-authentication request after reconnection: {e}"
313                                    );
314                                    continue;
315                                }
316                            }
317                        } else {
318                            None
319                        };
320
321                        let inner_for_task = inner_client.clone();
322                        let state_for_task = subscriptions.clone();
323                        let auth_tracker_for_task = auth_tracker.clone();
324                        let auth_rx_for_task = auth_rx_opt;
325                        let topics_to_restore_clone = topics_to_restore.clone();
326                        get_runtime().spawn(async move {
327                            if let Some(rx) = auth_rx_for_task {
328                                if let Err(e) = auth_tracker_for_task
329                                    .wait_for_result(
330                                        Duration::from_secs(AUTHENTICATION_TIMEOUT_SECS),
331                                        rx,
332                                    )
333                                    .await
334                                {
335                                    log::error!("Authentication after reconnection failed: {e}");
336                                    return;
337                                }
338                                log::info!("Re-authenticated after reconnection");
339                            }
340
341                            let mut all_topics =
342                                Vec::with_capacity(1 + topics_to_restore_clone.len());
343                            all_topics.push(BitmexWsTopic::Instrument.as_ref().to_string());
344                            all_topics.extend(topics_to_restore_clone.iter().cloned());
345
346                            for topic in &all_topics {
347                                state_for_task.mark_subscribe(topic.as_str());
348                            }
349
350                            if let Err(e) = Self::send_topics(
351                                &inner_for_task,
352                                BitmexWsOperation::Subscribe,
353                                all_topics.clone(),
354                            )
355                            .await
356                            {
357                                log::error!(
358                                    "Failed to restore subscriptions after reconnection: {e}"
359                                );
360                                // Leave topics pending so the next reconnect attempt retries them.
361                            } else {
362                                log::info!(
363                                    "Restored {} subscriptions after reconnection",
364                                    all_topics.len()
365                                );
366                            }
367                        });
368                    }
369                    Some(msg) => {
370                        if let Err(e) = handler.tx.send(msg) {
371                            tracing::error!("Error sending message: {e}");
372                            break;
373                        }
374                    }
375                    None => {
376                        // Stream ended - check if it's a stop signal
377                        if handler.handler.signal.load(Ordering::Relaxed) {
378                            tracing::debug!("Stop signal received, ending message processing");
379                            break;
380                        }
381                        // Otherwise it's an unexpected stream end
382                        tracing::warn!("WebSocket stream ended unexpectedly");
383                        break;
384                    }
385                }
386            }
387        });
388
389        self.task_handle = Some(Arc::new(stream_handle));
390
391        if self.credential.is_some() {
392            self.authenticate().await?;
393        }
394
395        {
396            let inner_guard = self.inner.read().await;
397            if let Some(inner) = &*inner_guard {
398                self.subscriptions
399                    .mark_subscribe(BitmexWsTopic::Instrument.as_ref());
400
401                let subscribe_msg = BitmexSubscription {
402                    op: BitmexWsOperation::Subscribe,
403                    args: vec![Ustr::from(BitmexWsTopic::Instrument.as_ref())],
404                };
405
406                match serde_json::to_string(&subscribe_msg) {
407                    Ok(subscribe_json) => {
408                        if let Err(e) = inner.send_text(subscribe_json, None).await {
409                            log::error!("Failed to subscribe to instruments: {e}");
410                        } else {
411                            log::debug!("Subscribed to all instruments");
412                        }
413                    }
414                    Err(e) => {
415                        tracing::error!(error = %e, "Failed to serialize resubscribe message");
416                    }
417                }
418            }
419        }
420
421        Ok(())
422    }
423
424    /// Connect to the WebSocket and return a message receiver.
425    ///
426    /// # Errors
427    ///
428    /// Returns an error if the WebSocket connection fails or if authentication fails (when credentials are provided).
429    async fn connect_inner(
430        &mut self,
431    ) -> Result<tokio::sync::mpsc::UnboundedReceiver<Message>, BitmexWsError> {
432        let (message_handler, rx) = channel_message_handler();
433
434        let inner_for_ping = self.inner.clone();
435        let ping_handler: PingHandler = Arc::new(move |payload: Vec<u8>| {
436            let inner = inner_for_ping.clone();
437
438            get_runtime().spawn(async move {
439                let len = payload.len();
440                let guard = inner.read().await;
441
442                if let Some(client) = guard.as_ref() {
443                    if let Err(e) = client.send_pong(payload).await {
444                        tracing::warn!(error = %e, "Failed to send pong frame");
445                    } else {
446                        tracing::trace!("Sent pong frame ({len} bytes)");
447                    }
448                } else {
449                    tracing::debug!("Ping received with no active websocket client");
450                }
451            });
452        });
453
454        let config = WebSocketConfig {
455            url: self.url.clone(),
456            headers: vec![(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())],
457            heartbeat: self.heartbeat,
458            heartbeat_msg: None,
459            message_handler: Some(message_handler),
460            ping_handler: Some(ping_handler),
461            reconnect_timeout_ms: Some(5_000),
462            reconnect_delay_initial_ms: None, // Use default
463            reconnect_delay_max_ms: None,     // Use default
464            reconnect_backoff_factor: None,   // Use default
465            reconnect_jitter_ms: None,        // Use default
466        };
467
468        let keyed_quotas = vec![];
469        let client = WebSocketClient::connect(
470            config,
471            None, // post_reconnection
472            keyed_quotas,
473            None, // default_quota
474        )
475        .await
476        .map_err(|e| BitmexWsError::ClientError(e.to_string()))?;
477
478        {
479            let mut inner_guard = self.inner.write().await;
480            *inner_guard = Some(client);
481        }
482
483        Ok(rx)
484    }
485
486    async fn issue_authentication_request(
487        inner: &Arc<RwLock<Option<WebSocketClient>>>,
488        credential: &Credential,
489        tracker: &AuthTracker,
490    ) -> Result<AuthResultReceiver, BitmexWsError> {
491        let receiver = tracker.begin();
492
493        let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
494        let signature = credential.sign("GET", "/realtime", expires, "");
495
496        let auth_message = BitmexAuthentication {
497            op: BitmexWsAuthAction::AuthKeyExpires,
498            args: (credential.api_key.to_string(), expires, signature),
499        };
500
501        let auth_json = serde_json::to_string(&auth_message).map_err(|e| {
502            let msg = format!("Failed to serialize auth message: {e}");
503            tracker.fail(msg.clone());
504            BitmexWsError::AuthenticationError(msg)
505        })?;
506
507        {
508            let inner_guard = inner.read().await;
509            let client = inner_guard.as_ref().ok_or_else(|| {
510                tracker.fail("Cannot authenticate: not connected");
511                BitmexWsError::AuthenticationError("Cannot authenticate: not connected".to_string())
512            })?;
513
514            client.send_text(auth_json, None).await.map_err(|e| {
515                let error = e.to_string();
516                tracker.fail(error.clone());
517                BitmexWsError::AuthenticationError(error)
518            })?;
519        }
520
521        Ok(receiver)
522    }
523
524    /// Authenticate the WebSocket connection using the provided credentials.
525    ///
526    /// # Errors
527    ///
528    /// Returns an error if the WebSocket is not connected, if authentication fails,
529    /// or if credentials are not available.
530    async fn authenticate(&self) -> Result<(), BitmexWsError> {
531        let credential = match &self.credential {
532            Some(credential) => credential,
533            None => {
534                return Err(BitmexWsError::AuthenticationError(
535                    "API credentials not available to authenticate".to_string(),
536                ));
537            }
538        };
539
540        let rx =
541            Self::issue_authentication_request(&self.inner, credential, &self.auth_tracker).await?;
542        self.auth_tracker
543            .wait_for_result(Duration::from_secs(AUTHENTICATION_TIMEOUT_SECS), rx)
544            .await
545    }
546
547    /// Wait until the WebSocket connection is active.
548    ///
549    /// # Errors
550    ///
551    /// Returns an error if the connection times out.
552    pub async fn wait_until_active(&self, timeout_secs: f64) -> Result<(), BitmexWsError> {
553        let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
554
555        tokio::time::timeout(timeout, async {
556            while !self.is_active() {
557                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
558            }
559        })
560        .await
561        .map_err(|_| {
562            BitmexWsError::ClientError(format!(
563                "WebSocket connection timeout after {timeout_secs} seconds"
564            ))
565        })?;
566
567        Ok(())
568    }
569
570    /// Provides the internal stream as a channel-based stream.
571    ///
572    /// # Panics
573    ///
574    /// This function panics:
575    /// - If the websocket is not connected.
576    /// - If `stream` has already been called somewhere else (stream receiver is then taken).
577    pub fn stream(&mut self) -> impl Stream<Item = NautilusWsMessage> + use<> {
578        let rx = self
579            .rx
580            .take()
581            .expect("Stream receiver already taken or not connected");
582        let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
583        async_stream::stream! {
584            while let Some(msg) = rx.recv().await {
585                yield msg;
586            }
587        }
588    }
589
590    /// Closes the client.
591    ///
592    /// # Errors
593    ///
594    /// Returns an error if the WebSocket is not connected or if closing fails.
595    ///
596    /// # Panics
597    ///
598    /// Panics if the task handle cannot be unwrapped (should never happen in normal usage).
599    pub async fn close(&mut self) -> Result<(), BitmexWsError> {
600        log::debug!("Starting close process");
601
602        self.signal.store(true, Ordering::Relaxed);
603
604        {
605            let inner_guard = self.inner.read().await;
606            if let Some(inner) = &*inner_guard {
607                log::debug!("Disconnecting websocket");
608
609                match tokio::time::timeout(Duration::from_secs(3), inner.disconnect()).await {
610                    Ok(()) => log::debug!("Websocket disconnected successfully"),
611                    Err(_) => {
612                        log::warn!(
613                            "Timeout waiting for websocket disconnect, continuing with cleanup"
614                        );
615                    }
616                }
617            } else {
618                log::debug!("No active connection to disconnect");
619            }
620        }
621
622        // Clean up task handle with timeout
623        if let Some(task_handle) = self.task_handle.take() {
624            match Arc::try_unwrap(task_handle) {
625                Ok(handle) => {
626                    log::debug!("Waiting for task handle to complete");
627                    match tokio::time::timeout(Duration::from_secs(2), handle).await {
628                        Ok(Ok(())) => log::debug!("Task handle completed successfully"),
629                        Ok(Err(e)) => log::error!("Task handle encountered an error: {e:?}"),
630                        Err(_) => {
631                            log::warn!(
632                                "Timeout waiting for task handle, task may still be running"
633                            );
634                            // The task will be dropped and should clean up automatically
635                        }
636                    }
637                }
638                Err(arc_handle) => {
639                    log::debug!(
640                        "Cannot take ownership of task handle - other references exist, aborting task"
641                    );
642                    arc_handle.abort();
643                }
644            }
645        } else {
646            log::debug!("No task handle to await");
647        }
648
649        log::debug!("Closed");
650
651        Ok(())
652    }
653
654    async fn send_topics(
655        inner: &Arc<RwLock<Option<WebSocketClient>>>,
656        op: BitmexWsOperation,
657        topics: Vec<String>,
658    ) -> Result<(), BitmexWsError> {
659        if topics.is_empty() {
660            return Ok(());
661        }
662
663        let message = BitmexSubscription {
664            op,
665            args: topics
666                .iter()
667                .map(|topic| Ustr::from(topic.as_str()))
668                .collect(),
669        };
670
671        let op_name = message.op.as_ref().to_string();
672        let payload = serde_json::to_string(&message).map_err(|e| {
673            BitmexWsError::SubscriptionError(format!("Failed to serialize {op_name} message: {e}"))
674        })?;
675
676        let inner_guard = inner.read().await;
677        if let Some(client) = &*inner_guard {
678            client
679                .send_text(payload, None)
680                .await
681                .map_err(|e| BitmexWsError::SubscriptionError(e.to_string()))?;
682        } else {
683            log::error!("Cannot send {op_name} message: not connected");
684        }
685
686        Ok(())
687    }
688
689    /// Subscribe to the specified topics.
690    ///
691    /// # Errors
692    ///
693    /// Returns an error if the WebSocket is not connected or if sending the subscription message fails.
694    ///
695    /// # Panics
696    ///
697    /// Panics if serialization of WebSocket messages fails (should never happen).
698    pub async fn subscribe(&self, topics: Vec<String>) -> Result<(), BitmexWsError> {
699        log::debug!("Subscribing to topics: {topics:?}");
700
701        for topic in &topics {
702            self.subscriptions.mark_subscribe(topic.as_str());
703        }
704
705        Self::send_topics(&self.inner, BitmexWsOperation::Subscribe, topics).await
706    }
707
708    /// Unsubscribe from the specified topics.
709    ///
710    /// # Errors
711    ///
712    /// Returns an error if the WebSocket is not connected or if sending the unsubscription message fails.
713    async fn unsubscribe(&self, topics: Vec<String>) -> Result<(), BitmexWsError> {
714        log::debug!("Attempting to unsubscribe from topics: {topics:?}");
715
716        if self.signal.load(Ordering::Relaxed) {
717            log::debug!("Shutdown signal detected, skipping unsubscribe");
718            return Ok(());
719        }
720
721        for topic in &topics {
722            self.subscriptions.mark_unsubscribe(topic.as_str());
723        }
724
725        let result = Self::send_topics(&self.inner, BitmexWsOperation::Unsubscribe, topics).await;
726        if let Err(e) = result {
727            tracing::debug!(error = %e, "Failed to send unsubscribe message");
728        }
729        Ok(())
730    }
731
732    /// Get the current number of active subscriptions.
733    #[must_use]
734    pub fn subscription_count(&self) -> usize {
735        self.subscriptions.len()
736    }
737
738    pub fn get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
739        let symbol = instrument_id.symbol.inner();
740        let confirmed = self.subscriptions.confirmed();
741        let mut channels = Vec::with_capacity(confirmed.len());
742
743        for entry in confirmed.iter() {
744            let (channel, symbols) = entry.pair();
745            if symbols.contains(&symbol) {
746                // Return the full topic string (e.g., "orderBookL2:XBTUSD")
747                channels.push(format!("{channel}:{symbol}"));
748            } else if symbols.is_empty()
749                && (channel == BitmexWsAuthChannel::Execution.as_ref()
750                    || channel == BitmexWsAuthChannel::Order.as_ref())
751            {
752                // These are account-level subscriptions without symbols
753                channels.push(channel.clone());
754            }
755        }
756
757        channels
758    }
759
760    /// Subscribe to instrument updates for all instruments on the venue.
761    ///
762    /// # Errors
763    ///
764    /// Returns an error if the WebSocket is not connected or if the subscription fails.
765    pub async fn subscribe_instruments(&self) -> Result<(), BitmexWsError> {
766        // Already subscribed automatically on connection
767        log::debug!("Already subscribed to all instruments on connection, skipping");
768        Ok(())
769    }
770
    /// Subscribe to instrument updates (mark/index prices) for the specified instrument.
    ///
    /// This is a no-op: nothing is sent. It only logs a debug message stating that the
    /// connection-time subscription to all instruments already includes `instrument_id`.
    ///
    /// # Errors
    ///
    /// This method currently never returns an error; it always returns `Ok(())`.
    pub async fn subscribe_instrument(
        &self,
        instrument_id: InstrumentId,
    ) -> Result<(), BitmexWsError> {
        // Already subscribed to all instruments on connection
        log::debug!(
            "Already subscribed to all instruments on connection (includes {instrument_id}), skipping"
        );
        Ok(())
    }
786
787    /// Subscribe to order book updates for the specified instrument.
788    ///
789    /// # Errors
790    ///
791    /// Returns an error if the WebSocket is not connected or if the subscription fails.
792    pub async fn subscribe_book(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
793        let topic = BitmexWsTopic::OrderBookL2;
794        let symbol = instrument_id.symbol.as_str();
795        self.subscribe(vec![format!("{topic}:{symbol}")]).await
796    }
797
798    /// Subscribe to order book L2 (25 levels) updates for the specified instrument.
799    ///
800    /// # Errors
801    ///
802    /// Returns an error if the WebSocket is not connected or if the subscription fails.
803    pub async fn subscribe_book_25(
804        &self,
805        instrument_id: InstrumentId,
806    ) -> Result<(), BitmexWsError> {
807        let topic = BitmexWsTopic::OrderBookL2_25;
808        let symbol = instrument_id.symbol.as_str();
809        self.subscribe(vec![format!("{topic}:{symbol}")]).await
810    }
811
812    /// Subscribe to order book depth 10 updates for the specified instrument.
813    ///
814    /// # Errors
815    ///
816    /// Returns an error if the WebSocket is not connected or if the subscription fails.
817    pub async fn subscribe_book_depth10(
818        &self,
819        instrument_id: InstrumentId,
820    ) -> Result<(), BitmexWsError> {
821        let topic = BitmexWsTopic::OrderBook10;
822        let symbol = instrument_id.symbol.as_str();
823        self.subscribe(vec![format!("{topic}:{symbol}")]).await
824    }
825
826    /// Subscribe to quote updates for the specified instrument.
827    ///
828    /// Note: Index symbols (starting with '.') do not have quotes and will be silently ignored.
829    ///
830    /// # Errors
831    ///
832    /// Returns an error if the WebSocket is not connected or if the subscription fails.
833    pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
834        let symbol = instrument_id.symbol.inner();
835
836        // Index symbols don't have quotes (bid/ask), only a single price
837        if is_index_symbol(&instrument_id.symbol.inner()) {
838            tracing::warn!("Ignoring quote subscription for index symbol: {symbol}");
839            return Ok(());
840        }
841
842        let topic = BitmexWsTopic::Quote;
843        self.subscribe(vec![format!("{topic}:{symbol}")]).await
844    }
845
846    /// Subscribe to trade updates for the specified instrument.
847    ///
848    /// Note: Index symbols (starting with '.') do not have trades and will be silently ignored.
849    ///
850    /// # Errors
851    ///
852    /// Returns an error if the WebSocket is not connected or if the subscription fails.
853    pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
854        let symbol = instrument_id.symbol.inner();
855
856        // Index symbols don't have trades
857        if is_index_symbol(&symbol) {
858            tracing::warn!("Ignoring trade subscription for index symbol: {symbol}");
859            return Ok(());
860        }
861
862        let topic = BitmexWsTopic::Trade;
863        self.subscribe(vec![format!("{topic}:{symbol}")]).await
864    }
865
    /// Subscribe to mark price updates for the specified instrument.
    ///
    /// Delegates directly to `subscribe_instrument` with the same `instrument_id`.
    ///
    /// # Errors
    ///
    /// Returns whatever error `subscribe_instrument` returns.
    pub async fn subscribe_mark_prices(
        &self,
        instrument_id: InstrumentId,
    ) -> Result<(), BitmexWsError> {
        self.subscribe_instrument(instrument_id).await
    }
877
    /// Subscribe to index price updates for the specified instrument.
    ///
    /// Delegates directly to `subscribe_instrument` with the same `instrument_id`.
    ///
    /// # Errors
    ///
    /// Returns whatever error `subscribe_instrument` returns.
    pub async fn subscribe_index_prices(
        &self,
        instrument_id: InstrumentId,
    ) -> Result<(), BitmexWsError> {
        self.subscribe_instrument(instrument_id).await
    }
889
890    /// Subscribe to funding rate updates for the specified instrument.
891    ///
892    /// # Errors
893    ///
894    /// Returns an error if the WebSocket is not connected or if the subscription fails.
895    pub async fn subscribe_funding_rates(
896        &self,
897        instrument_id: InstrumentId,
898    ) -> Result<(), BitmexWsError> {
899        let topic = BitmexWsTopic::Funding;
900        let symbol = instrument_id.symbol.as_str();
901        self.subscribe(vec![format!("{topic}:{symbol}")]).await
902    }
903
904    /// Subscribe to bar updates for the specified bar type.
905    ///
906    /// # Errors
907    ///
908    /// Returns an error if the WebSocket is not connected or if the subscription fails.
909    pub async fn subscribe_bars(&self, bar_type: BarType) -> Result<(), BitmexWsError> {
910        let topic = topic_from_bar_spec(bar_type.spec());
911        let symbol = bar_type.instrument_id().symbol.to_string();
912        self.subscribe(vec![format!("{topic}:{symbol}")]).await
913    }
914
    /// Unsubscribe from instrument updates for all instruments on the venue.
    ///
    /// This is a no-op: nothing is sent and the instruments subscription is left in place.
    /// Only a debug message is logged.
    ///
    /// # Errors
    ///
    /// This method currently never returns an error; it always returns `Ok(())`.
    pub async fn unsubscribe_instruments(&self) -> Result<(), BitmexWsError> {
        // No-op: instruments are required for proper operation
        log::debug!(
            "Instruments subscription maintained for proper operation, skipping unsubscribe"
        );
        Ok(())
    }
927
    /// Unsubscribe from instrument updates (mark/index prices) for the specified instrument.
    ///
    /// This is a no-op: nothing is sent and the instruments subscription is left in place.
    /// Only a debug message mentioning `instrument_id` is logged.
    ///
    /// # Errors
    ///
    /// This method currently never returns an error; it always returns `Ok(())`.
    pub async fn unsubscribe_instrument(
        &self,
        instrument_id: InstrumentId,
    ) -> Result<(), BitmexWsError> {
        // No-op: instruments are required for proper operation
        log::debug!(
            "Instruments subscription maintained for proper operation (includes {instrument_id}), skipping unsubscribe"
        );
        Ok(())
    }
943
944    /// Unsubscribe from order book updates for the specified instrument.
945    ///
946    /// # Errors
947    ///
948    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
949    pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
950        let topic = BitmexWsTopic::OrderBookL2;
951        let symbol = instrument_id.symbol.as_str();
952        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
953    }
954
955    /// Unsubscribe from order book L2 (25 levels) updates for the specified instrument.
956    ///
957    /// # Errors
958    ///
959    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
960    pub async fn unsubscribe_book_25(
961        &self,
962        instrument_id: InstrumentId,
963    ) -> Result<(), BitmexWsError> {
964        let topic = BitmexWsTopic::OrderBookL2_25;
965        let symbol = instrument_id.symbol.as_str();
966        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
967    }
968
969    /// Unsubscribe from order book depth 10 updates for the specified instrument.
970    ///
971    /// # Errors
972    ///
973    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
974    pub async fn unsubscribe_book_depth10(
975        &self,
976        instrument_id: InstrumentId,
977    ) -> Result<(), BitmexWsError> {
978        let topic = BitmexWsTopic::OrderBook10;
979        let symbol = instrument_id.symbol.as_str();
980        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
981    }
982
983    /// Unsubscribe from quote updates for the specified instrument.
984    ///
985    /// # Errors
986    ///
987    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
988    pub async fn unsubscribe_quotes(
989        &self,
990        instrument_id: InstrumentId,
991    ) -> Result<(), BitmexWsError> {
992        let symbol = instrument_id.symbol.inner();
993
994        // Index symbols don't have quotes
995        if is_index_symbol(&symbol) {
996            return Ok(());
997        }
998
999        let topic = BitmexWsTopic::Quote;
1000        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1001    }
1002
1003    /// Unsubscribe from trade updates for the specified instrument.
1004    ///
1005    /// # Errors
1006    ///
1007    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1008    pub async fn unsubscribe_trades(
1009        &self,
1010        instrument_id: InstrumentId,
1011    ) -> Result<(), BitmexWsError> {
1012        let symbol = instrument_id.symbol.inner();
1013
1014        // Index symbols don't have trades
1015        if is_index_symbol(&symbol) {
1016            return Ok(());
1017        }
1018
1019        let topic = BitmexWsTopic::Trade;
1020        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1021    }
1022
    /// Unsubscribe from mark price updates for the specified instrument.
    ///
    /// This is a no-op: nothing is sent; only a debug message is logged.
    ///
    /// # Errors
    ///
    /// This method currently never returns an error; it always returns `Ok(())`.
    pub async fn unsubscribe_mark_prices(
        &self,
        instrument_id: InstrumentId,
    ) -> Result<(), BitmexWsError> {
        // No-op: instrument channel shared with index prices
        log::debug!(
            "Mark prices for {instrument_id} uses shared instrument channel, skipping unsubscribe"
        );
        Ok(())
    }
1038
    /// Unsubscribe from index price updates for the specified instrument.
    ///
    /// This is a no-op: nothing is sent; only a debug message is logged.
    ///
    /// # Errors
    ///
    /// This method currently never returns an error; it always returns `Ok(())`.
    pub async fn unsubscribe_index_prices(
        &self,
        instrument_id: InstrumentId,
    ) -> Result<(), BitmexWsError> {
        // No-op: instrument channel shared with mark prices
        log::debug!(
            "Index prices for {instrument_id} uses shared instrument channel, skipping unsubscribe"
        );
        Ok(())
    }
1054
    /// Unsubscribe from funding rate updates for the specified instrument.
    ///
    /// This is a no-op: nothing is sent, so a funding subscription made through
    /// `subscribe_funding_rates` is not removed by this call. Only a debug message is logged.
    ///
    /// # Errors
    ///
    /// This method currently never returns an error; it always returns `Ok(())`.
    pub async fn unsubscribe_funding_rates(
        &self,
        instrument_id: InstrumentId,
    ) -> Result<(), BitmexWsError> {
        // No-op: unsubscribing during shutdown causes race conditions
        log::debug!(
            "Funding rates for {instrument_id}, skipping unsubscribe to avoid shutdown race"
        );
        Ok(())
    }
1070
1071    /// Unsubscribe from bar updates for the specified bar type.
1072    ///
1073    /// # Errors
1074    ///
1075    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1076    pub async fn unsubscribe_bars(&self, bar_type: BarType) -> Result<(), BitmexWsError> {
1077        let topic = topic_from_bar_spec(bar_type.spec());
1078        let symbol = bar_type.instrument_id().symbol.to_string();
1079        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1080    }
1081
1082    /// Subscribe to order updates for the authenticated account.
1083    ///
1084    /// # Errors
1085    ///
1086    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1087    pub async fn subscribe_orders(&self) -> Result<(), BitmexWsError> {
1088        if self.credential.is_none() {
1089            return Err(BitmexWsError::MissingCredentials);
1090        }
1091        self.subscribe(vec![BitmexWsAuthChannel::Order.to_string()])
1092            .await
1093    }
1094
1095    /// Subscribe to execution updates for the authenticated account.
1096    ///
1097    /// # Errors
1098    ///
1099    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1100    pub async fn subscribe_executions(&self) -> Result<(), BitmexWsError> {
1101        if self.credential.is_none() {
1102            return Err(BitmexWsError::MissingCredentials);
1103        }
1104        self.subscribe(vec![BitmexWsAuthChannel::Execution.to_string()])
1105            .await
1106    }
1107
1108    /// Subscribe to position updates for the authenticated account.
1109    ///
1110    /// # Errors
1111    ///
1112    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1113    pub async fn subscribe_positions(&self) -> Result<(), BitmexWsError> {
1114        if self.credential.is_none() {
1115            return Err(BitmexWsError::MissingCredentials);
1116        }
1117        self.subscribe(vec![BitmexWsAuthChannel::Position.to_string()])
1118            .await
1119    }
1120
1121    /// Subscribe to margin updates for the authenticated account.
1122    ///
1123    /// # Errors
1124    ///
1125    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1126    pub async fn subscribe_margin(&self) -> Result<(), BitmexWsError> {
1127        if self.credential.is_none() {
1128            return Err(BitmexWsError::MissingCredentials);
1129        }
1130        self.subscribe(vec![BitmexWsAuthChannel::Margin.to_string()])
1131            .await
1132    }
1133
1134    /// Subscribe to wallet updates for the authenticated account.
1135    ///
1136    /// # Errors
1137    ///
1138    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1139    pub async fn subscribe_wallet(&self) -> Result<(), BitmexWsError> {
1140        if self.credential.is_none() {
1141            return Err(BitmexWsError::MissingCredentials);
1142        }
1143        self.subscribe(vec![BitmexWsAuthChannel::Wallet.to_string()])
1144            .await
1145    }
1146
1147    /// Unsubscribe from order updates for the authenticated account.
1148    ///
1149    /// # Errors
1150    ///
1151    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1152    pub async fn unsubscribe_orders(&self) -> Result<(), BitmexWsError> {
1153        self.unsubscribe(vec![BitmexWsAuthChannel::Order.to_string()])
1154            .await
1155    }
1156
1157    /// Unsubscribe from execution updates for the authenticated account.
1158    ///
1159    /// # Errors
1160    ///
1161    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1162    pub async fn unsubscribe_executions(&self) -> Result<(), BitmexWsError> {
1163        self.unsubscribe(vec![BitmexWsAuthChannel::Execution.to_string()])
1164            .await
1165    }
1166
1167    /// Unsubscribe from position updates for the authenticated account.
1168    ///
1169    /// # Errors
1170    ///
1171    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1172    pub async fn unsubscribe_positions(&self) -> Result<(), BitmexWsError> {
1173        self.unsubscribe(vec![BitmexWsAuthChannel::Position.to_string()])
1174            .await
1175    }
1176
1177    /// Unsubscribe from margin updates for the authenticated account.
1178    ///
1179    /// # Errors
1180    ///
1181    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1182    pub async fn unsubscribe_margin(&self) -> Result<(), BitmexWsError> {
1183        self.unsubscribe(vec![BitmexWsAuthChannel::Margin.to_string()])
1184            .await
1185    }
1186
1187    /// Unsubscribe from wallet updates for the authenticated account.
1188    ///
1189    /// # Errors
1190    ///
1191    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1192    pub async fn unsubscribe_wallet(&self) -> Result<(), BitmexWsError> {
1193        self.unsubscribe(vec![BitmexWsAuthChannel::Wallet.to_string()])
1194            .await
1195    }
1196}
1197
/// Receives raw WebSocket messages from a channel and decodes text frames into
/// [`BitmexWsMessage`] values.
struct BitmexFeedHandler {
    /// Receiving end of the channel carrying raw WebSocket [`Message`]s.
    receiver: tokio::sync::mpsc::UnboundedReceiver<Message>,
    /// Stop flag; `next` returns `None` once it observes this set to `true`.
    signal: Arc<AtomicBool>,
}
1202
1203impl BitmexFeedHandler {
    /// Creates a new [`BitmexFeedHandler`] instance.
    ///
    /// `receiver` is the channel the handler reads raw WebSocket messages from, and `signal`
    /// is the shared stop flag polled by `next`. Both are stored as given.
    pub fn new(
        receiver: tokio::sync::mpsc::UnboundedReceiver<Message>,
        signal: Arc<AtomicBool>,
    ) -> Self {
        Self { receiver, signal }
    }
1211
    /// Get the next message from the WebSocket stream.
    ///
    /// Loops until there is something to hand back to the caller:
    ///
    /// - A text frame equal to `RECONNECTED` yields `Some(BitmexWsMessage::Reconnected)`.
    /// - Heartbeat control messages, welcome messages, BitMEX error messages, and text that
    ///   fails to parse are logged and skipped.
    /// - Any other successfully parsed message is returned as `Some(msg)`.
    /// - Binary, close, ping, pong, and raw frames are logged and skipped.
    ///
    /// Returns `None` when the receiver channel is closed, or when the stop signal is
    /// observed as set.
    async fn next(&mut self) -> Option<BitmexWsMessage> {
        loop {
            // Race the channel against a 1 ms timer so the stop signal is polled even when
            // no messages arrive; the signal is only checked when the timer branch completes.
            tokio::select! {
                msg = self.receiver.recv() => match msg {
                    Some(msg) => match msg {
                        Message::Text(text) => {
                            // Sentinel text signalling that the connection was re-established
                            if text == RECONNECTED {
                                tracing::info!("Received WebSocket reconnection signal");
                                return Some(BitmexWsMessage::Reconnected);
                            }

                            tracing::trace!("Raw websocket message: {text}");

                            // Heartbeats are filtered out before JSON deserialization
                            if Self::is_heartbeat_message(&text) {
                                tracing::trace!(
                                    "Ignoring heartbeat control message: {text}"
                                );
                                continue;
                            }

                            match serde_json::from_str(&text) {
                                Ok(msg) => match &msg {
                                    // Welcome is informational only: logged, not forwarded
                                    BitmexWsMessage::Welcome {
                                        version,
                                        heartbeat_enabled,
                                        limit,
                                        ..
                                    } => {
                                        tracing::info!(
                                            version = version,
                                            heartbeat = heartbeat_enabled,
                                            rate_limit = ?limit.remaining,
                                            "Welcome to the BitMEX Realtime API:",
                                        );
                                    }
                                    BitmexWsMessage::Subscription { .. } => return Some(msg),
                                    // Venue errors are logged here and not forwarded
                                    BitmexWsMessage::Error { status, error, .. } => {
                                        tracing::error!(
                                            status = status,
                                            error = error,
                                            "Received error from BitMEX"
                                        );
                                    }
                                    _ => return Some(msg),
                                },
                                // Unparseable text is logged and dropped; the loop continues
                                Err(e) => {
                                    tracing::error!("Failed to parse WebSocket message: {e}: {text}");
                                }
                            }
                        }
                        Message::Binary(msg) => {
                            tracing::debug!("Raw binary: {msg:?}");
                        }
                        Message::Close(_) => {
                            tracing::debug!("Received close message, waiting for reconnection");
                            continue;
                        }
                        // Remaining frame kinds are only logged
                        msg => match msg {
                            Message::Ping(data) => {
                                tracing::trace!("Received ping frame with {} bytes", data.len());
                            }
                            Message::Pong(data) => {
                                tracing::trace!("Received pong frame with {} bytes", data.len());
                            }
                            Message::Frame(frame) => {
                                tracing::debug!("Received raw frame: {frame:?}");
                            }
                            _ => {
                                tracing::warn!("Unexpected message type: {msg:?}");
                            }
                        },
                    }
                    // `recv()` returned `None`: the channel is closed, so the stream has ended
                    None => {
                        tracing::info!("WebSocket stream closed");
                        return None;
                    }
                },
                _ = tokio::time::sleep(Duration::from_millis(1)) => {
                    if self.signal.load(std::sync::atomic::Ordering::Relaxed) {
                        tracing::debug!("Stop signal received");
                        return None;
                    }
                }
            }
        }
    }
1299
1300    fn is_heartbeat_message(text: &str) -> bool {
1301        let trimmed = text.trim();
1302
1303        if !trimmed.starts_with('{') || trimmed.len() > 64 {
1304            return false;
1305        }
1306
1307        trimmed.contains("\"op\":\"ping\"") || trimmed.contains("\"op\":\"pong\"")
1308    }
1309}
1310
/// Converts decoded BitMEX WebSocket messages from a [`BitmexFeedHandler`] into
/// [`NautilusWsMessage`] values, using shared caches for instruments and order metadata.
struct BitmexWsMessageHandler {
    /// Source of decoded [`BitmexWsMessage`] values.
    handler: BitmexFeedHandler,
    /// Sender for [`NautilusWsMessage`] values.
    // NOTE(review): presumably the outbound channel to the client; its use is not visible
    // in this part of the file - confirm.
    tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
    #[allow(
        dead_code,
        reason = "May be needed for future account-specific processing"
    )]
    /// Account identifier supplied at construction.
    account_id: AccountId,
    /// Authentication tracker supplied at construction.
    auth_tracker: AuthTracker,
    /// Subscription state supplied at construction.
    subscriptions: SubscriptionState,
    /// Instruments keyed by symbol, used to resolve the instrument for incoming messages.
    instruments_cache: Arc<AHashMap<Ustr, InstrumentAny>>,
    /// Order type per client order ID; read when parsing order messages and updated after
    /// an order message parses successfully.
    order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
    /// Symbol per client order ID.
    // NOTE(review): the order-handling code says this is cached for execution message
    // routing; the readers are not visible here - confirm.
    order_symbol_cache: Arc<DashMap<ClientOrderId, Ustr>>,
    /// Cache used when processing quote messages; cleared when a reconnection is signalled.
    quote_cache: QuoteCache,
}
1326
1327impl BitmexWsMessageHandler {
1328    /// Creates a new [`BitmexWsMessageHandler`] instance.
1329    #[allow(clippy::too_many_arguments)]
1330    pub fn new(
1331        receiver: tokio::sync::mpsc::UnboundedReceiver<Message>,
1332        signal: Arc<AtomicBool>,
1333        tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
1334        account_id: AccountId,
1335        auth_tracker: AuthTracker,
1336        subscriptions: SubscriptionState,
1337        instruments_cache: Arc<AHashMap<Ustr, InstrumentAny>>,
1338        order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
1339        order_symbol_cache: Arc<DashMap<ClientOrderId, Ustr>>,
1340    ) -> Self {
1341        let handler = BitmexFeedHandler::new(receiver, signal);
1342        Self {
1343            handler,
1344            tx,
1345            account_id,
1346            auth_tracker,
1347            subscriptions,
1348            instruments_cache,
1349            order_type_cache,
1350            order_symbol_cache,
1351            quote_cache: QuoteCache::new(),
1352        }
1353    }
1354
1355    // Run is now handled inline in the connect() method where we have access to reconnection resources
1356
1357    #[inline]
1358    fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1359        self.instruments_cache.get(symbol).cloned()
1360    }
1361
1362    async fn next(&mut self) -> Option<NautilusWsMessage> {
1363        let clock = get_atomic_clock_realtime();
1364
1365        while let Some(msg) = self.handler.next().await {
1366            match msg {
1367                BitmexWsMessage::Reconnected => {
1368                    // Return reconnection signal to outer loop
1369                    self.quote_cache.clear();
1370                    return Some(NautilusWsMessage::Reconnected);
1371                }
1372                BitmexWsMessage::Subscription {
1373                    success,
1374                    subscribe,
1375                    request,
1376                    error,
1377                } => {
1378                    self.handle_subscription_message(
1379                        success,
1380                        subscribe.as_ref(),
1381                        request.as_ref(),
1382                        error.as_deref(),
1383                    );
1384                    continue;
1385                }
1386                BitmexWsMessage::Table(table_msg) => {
1387                    let ts_init = clock.get_time_ns();
1388
1389                    return Some(match table_msg {
1390                        BitmexTableMessage::OrderBookL2 { action, data } => {
1391                            if data.is_empty() {
1392                                continue;
1393                            }
1394                            let data = parse_book_msg_vec(
1395                                data,
1396                                action,
1397                                self.instruments_cache.as_ref(),
1398                                ts_init,
1399                            );
1400
1401                            NautilusWsMessage::Data(data)
1402                        }
1403                        BitmexTableMessage::OrderBookL2_25 { action, data } => {
1404                            if data.is_empty() {
1405                                continue;
1406                            }
1407                            let data = parse_book_msg_vec(
1408                                data,
1409                                action,
1410                                self.instruments_cache.as_ref(),
1411                                ts_init,
1412                            );
1413
1414                            NautilusWsMessage::Data(data)
1415                        }
1416                        BitmexTableMessage::OrderBook10 { data, .. } => {
1417                            if data.is_empty() {
1418                                continue;
1419                            }
1420                            let data = parse_book10_msg_vec(
1421                                data,
1422                                self.instruments_cache.as_ref(),
1423                                ts_init,
1424                            );
1425
1426                            NautilusWsMessage::Data(data)
1427                        }
1428                        BitmexTableMessage::Quote { mut data, .. } => {
1429                            // Index symbols may return empty quote data
1430                            if data.is_empty() {
1431                                continue;
1432                            }
1433
1434                            let msg = data.remove(0);
1435                            let Some(instrument) = self.get_instrument(&msg.symbol) else {
1436                                tracing::error!(
1437                                    "Instrument cache miss: quote message dropped for symbol={}",
1438                                    msg.symbol
1439                                );
1440                                continue;
1441                            };
1442
1443                            if let Some(quote) =
1444                                self.quote_cache.process(&msg, &instrument, ts_init)
1445                            {
1446                                NautilusWsMessage::Data(vec![Data::Quote(quote)])
1447                            } else {
1448                                continue;
1449                            }
1450                        }
1451                        BitmexTableMessage::Trade { data, .. } => {
1452                            if data.is_empty() {
1453                                continue;
1454                            }
1455                            let data =
1456                                parse_trade_msg_vec(data, self.instruments_cache.as_ref(), ts_init);
1457
1458                            NautilusWsMessage::Data(data)
1459                        }
1460                        BitmexTableMessage::TradeBin1m { action, data } => {
1461                            if action == BitmexAction::Partial || data.is_empty() {
1462                                continue;
1463                            }
1464                            let data = parse_trade_bin_msg_vec(
1465                                data,
1466                                BitmexWsTopic::TradeBin1m,
1467                                self.instruments_cache.as_ref(),
1468                                ts_init,
1469                            );
1470
1471                            NautilusWsMessage::Data(data)
1472                        }
1473                        BitmexTableMessage::TradeBin5m { action, data } => {
1474                            if action == BitmexAction::Partial || data.is_empty() {
1475                                continue;
1476                            }
1477                            let data = parse_trade_bin_msg_vec(
1478                                data,
1479                                BitmexWsTopic::TradeBin5m,
1480                                self.instruments_cache.as_ref(),
1481                                ts_init,
1482                            );
1483
1484                            NautilusWsMessage::Data(data)
1485                        }
1486                        BitmexTableMessage::TradeBin1h { action, data } => {
1487                            if action == BitmexAction::Partial || data.is_empty() {
1488                                continue;
1489                            }
1490                            let data = parse_trade_bin_msg_vec(
1491                                data,
1492                                BitmexWsTopic::TradeBin1h,
1493                                self.instruments_cache.as_ref(),
1494                                ts_init,
1495                            );
1496
1497                            NautilusWsMessage::Data(data)
1498                        }
1499                        BitmexTableMessage::TradeBin1d { action, data } => {
1500                            if action == BitmexAction::Partial || data.is_empty() {
1501                                continue;
1502                            }
1503                            let data = parse_trade_bin_msg_vec(
1504                                data,
1505                                BitmexWsTopic::TradeBin1d,
1506                                self.instruments_cache.as_ref(),
1507                                ts_init,
1508                            );
1509
1510                            NautilusWsMessage::Data(data)
1511                        }
1512                        // Execution messages
1513                        // Note: BitMEX may send duplicate order status updates for the same order
1514                        // (e.g., immediate response + stream update). This is expected behavior.
1515                        BitmexTableMessage::Order { data, .. } => {
1516                            // Process all orders in the message
1517                            let mut reports = Vec::with_capacity(data.len());
1518
1519                            for order_data in data {
1520                                match order_data {
1521                                    OrderData::Full(order_msg) => {
1522                                        let Some(instrument) =
1523                                            self.get_instrument(&order_msg.symbol)
1524                                        else {
1525                                            tracing::error!(
1526                                                "Instrument cache miss: order message dropped for symbol={}, order_id={}",
1527                                                order_msg.symbol,
1528                                                order_msg.order_id
1529                                            );
1530                                            continue;
1531                                        };
1532
1533                                        match parse_order_msg(
1534                                            &order_msg,
1535                                            &instrument,
1536                                            &self.order_type_cache,
1537                                        ) {
1538                                            Ok(report) => {
1539                                                // Cache the order type and symbol AFTER successful parse
1540                                                if let Some(client_order_id) = &order_msg.cl_ord_id
1541                                                {
1542                                                    let client_order_id =
1543                                                        ClientOrderId::new(client_order_id);
1544
1545                                                    if let Some(ord_type) = &order_msg.ord_type {
1546                                                        let order_type: OrderType =
1547                                                            (*ord_type).into();
1548                                                        self.order_type_cache
1549                                                            .insert(client_order_id, order_type);
1550                                                    }
1551
1552                                                    // Cache symbol for execution message routing
1553                                                    self.order_symbol_cache
1554                                                        .insert(client_order_id, order_msg.symbol);
1555                                                }
1556
1557                                                if is_terminal_order_status(report.order_status)
1558                                                    && let Some(client_id) = report.client_order_id
1559                                                {
1560                                                    self.order_type_cache.remove(&client_id);
1561                                                    self.order_symbol_cache.remove(&client_id);
1562                                                }
1563
1564                                                reports.push(report);
1565                                            }
1566                                            Err(e) => {
1567                                                tracing::error!(
1568                                                    error = %e,
1569                                                    symbol = %order_msg.symbol,
1570                                                    order_id = %order_msg.order_id,
1571                                                    time_in_force = ?order_msg.time_in_force,
1572                                                    "Failed to parse full order message - potential data loss"
1573                                                );
1574                                                // TODO: Add metric counter for parse failures
1575                                                continue;
1576                                            }
1577                                        }
1578                                    }
1579                                    OrderData::Update(msg) => {
1580                                        let Some(instrument) = self.get_instrument(&msg.symbol)
1581                                        else {
1582                                            tracing::error!(
1583                                                "Instrument cache miss: order update dropped for symbol={}, order_id={}",
1584                                                msg.symbol,
1585                                                msg.order_id
1586                                            );
1587                                            continue;
1588                                        };
1589
1590                                        // Populate cache for execution message routing (handles edge case where update arrives before full snapshot)
1591                                        if let Some(cl_ord_id) = &msg.cl_ord_id {
1592                                            let client_order_id = ClientOrderId::new(cl_ord_id);
1593                                            self.order_symbol_cache
1594                                                .insert(client_order_id, msg.symbol);
1595                                        }
1596
1597                                        if let Some(event) = parse_order_update_msg(
1598                                            &msg,
1599                                            &instrument,
1600                                            self.account_id,
1601                                        ) {
1602                                            return Some(NautilusWsMessage::OrderUpdated(event));
1603                                        } else {
1604                                            tracing::warn!(
1605                                                order_id = %msg.order_id,
1606                                                price = ?msg.price,
1607                                                "Skipped order update message (insufficient data)"
1608                                            );
1609                                        }
1610                                    }
1611                                }
1612                            }
1613
1614                            if reports.is_empty() {
1615                                continue;
1616                            }
1617
1618                            NautilusWsMessage::OrderStatusReports(reports)
1619                        }
1620                        BitmexTableMessage::Execution { data, .. } => {
1621                            let mut fills = Vec::with_capacity(data.len());
1622
1623                            for exec_msg in data {
1624                                // Try to get symbol, fall back to cache lookup if missing
1625                                let symbol_opt = if let Some(sym) = &exec_msg.symbol {
1626                                    Some(*sym)
1627                                } else if let Some(cl_ord_id) = &exec_msg.cl_ord_id {
1628                                    // Try to look up symbol from order_symbol_cache
1629                                    let client_order_id = ClientOrderId::new(cl_ord_id);
1630                                    self.order_symbol_cache
1631                                        .get(&client_order_id)
1632                                        .map(|r| *r.value())
1633                                } else {
1634                                    None
1635                                };
1636
1637                                let Some(symbol) = symbol_opt else {
1638                                    // Symbol missing - log appropriately based on exec type and whether we had clOrdID
1639                                    if let Some(cl_ord_id) = &exec_msg.cl_ord_id {
1640                                        if exec_msg.exec_type == Some(BitmexExecType::Trade) {
1641                                            tracing::warn!(
1642                                                cl_ord_id = %cl_ord_id,
1643                                                exec_id = ?exec_msg.exec_id,
1644                                                ord_rej_reason = ?exec_msg.ord_rej_reason,
1645                                                text = ?exec_msg.text,
1646                                                "Execution message missing symbol and not found in cache"
1647                                            );
1648                                        } else {
1649                                            tracing::debug!(
1650                                                cl_ord_id = %cl_ord_id,
1651                                                exec_id = ?exec_msg.exec_id,
1652                                                exec_type = ?exec_msg.exec_type,
1653                                                ord_rej_reason = ?exec_msg.ord_rej_reason,
1654                                                text = ?exec_msg.text,
1655                                                "Execution message missing symbol and not found in cache"
1656                                            );
1657                                        }
1658                                    } else {
1659                                        // CancelReject messages without symbol/clOrdID are expected when using
1660                                        // redundant cancel broadcasting - one cancel succeeds, others arrive late
1661                                        // and BitMEX responds with CancelReject but doesn't populate the fields
1662                                        if exec_msg.exec_type == Some(BitmexExecType::CancelReject)
1663                                        {
1664                                            tracing::debug!(
1665                                                exec_id = ?exec_msg.exec_id,
1666                                                order_id = ?exec_msg.order_id,
1667                                                "CancelReject message missing symbol/clOrdID (expected with redundant cancels)"
1668                                            );
1669                                        } else {
1670                                            tracing::warn!(
1671                                                exec_id = ?exec_msg.exec_id,
1672                                                order_id = ?exec_msg.order_id,
1673                                                exec_type = ?exec_msg.exec_type,
1674                                                ord_rej_reason = ?exec_msg.ord_rej_reason,
1675                                                text = ?exec_msg.text,
1676                                                "Execution message missing both symbol and clOrdID, cannot process"
1677                                            );
1678                                        }
1679                                    }
1680                                    continue;
1681                                };
1682
1683                                let Some(instrument) = self.get_instrument(&symbol) else {
1684                                    tracing::error!(
1685                                        "Instrument cache miss: execution message dropped for symbol={}, exec_id={:?}, exec_type={:?}, Liquidation/ADL fills may be lost",
1686                                        symbol,
1687                                        exec_msg.exec_id,
1688                                        exec_msg.exec_type
1689                                    );
1690                                    continue;
1691                                };
1692
1693                                if let Some(fill) = parse_execution_msg(exec_msg, &instrument) {
1694                                    fills.push(fill);
1695                                }
1696                            }
1697
1698                            if fills.is_empty() {
1699                                continue;
1700                            }
1701                            NautilusWsMessage::FillReports(fills)
1702                        }
1703                        BitmexTableMessage::Position { data, .. } => {
1704                            if let Some(pos_msg) = data.into_iter().next() {
1705                                let Some(instrument) = self.get_instrument(&pos_msg.symbol) else {
1706                                    tracing::error!(
1707                                        "Instrument cache miss: position message dropped for symbol={}, account={}",
1708                                        pos_msg.symbol,
1709                                        pos_msg.account
1710                                    );
1711                                    continue;
1712                                };
1713                                let report = parse_position_msg(pos_msg, &instrument);
1714                                NautilusWsMessage::PositionStatusReport(report)
1715                            } else {
1716                                continue;
1717                            }
1718                        }
1719                        BitmexTableMessage::Wallet { data, .. } => {
1720                            if let Some(wallet_msg) = data.into_iter().next() {
1721                                let account_state = parse_wallet_msg(wallet_msg, ts_init);
1722                                NautilusWsMessage::AccountState(account_state)
1723                            } else {
1724                                continue;
1725                            }
1726                        }
1727                        BitmexTableMessage::Margin { .. } => {
1728                            // Skip margin messages - BitMEX uses account-level cross-margin
1729                            // which doesn't map well to Nautilus's per-instrument margin model
1730                            continue;
1731                        }
1732                        BitmexTableMessage::Instrument { data, .. } => {
1733                            let ts_init = clock.get_time_ns();
1734                            let mut data_msgs = Vec::with_capacity(data.len());
1735
1736                            for msg in data {
1737                                let parsed =
1738                                    parse_instrument_msg(msg, &self.instruments_cache, ts_init);
1739                                data_msgs.extend(parsed);
1740                            }
1741
1742                            if data_msgs.is_empty() {
1743                                continue;
1744                            }
1745                            NautilusWsMessage::Data(data_msgs)
1746                        }
1747                        BitmexTableMessage::Funding { data, .. } => {
1748                            let ts_init = clock.get_time_ns();
1749                            let mut funding_updates = Vec::with_capacity(data.len());
1750
1751                            for msg in data {
1752                                if let Some(parsed) = parse_funding_msg(msg, ts_init) {
1753                                    funding_updates.push(parsed);
1754                                }
1755                            }
1756
1757                            if !funding_updates.is_empty() {
1758                                NautilusWsMessage::FundingRateUpdates(funding_updates)
1759                            } else {
1760                                continue;
1761                            }
1762                        }
1763                        _ => {
1764                            // Other message types not yet implemented
1765                            tracing::warn!("Unhandled table message type: {table_msg:?}");
1766                            continue;
1767                        }
1768                    });
1769                }
1770                BitmexWsMessage::Welcome { .. } | BitmexWsMessage::Error { .. } => continue,
1771            }
1772        }
1773
1774        None
1775    }
1776
1777    fn handle_subscription_message(
1778        &self,
1779        success: bool,
1780        subscribe: Option<&String>,
1781        request: Option<&BitmexHttpRequest>,
1782        error: Option<&str>,
1783    ) {
1784        if let Some(req) = request {
1785            if req
1786                .op
1787                .eq_ignore_ascii_case(BitmexWsAuthAction::AuthKeyExpires.as_ref())
1788            {
1789                if success {
1790                    tracing::info!("Authenticated BitMEX WebSocket session");
1791                    self.auth_tracker.succeed();
1792                } else {
1793                    let reason = error.unwrap_or("Authentication rejected").to_string();
1794                    tracing::error!(error = %reason, "Authentication failed");
1795                    self.auth_tracker.fail(reason);
1796                }
1797                return;
1798            }
1799
1800            if req
1801                .op
1802                .eq_ignore_ascii_case(BitmexWsOperation::Subscribe.as_ref())
1803            {
1804                self.handle_subscription_ack(success, request, subscribe, error);
1805                return;
1806            }
1807
1808            if req
1809                .op
1810                .eq_ignore_ascii_case(BitmexWsOperation::Unsubscribe.as_ref())
1811            {
1812                self.handle_unsubscribe_ack(success, request, subscribe, error);
1813                return;
1814            }
1815        }
1816
1817        if subscribe.is_some() {
1818            self.handle_subscription_ack(success, request, subscribe, error);
1819            return;
1820        }
1821
1822        if let Some(error) = error {
1823            tracing::warn!(
1824                success = success,
1825                error = error,
1826                "Unhandled subscription control message"
1827            );
1828        }
1829    }
1830
1831    fn handle_subscription_ack(
1832        &self,
1833        success: bool,
1834        request: Option<&BitmexHttpRequest>,
1835        subscribe: Option<&String>,
1836        error: Option<&str>,
1837    ) {
1838        let topics = Self::topics_from_request(request, subscribe);
1839
1840        if topics.is_empty() {
1841            tracing::debug!("Subscription acknowledgement without topics");
1842            return;
1843        }
1844
1845        for topic in topics {
1846            if success {
1847                self.subscriptions.confirm(topic);
1848                tracing::debug!(topic = topic, "Subscription confirmed");
1849            } else {
1850                self.subscriptions.mark_failure(topic);
1851                let reason = error.unwrap_or("Subscription rejected");
1852                tracing::error!(topic = topic, error = reason, "Subscription failed");
1853            }
1854        }
1855    }
1856
1857    fn handle_unsubscribe_ack(
1858        &self,
1859        success: bool,
1860        request: Option<&BitmexHttpRequest>,
1861        subscribe: Option<&String>,
1862        error: Option<&str>,
1863    ) {
1864        let topics = Self::topics_from_request(request, subscribe);
1865
1866        if topics.is_empty() {
1867            tracing::debug!("Unsubscription acknowledgement without topics");
1868            return;
1869        }
1870
1871        for topic in topics {
1872            if success {
1873                tracing::debug!(topic = topic, "Unsubscription confirmed");
1874                self.subscriptions.clear_pending(topic);
1875            } else {
1876                let reason = error.unwrap_or("Unsubscription rejected");
1877                tracing::error!(topic = topic, error = reason, "Unsubscription failed");
1878                self.subscriptions.confirm(topic);
1879            }
1880        }
1881    }
1882
1883    fn topics_from_request<'a>(
1884        request: Option<&'a BitmexHttpRequest>,
1885        fallback: Option<&'a String>,
1886    ) -> Vec<&'a str> {
1887        if let Some(req) = request
1888            && !req.args.is_empty()
1889        {
1890            return req.args.iter().filter_map(|arg| arg.as_str()).collect();
1891        }
1892
1893        fallback.into_iter().map(|topic| topic.as_str()).collect()
1894    }
1895}
1896
1897fn is_terminal_order_status(status: OrderStatus) -> bool {
1898    matches!(
1899        status,
1900        OrderStatus::Canceled | OrderStatus::Expired | OrderStatus::Rejected | OrderStatus::Filled,
1901    )
1902}
1903
1904////////////////////////////////////////////////////////////////////////////////
1905// Tests
1906////////////////////////////////////////////////////////////////////////////////
1907
#[cfg(test)]
mod tests {
    use ahash::AHashSet;
    use rstest::rstest;
    use ustr::Ustr;

    use super::*;

    /// Builds a symbol set from plain string symbols.
    fn symbol_set(symbols: &[&str]) -> AHashSet<Ustr> {
        symbols.iter().map(|symbol| Ustr::from(*symbol)).collect()
    }

    /// Creates a client against a dummy URL with test credentials.
    fn client_with_credentials() -> BitmexWebSocketClient {
        BitmexWebSocketClient::new(
            Some("ws://test.com".to_string()),
            Some("test_key".to_string()),
            Some("test_secret".to_string()),
            Some(AccountId::new("BITMEX-TEST")),
            None,
        )
        .unwrap()
    }

    #[test]
    fn test_is_heartbeat_message_detection() {
        for heartbeat in [r#"{"op":"ping"}"#, r#"{"op":"pong"}"#] {
            assert!(BitmexFeedHandler::is_heartbeat_message(heartbeat));
        }

        let subscribe = r#"{"op":"subscribe","args":["trade:XBTUSD"]}"#;
        assert!(!BitmexFeedHandler::is_heartbeat_message(subscribe));
    }

    #[rstest]
    fn test_reconnect_topics_restoration_logic() {
        let client = client_with_credentials();

        // Seed the confirmed subscriptions as normal operation would
        let subs = client.subscriptions.confirmed();
        subs.insert(
            BitmexWsTopic::Trade.as_ref().to_string(),
            symbol_set(&["XBTUSD", "ETHUSD"]),
        );
        subs.insert(
            BitmexWsTopic::OrderBookL2.as_ref().to_string(),
            symbol_set(&["XBTUSD"]),
        );

        // Private channels carry no symbols
        subs.insert(
            BitmexWsAuthChannel::Order.as_ref().to_string(),
            AHashSet::new(),
        );
        subs.insert(
            BitmexWsAuthChannel::Position.as_ref().to_string(),
            AHashSet::new(),
        );

        // Mirror the topic list the client rebuilds on reconnect:
        // bare channel when there are no symbols, otherwise `channel:symbol`
        let topics_to_restore: Vec<String> = subs
            .iter()
            .flat_map(|entry| {
                let (channel, symbols) = entry.pair();
                if symbols.is_empty() {
                    vec![channel.clone()]
                } else {
                    symbols
                        .iter()
                        .map(|symbol| format!("{channel}:{symbol}"))
                        .collect::<Vec<String>>()
                }
            })
            .collect();

        let expected = [
            format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref()),
            format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref()),
            format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref()),
            BitmexWsAuthChannel::Order.as_ref().to_string(),
            BitmexWsAuthChannel::Position.as_ref().to_string(),
        ];

        for topic in &expected {
            assert!(topics_to_restore.contains(topic));
        }
        assert_eq!(topics_to_restore.len(), 5);
    }

    #[rstest]
    fn test_reconnect_auth_message_building() {
        let client_with_creds = client_with_credentials();

        let Some(cred) = &client_with_creds.credential else {
            panic!("Client should have credentials");
        };

        // Build the auth message the same way the client does when (re)authenticating
        let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
        let signature = cred.sign("GET", "/realtime", expires, "");
        let auth_message = BitmexAuthentication {
            op: BitmexWsAuthAction::AuthKeyExpires,
            args: (cred.api_key.to_string(), expires, signature),
        };

        assert_eq!(auth_message.op, BitmexWsAuthAction::AuthKeyExpires);
        assert_eq!(auth_message.args.0, "test_key");
        assert!(auth_message.args.1 > 0); // Expiry is a positive timestamp
        assert!(!auth_message.args.2.is_empty()); // Signature was produced

        // A client built without key/secret holds no credential
        let client_no_creds = BitmexWebSocketClient::new(
            Some("ws://test.com".to_string()),
            None,
            None,
            Some(AccountId::new("BITMEX-TEST")),
            None,
        )
        .unwrap();

        assert!(client_no_creds.credential.is_none());
    }

    #[rstest]
    fn test_subscription_state_after_unsubscribe() {
        let client = client_with_credentials();

        // Initial subscriptions
        let subs = client.subscriptions.confirmed();
        subs.insert(
            BitmexWsTopic::Trade.as_ref().to_string(),
            symbol_set(&["XBTUSD", "ETHUSD"]),
        );
        subs.insert(
            BitmexWsTopic::OrderBookL2.as_ref().to_string(),
            symbol_set(&["XBTUSD"]),
        );

        // Simulate the bookkeeping performed when unsubscribing from one topic
        let topic = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
        if let Some((channel, symbol)) = topic.split_once(':') {
            // The entry guard is released when the closure returns, before `remove` runs
            let channel_emptied = subs.get_mut(channel).is_some_and(|mut entry| {
                entry.remove(&Ustr::from(symbol));
                entry.is_empty()
            });

            if channel_emptied {
                subs.remove(channel);
            }
        }

        // Rebuild the restoration topics from what is left
        let topics_to_restore: Vec<String> = subs
            .iter()
            .flat_map(|entry| {
                let (channel, symbols) = entry.pair();
                if symbols.is_empty() {
                    vec![channel.clone()]
                } else {
                    symbols
                        .iter()
                        .map(|symbol| format!("{channel}:{symbol}"))
                        .collect::<Vec<String>>()
                }
            })
            .collect();

        // XBTUSD trade and book remain; ETHUSD trade is gone
        let trade_xbt = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
        let trade_eth = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
        let book_xbt = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());

        assert_eq!(topics_to_restore.len(), 2);
        assert!(topics_to_restore.contains(&trade_xbt));
        assert!(topics_to_restore.contains(&book_xbt));
        assert!(!topics_to_restore.contains(&trade_eth));
    }
}