nautilus_bitmex/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides the WebSocket client integration for the [BitMEX](https://bitmex.com) WebSocket API.
17//!
18//! This module defines and implements a [`BitmexWebSocketClient`] for
19//! connecting to BitMEX WebSocket streams. It handles authentication (when credentials
20//! are provided), manages subscriptions to market data and account update channels,
21//! and parses incoming messages into structured Nautilus domain objects.
22
23use std::sync::{
24    Arc,
25    atomic::{AtomicBool, AtomicU8, Ordering},
26};
27
28use arc_swap::ArcSwap;
29use dashmap::DashMap;
30use futures_util::Stream;
31use nautilus_common::runtime::get_runtime;
32use nautilus_core::{consts::NAUTILUS_USER_AGENT, env::get_env_var};
33use nautilus_model::{
34    data::bar::BarType,
35    enums::OrderType,
36    identifiers::{AccountId, ClientOrderId, InstrumentId},
37    instruments::{Instrument, InstrumentAny},
38};
39use nautilus_network::{
40    mode::ConnectionMode,
41    websocket::{
42        AUTHENTICATION_TIMEOUT_SECS, AuthTracker, PingHandler, SubscriptionState, WebSocketClient,
43        WebSocketConfig, channel_message_handler,
44    },
45};
46use reqwest::header::USER_AGENT;
47use tokio::time::Duration;
48use tokio_tungstenite::tungstenite::Message;
49use ustr::Ustr;
50
51use super::{
52    enums::{BitmexWsAuthAction, BitmexWsAuthChannel, BitmexWsOperation, BitmexWsTopic},
53    error::BitmexWsError,
54    handler::{FeedHandler, HandlerCommand},
55    messages::{BitmexAuthentication, BitmexSubscription, NautilusWsMessage},
56    parse::{is_index_symbol, topic_from_bar_spec},
57};
58use crate::common::{
59    consts::{BITMEX_WS_TOPIC_DELIMITER, BITMEX_WS_URL},
60    credential::Credential,
61};
62
63/// Provides a WebSocket client for connecting to the [BitMEX](https://bitmex.com) real-time API.
64///
65/// Key runtime patterns:
66/// - Authentication handshakes are managed by the internal auth tracker, ensuring resubscriptions
67///   occur only after BitMEX acknowledges `authKey` messages.
68/// - The subscription state maintains pending and confirmed topics so reconnection replay is
69///   deterministic and per-topic errors are surfaced.
70#[derive(Clone, Debug)]
71#[cfg_attr(
72    feature = "python",
73    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
74)]
75pub struct BitmexWebSocketClient {
76    url: String,
77    credential: Option<Credential>,
78    heartbeat: Option<u64>,
79    account_id: AccountId,
80    auth_tracker: AuthTracker,
81    signal: Arc<AtomicBool>,
82    connection_mode: Arc<ArcSwap<AtomicU8>>,
83    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
84    out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
85    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
86    subscriptions: SubscriptionState,
87    tracked_subscriptions: Arc<DashMap<String, ()>>,
88    instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
89    order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
90    order_symbol_cache: Arc<DashMap<ClientOrderId, Ustr>>,
91}
92
93impl BitmexWebSocketClient {
94    /// Creates a new [`BitmexWebSocketClient`] instance.
95    ///
96    /// # Errors
97    ///
98    /// Returns an error if only one of `api_key` or `api_secret` is provided (both or neither required).
99    pub fn new(
100        url: Option<String>,
101        api_key: Option<String>,
102        api_secret: Option<String>,
103        account_id: Option<AccountId>,
104        heartbeat: Option<u64>,
105    ) -> anyhow::Result<Self> {
106        let credential = match (api_key, api_secret) {
107            (Some(key), Some(secret)) => Some(Credential::new(key, secret)),
108            (None, None) => None,
109            _ => anyhow::bail!("Both `api_key` and `api_secret` must be provided together"),
110        };
111
112        let account_id = account_id.unwrap_or(AccountId::from("BITMEX-master"));
113
114        let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
115        let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
116
117        // We don't have a handler yet; this placeholder keeps cache_instrument() working,
118        // connect() swaps in the real channel and replays any queued instruments so the
119        // handler sees them once it starts.
120        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
121
122        Ok(Self {
123            url: url.unwrap_or(BITMEX_WS_URL.to_string()),
124            credential,
125            heartbeat,
126            account_id,
127            auth_tracker: AuthTracker::new(),
128            signal: Arc::new(AtomicBool::new(false)),
129            connection_mode,
130            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
131            out_rx: None,
132            task_handle: None,
133            subscriptions: SubscriptionState::new(BITMEX_WS_TOPIC_DELIMITER),
134            tracked_subscriptions: Arc::new(DashMap::new()),
135            instruments_cache: Arc::new(DashMap::new()),
136            order_type_cache: Arc::new(DashMap::new()),
137            order_symbol_cache: Arc::new(DashMap::new()),
138        })
139    }
140
141    /// Creates a new authenticated [`BitmexWebSocketClient`] using environment variables.
142    ///
143    /// # Errors
144    ///
145    /// Returns an error if environment variables are not set or credentials are invalid.
146    pub fn from_env() -> anyhow::Result<Self> {
147        let url = get_env_var("BITMEX_WS_URL")?;
148        let api_key = get_env_var("BITMEX_API_KEY")?;
149        let api_secret = get_env_var("BITMEX_API_SECRET")?;
150
151        Self::new(Some(url), Some(api_key), Some(api_secret), None, None)
152    }
153
154    /// Returns the websocket url being used by the client.
155    #[must_use]
156    pub const fn url(&self) -> &str {
157        self.url.as_str()
158    }
159
160    /// Returns the public API key being used by the client.
161    #[must_use]
162    pub fn api_key(&self) -> Option<&str> {
163        self.credential.as_ref().map(|c| c.api_key.as_str())
164    }
165
166    /// Returns a masked version of the API key for logging purposes.
167    #[must_use]
168    pub fn api_key_masked(&self) -> Option<String> {
169        self.credential.as_ref().map(|c| c.api_key_masked())
170    }
171
172    /// Returns a value indicating whether the client is active.
173    #[must_use]
174    pub fn is_active(&self) -> bool {
175        let connection_mode_arc = self.connection_mode.load();
176        ConnectionMode::from_atomic(&connection_mode_arc).is_active()
177            && !self.signal.load(Ordering::Relaxed)
178    }
179
180    /// Returns a value indicating whether the client is closed.
181    #[must_use]
182    pub fn is_closed(&self) -> bool {
183        let connection_mode_arc = self.connection_mode.load();
184        ConnectionMode::from_atomic(&connection_mode_arc).is_closed()
185            || self.signal.load(Ordering::Relaxed)
186    }
187
188    /// Sets the account ID.
189    pub fn set_account_id(&mut self, account_id: AccountId) {
190        self.account_id = account_id;
191    }
192
193    /// Caches multiple instruments.
194    ///
195    /// Clears the existing cache first, then adds all provided instruments.
196    pub fn cache_instruments(&mut self, instruments: Vec<InstrumentAny>) {
197        self.instruments_cache.clear();
198        let mut count = 0;
199
200        log::debug!("Initializing BitMEX instrument cache");
201
202        for inst in instruments {
203            let symbol = inst.symbol().inner();
204            self.instruments_cache.insert(symbol, inst.clone());
205            log::debug!("Cached instrument: {symbol}");
206            count += 1;
207        }
208
209        log::info!("BitMEX instrument cache initialized with {count} instruments");
210    }
211
212    /// Caches a single instrument.
213    ///
214    /// Any existing instrument with the same symbol will be replaced.
215    pub fn cache_instrument(&self, instrument: InstrumentAny) {
216        self.instruments_cache
217            .insert(instrument.symbol().inner(), instrument.clone());
218
219        // Before connect() the handler isn't running; this send will fail and that's expected
220        // because connect() replays the instruments via InitializeInstruments
221        if let Ok(cmd_tx) = self.cmd_tx.try_read()
222            && let Err(e) = cmd_tx.send(HandlerCommand::UpdateInstrument(instrument))
223        {
224            log::debug!("Failed to send instrument update to handler: {e}");
225        }
226    }
227
228    /// Connect to the BitMEX WebSocket server.
229    ///
230    /// # Errors
231    ///
232    /// Returns an error if the WebSocket connection fails or authentication fails (if credentials provided).
233    ///
234    /// # Panics
235    ///
236    /// Panics if subscription or authentication messages fail to serialize to JSON.
237    pub async fn connect(&mut self) -> Result<(), BitmexWsError> {
238        let (client, raw_rx) = self.connect_inner().await?;
239
240        // Replace connection state so all clones see the underlying WebSocketClient's state
241        self.connection_mode.store(client.connection_mode_atomic());
242
243        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
244        self.out_rx = Some(Arc::new(out_rx));
245
246        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
247        *self.cmd_tx.write().await = cmd_tx.clone();
248
249        // Send WebSocketClient to handler
250        if let Err(e) = cmd_tx.send(HandlerCommand::SetClient(client)) {
251            return Err(BitmexWsError::ClientError(format!(
252                "Failed to send WebSocketClient to handler: {e}"
253            )));
254        }
255
256        // Replay cached instruments to the new handler via the new channel
257        if !self.instruments_cache.is_empty() {
258            let cached_instruments: Vec<InstrumentAny> = self
259                .instruments_cache
260                .iter()
261                .map(|entry| entry.value().clone())
262                .collect();
263            if let Err(e) = cmd_tx.send(HandlerCommand::InitializeInstruments(cached_instruments)) {
264                tracing::error!("Failed to replay instruments to handler: {e}");
265            }
266        }
267
268        let signal = self.signal.clone();
269        let account_id = self.account_id;
270        let credential = self.credential.clone();
271        let auth_tracker = self.auth_tracker.clone();
272        let subscriptions = self.subscriptions.clone();
273        let order_type_cache = self.order_type_cache.clone();
274        let order_symbol_cache = self.order_symbol_cache.clone();
275        let cmd_tx_for_reconnect = cmd_tx.clone();
276
277        let stream_handle = get_runtime().spawn(async move {
278            let mut handler = FeedHandler::new(
279                signal.clone(),
280                cmd_rx,
281                raw_rx,
282                out_tx,
283                account_id,
284                auth_tracker.clone(),
285                subscriptions.clone(),
286                order_type_cache,
287                order_symbol_cache,
288            );
289
290            // Helper closure to resubscribe all tracked subscriptions after reconnection
291            let resubscribe_all = || {
292                // Use SubscriptionState as source of truth for what to restore
293                let topics = subscriptions.all_topics();
294
295                if topics.is_empty() {
296                    return;
297                }
298
299                tracing::debug!(count = topics.len(), "Resubscribing to confirmed subscriptions");
300
301                for topic in &topics {
302                    subscriptions.mark_subscribe(topic.as_str());
303                }
304
305                // Serialize subscription messages
306                let mut payloads = Vec::with_capacity(topics.len());
307                for topic in &topics {
308                    let message = BitmexSubscription {
309                        op: BitmexWsOperation::Subscribe,
310                        args: vec![Ustr::from(topic.as_ref())],
311                    };
312                    if let Ok(payload) = serde_json::to_string(&message) {
313                        payloads.push(payload);
314                    }
315                }
316
317                if let Err(e) = cmd_tx_for_reconnect.send(HandlerCommand::Subscribe { topics: payloads }) {
318                    tracing::error!(error = %e, "Failed to send resubscribe command");
319                }
320            };
321
322            // Run message processing with reconnection handling
323            loop {
324                match handler.next().await {
325                    Some(NautilusWsMessage::Reconnected) => {
326                        if signal.load(Ordering::Relaxed) {
327                            continue;
328                        }
329
330                        log::info!("WebSocket reconnected");
331
332                        // Mark all confirmed subscriptions as failed so they transition to pending state
333                        let confirmed_topics: Vec<String> = {
334                            let confirmed = subscriptions.confirmed();
335                            let mut topics = Vec::new();
336
337                            for entry in confirmed.iter() {
338                                let (channel, symbols) = entry.pair();
339
340                                if *channel == BitmexWsTopic::Instrument.as_ref() {
341                                    continue;
342                                }
343
344                                for symbol in symbols.iter() {
345                                    if symbol.is_empty() {
346                                        topics.push(channel.to_string());
347                                    } else {
348                                        topics.push(format!("{channel}:{symbol}"));
349                                    }
350                                }
351                            }
352
353                            topics
354                        };
355
356                        if !confirmed_topics.is_empty() {
357                            tracing::debug!(count = confirmed_topics.len(), "Marking confirmed subscriptions as pending for replay");
358                            for topic in confirmed_topics {
359                                subscriptions.mark_failure(&topic);
360                            }
361                        }
362
363                        if let Some(cred) = &credential {
364                            tracing::debug!("Re-authenticating after reconnection");
365
366                            let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
367                            let signature = cred.sign("GET", "/realtime", expires, "");
368
369                            let auth_message = BitmexAuthentication {
370                                op: BitmexWsAuthAction::AuthKeyExpires,
371                                args: (cred.api_key.to_string(), expires, signature),
372                            };
373
374                            if let Ok(payload) = serde_json::to_string(&auth_message) {
375                                if let Err(e) = cmd_tx_for_reconnect.send(HandlerCommand::Authenticate { payload }) {
376                                    tracing::error!(error = %e, "Failed to send reconnection auth command");
377                                }
378                            } else {
379                                tracing::error!("Failed to serialize reconnection auth message");
380                            }
381                        }
382
383                        // Unauthenticated sessions resubscribe immediately after reconnection,
384                        // authenticated sessions wait for Authenticated message
385                        if credential.is_none() {
386                            tracing::debug!("No authentication required, resubscribing immediately");
387                            resubscribe_all();
388                        }
389
390                        // TODO: Implement proper Reconnected event forwarding to consumers.
391                        // Currently intercepted for internal housekeeping only. Will add new
392                        // message type from WebSocketClient to notify consumers of reconnections.
393
394                        continue;
395                    }
396                    Some(NautilusWsMessage::Authenticated) => {
397                        tracing::debug!("Authenticated after reconnection, resubscribing");
398                        resubscribe_all();
399                        continue;
400                    }
401                    Some(msg) => {
402                        if handler.send(msg).is_err() {
403                            tracing::error!("Failed to send message (receiver dropped)");
404                            break;
405                        }
406                    }
407                    None => {
408                        // Stream ended - check if it's a stop signal
409                        if handler.is_stopped() {
410                            tracing::debug!("Stop signal received, ending message processing");
411                            break;
412                        }
413                        // Otherwise it's an unexpected stream end
414                        tracing::warn!("WebSocket stream ended unexpectedly");
415                        break;
416                    }
417                }
418            }
419
420            tracing::debug!("Handler task exiting");
421        });
422
423        self.task_handle = Some(Arc::new(stream_handle));
424
425        if self.credential.is_some()
426            && let Err(e) = self.authenticate().await
427        {
428            return Err(e);
429        }
430
431        // Subscribe to instrument topic
432        let instrument_topic = BitmexWsTopic::Instrument.as_ref().to_string();
433        self.subscriptions.mark_subscribe(&instrument_topic);
434        self.tracked_subscriptions.insert(instrument_topic, ());
435
436        let subscribe_msg = BitmexSubscription {
437            op: BitmexWsOperation::Subscribe,
438            args: vec![Ustr::from(BitmexWsTopic::Instrument.as_ref())],
439        };
440
441        match serde_json::to_string(&subscribe_msg) {
442            Ok(subscribe_json) => {
443                if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Subscribe {
444                    topics: vec![subscribe_json],
445                }) {
446                    log::error!("Failed to send subscribe command for instruments: {e}");
447                } else {
448                    log::debug!("Subscribed to all instruments");
449                }
450            }
451            Err(e) => {
452                tracing::error!(error = %e, "Failed to serialize subscribe message");
453            }
454        }
455
456        Ok(())
457    }
458
459    /// Connect to the WebSocket and return a message receiver.
460    ///
461    /// # Errors
462    ///
463    /// Returns an error if the WebSocket connection fails or if authentication fails (when credentials are provided).
464    async fn connect_inner(
465        &mut self,
466    ) -> Result<
467        (
468            WebSocketClient,
469            tokio::sync::mpsc::UnboundedReceiver<Message>,
470        ),
471        BitmexWsError,
472    > {
473        let (message_handler, rx) = channel_message_handler();
474
475        // No-op ping handler: handler owns the WebSocketClient and responds to pings directly
476        // in the message loop for minimal latency (see handler.rs pong response)
477        let ping_handler: PingHandler = Arc::new(move |_payload: Vec<u8>| {
478            // Handler responds to pings internally via select! loop
479        });
480
481        let config = WebSocketConfig {
482            url: self.url.clone(),
483            headers: vec![(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())],
484            heartbeat: self.heartbeat,
485            heartbeat_msg: None,
486            message_handler: Some(message_handler),
487            ping_handler: Some(ping_handler),
488            reconnect_timeout_ms: Some(5_000),
489            reconnect_delay_initial_ms: None, // Use default
490            reconnect_delay_max_ms: None,     // Use default
491            reconnect_backoff_factor: None,   // Use default
492            reconnect_jitter_ms: None,        // Use default
493            reconnect_max_attempts: None,
494        };
495
496        let keyed_quotas = vec![];
497        let client = WebSocketClient::connect(
498            config,
499            None, // post_reconnection
500            keyed_quotas,
501            None, // default_quota
502        )
503        .await
504        .map_err(|e| BitmexWsError::ClientError(e.to_string()))?;
505
506        Ok((client, rx))
507    }
508
509    /// Authenticate the WebSocket connection using the provided credentials.
510    ///
511    /// # Errors
512    ///
513    /// Returns an error if the WebSocket is not connected, if authentication fails,
514    /// or if credentials are not available.
515    async fn authenticate(&self) -> Result<(), BitmexWsError> {
516        let credential = match &self.credential {
517            Some(credential) => credential,
518            None => {
519                return Err(BitmexWsError::AuthenticationError(
520                    "API credentials not available to authenticate".to_string(),
521                ));
522            }
523        };
524
525        let receiver = self.auth_tracker.begin();
526
527        let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
528        let signature = credential.sign("GET", "/realtime", expires, "");
529
530        let auth_message = BitmexAuthentication {
531            op: BitmexWsAuthAction::AuthKeyExpires,
532            args: (credential.api_key.to_string(), expires, signature),
533        };
534
535        let auth_json = serde_json::to_string(&auth_message).map_err(|e| {
536            let msg = format!("Failed to serialize auth message: {e}");
537            self.auth_tracker.fail(msg.clone());
538            BitmexWsError::AuthenticationError(msg)
539        })?;
540
541        // Send Authenticate command to handler
542        self.cmd_tx
543            .read()
544            .await
545            .send(HandlerCommand::Authenticate { payload: auth_json })
546            .map_err(|e| {
547                let msg = format!("Failed to send authenticate command: {e}");
548                self.auth_tracker.fail(msg.clone());
549                BitmexWsError::AuthenticationError(msg)
550            })?;
551
552        self.auth_tracker
553            .wait_for_result::<BitmexWsError>(
554                Duration::from_secs(AUTHENTICATION_TIMEOUT_SECS),
555                receiver,
556            )
557            .await
558    }
559
560    /// Wait until the WebSocket connection is active.
561    ///
562    /// # Errors
563    ///
564    /// Returns an error if the connection times out.
565    pub async fn wait_until_active(&self, timeout_secs: f64) -> Result<(), BitmexWsError> {
566        let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
567
568        tokio::time::timeout(timeout, async {
569            while !self.is_active() {
570                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
571            }
572        })
573        .await
574        .map_err(|_| {
575            BitmexWsError::ClientError(format!(
576                "WebSocket connection timeout after {timeout_secs} seconds"
577            ))
578        })?;
579
580        Ok(())
581    }
582
583    /// Provides the internal stream as a channel-based stream.
584    ///
585    /// # Panics
586    ///
587    /// This function panics:
588    /// - If the websocket is not connected.
589    /// - If `stream` has already been called somewhere else (stream receiver is then taken).
590    pub fn stream(&mut self) -> impl Stream<Item = NautilusWsMessage> + use<> {
591        let rx = self
592            .out_rx
593            .take()
594            .expect("Stream receiver already taken or not connected");
595        let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
596        async_stream::stream! {
597            while let Some(msg) = rx.recv().await {
598                yield msg;
599            }
600        }
601    }
602
603    /// Closes the client.
604    ///
605    /// # Errors
606    ///
607    /// Returns an error if the WebSocket is not connected or if closing fails.
608    ///
609    /// # Panics
610    ///
611    /// Panics if the task handle cannot be unwrapped (should never happen in normal usage).
612    pub async fn close(&mut self) -> Result<(), BitmexWsError> {
613        log::debug!("Starting close process");
614
615        self.signal.store(true, Ordering::Relaxed);
616
617        // Send Disconnect command to handler
618        if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
619            log::debug!(
620                "Failed to send disconnect command (handler may already be shut down): {e}"
621            );
622        }
623
624        // Clean up task handle with timeout
625        if let Some(task_handle) = self.task_handle.take() {
626            match Arc::try_unwrap(task_handle) {
627                Ok(handle) => {
628                    log::debug!("Waiting for task handle to complete");
629                    match tokio::time::timeout(Duration::from_secs(2), handle).await {
630                        Ok(Ok(())) => log::debug!("Task handle completed successfully"),
631                        Ok(Err(e)) => log::error!("Task handle encountered an error: {e:?}"),
632                        Err(_) => {
633                            log::warn!(
634                                "Timeout waiting for task handle, task may still be running"
635                            );
636                            // The task will be dropped and should clean up automatically
637                        }
638                    }
639                }
640                Err(arc_handle) => {
641                    log::debug!(
642                        "Cannot take ownership of task handle - other references exist, aborting task"
643                    );
644                    arc_handle.abort();
645                }
646            }
647        } else {
648            log::debug!("No task handle to await");
649        }
650
651        log::debug!("Closed");
652
653        Ok(())
654    }
655
656    /// Subscribe to the specified topics.
657    ///
658    /// # Errors
659    ///
660    /// Returns an error if the WebSocket is not connected or if sending the subscription message fails.
661    ///
662    /// # Panics
663    ///
664    /// Panics if serialization of WebSocket messages fails (should never happen).
665    pub async fn subscribe(&self, topics: Vec<String>) -> Result<(), BitmexWsError> {
666        log::debug!("Subscribing to topics: {topics:?}");
667
668        for topic in &topics {
669            self.subscriptions.mark_subscribe(topic.as_str());
670            self.tracked_subscriptions.insert(topic.clone(), ());
671        }
672
673        // Serialize subscription messages
674        let mut payloads = Vec::with_capacity(topics.len());
675        for topic in &topics {
676            let message = BitmexSubscription {
677                op: BitmexWsOperation::Subscribe,
678                args: vec![Ustr::from(topic.as_ref())],
679            };
680            let payload = serde_json::to_string(&message).map_err(|e| {
681                BitmexWsError::SubscriptionError(format!("Failed to serialize subscription: {e}"))
682            })?;
683            payloads.push(payload);
684        }
685
686        // Send Subscribe command to handler
687        let cmd = HandlerCommand::Subscribe { topics: payloads };
688
689        self.send_cmd(cmd).await.map_err(|e| {
690            BitmexWsError::SubscriptionError(format!("Failed to send subscribe command: {e}"))
691        })
692    }
693
694    /// Unsubscribe from the specified topics.
695    ///
696    /// # Errors
697    ///
698    /// Returns an error if the WebSocket is not connected or if sending the unsubscription message fails.
699    async fn unsubscribe(&self, topics: Vec<String>) -> Result<(), BitmexWsError> {
700        log::debug!("Attempting to unsubscribe from topics: {topics:?}");
701
702        if self.signal.load(Ordering::Relaxed) {
703            log::debug!("Shutdown signal detected, skipping unsubscribe");
704            return Ok(());
705        }
706
707        for topic in &topics {
708            self.subscriptions.mark_unsubscribe(topic.as_str());
709            self.tracked_subscriptions.remove(topic);
710        }
711
712        // Serialize unsubscription messages
713        let mut payloads = Vec::with_capacity(topics.len());
714        for topic in &topics {
715            let message = BitmexSubscription {
716                op: BitmexWsOperation::Unsubscribe,
717                args: vec![Ustr::from(topic.as_ref())],
718            };
719            if let Ok(payload) = serde_json::to_string(&message) {
720                payloads.push(payload);
721            }
722        }
723
724        // Send Unsubscribe command to handler
725        let cmd = HandlerCommand::Unsubscribe { topics: payloads };
726
727        if let Err(e) = self.send_cmd(cmd).await {
728            tracing::debug!(error = %e, "Failed to send unsubscribe command");
729        }
730
731        Ok(())
732    }
733
734    /// Get the current number of active subscriptions.
735    #[must_use]
736    pub fn subscription_count(&self) -> usize {
737        self.subscriptions.len()
738    }
739
740    pub fn get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
741        let symbol = instrument_id.symbol.inner();
742        let confirmed = self.subscriptions.confirmed();
743        let mut channels = Vec::with_capacity(confirmed.len());
744
745        for entry in confirmed.iter() {
746            let (channel, symbols) = entry.pair();
747            if symbols.contains(&symbol) {
748                // Return the full topic string (e.g., "orderBookL2:XBTUSD")
749                channels.push(format!("{channel}:{symbol}"));
750            } else {
751                let has_channel_marker = symbols.iter().any(|s| s.is_empty());
752                if has_channel_marker
753                    && (*channel == BitmexWsAuthChannel::Execution.as_ref()
754                        || *channel == BitmexWsAuthChannel::Order.as_ref())
755                {
756                    // These are account-level subscriptions without symbols
757                    channels.push(channel.to_string());
758                }
759            }
760        }
761
762        channels
763    }
764
765    /// Subscribe to instrument updates for all instruments on the venue.
766    ///
767    /// # Errors
768    ///
769    /// Returns an error if the WebSocket is not connected or if the subscription fails.
770    pub async fn subscribe_instruments(&self) -> Result<(), BitmexWsError> {
771        // Already subscribed automatically on connection
772        log::debug!("Already subscribed to all instruments on connection, skipping");
773        Ok(())
774    }
775
776    /// Subscribe to instrument updates (mark/index prices) for the specified instrument.
777    ///
778    /// # Errors
779    ///
780    /// Returns an error if the WebSocket is not connected or if the subscription fails.
781    pub async fn subscribe_instrument(
782        &self,
783        instrument_id: InstrumentId,
784    ) -> Result<(), BitmexWsError> {
785        // Already subscribed to all instruments on connection
786        log::debug!(
787            "Already subscribed to all instruments on connection (includes {instrument_id}), skipping"
788        );
789        Ok(())
790    }
791
792    /// Subscribe to order book updates for the specified instrument.
793    ///
794    /// # Errors
795    ///
796    /// Returns an error if the WebSocket is not connected or if the subscription fails.
797    pub async fn subscribe_book(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
798        let topic = BitmexWsTopic::OrderBookL2;
799        let symbol = instrument_id.symbol.inner();
800        self.subscribe(vec![format!("{topic}:{symbol}")]).await
801    }
802
803    /// Subscribe to order book L2 (25 levels) updates for the specified instrument.
804    ///
805    /// # Errors
806    ///
807    /// Returns an error if the WebSocket is not connected or if the subscription fails.
808    pub async fn subscribe_book_25(
809        &self,
810        instrument_id: InstrumentId,
811    ) -> Result<(), BitmexWsError> {
812        let topic = BitmexWsTopic::OrderBookL2_25;
813        let symbol = instrument_id.symbol.inner();
814        self.subscribe(vec![format!("{topic}:{symbol}")]).await
815    }
816
817    /// Subscribe to order book depth 10 updates for the specified instrument.
818    ///
819    /// # Errors
820    ///
821    /// Returns an error if the WebSocket is not connected or if the subscription fails.
822    pub async fn subscribe_book_depth10(
823        &self,
824        instrument_id: InstrumentId,
825    ) -> Result<(), BitmexWsError> {
826        let topic = BitmexWsTopic::OrderBook10;
827        let symbol = instrument_id.symbol.inner();
828        self.subscribe(vec![format!("{topic}:{symbol}")]).await
829    }
830
831    /// Subscribe to quote updates for the specified instrument.
832    ///
833    /// Note: Index symbols (starting with '.') do not have quotes and will be silently ignored.
834    ///
835    /// # Errors
836    ///
837    /// Returns an error if the WebSocket is not connected or if the subscription fails.
838    pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
839        let symbol = instrument_id.symbol.inner();
840
841        // Index symbols don't have quotes (bid/ask), only a single price
842        if is_index_symbol(&instrument_id.symbol.inner()) {
843            tracing::warn!("Ignoring quote subscription for index symbol: {symbol}");
844            return Ok(());
845        }
846
847        let topic = BitmexWsTopic::Quote;
848        self.subscribe(vec![format!("{topic}:{symbol}")]).await
849    }
850
851    /// Subscribe to trade updates for the specified instrument.
852    ///
853    /// Note: Index symbols (starting with '.') do not have trades and will be silently ignored.
854    ///
855    /// # Errors
856    ///
857    /// Returns an error if the WebSocket is not connected or if the subscription fails.
858    pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
859        let symbol = instrument_id.symbol.inner();
860
861        // Index symbols don't have trades
862        if is_index_symbol(&symbol) {
863            tracing::warn!("Ignoring trade subscription for index symbol: {symbol}");
864            return Ok(());
865        }
866
867        let topic = BitmexWsTopic::Trade;
868        self.subscribe(vec![format!("{topic}:{symbol}")]).await
869    }
870
871    /// Subscribe to mark price updates for the specified instrument.
872    ///
873    /// # Errors
874    ///
875    /// Returns an error if the WebSocket is not connected or if the subscription fails.
876    pub async fn subscribe_mark_prices(
877        &self,
878        instrument_id: InstrumentId,
879    ) -> Result<(), BitmexWsError> {
880        self.subscribe_instrument(instrument_id).await
881    }
882
883    /// Subscribe to index price updates for the specified instrument.
884    ///
885    /// # Errors
886    ///
887    /// Returns an error if the WebSocket is not connected or if the subscription fails.
888    pub async fn subscribe_index_prices(
889        &self,
890        instrument_id: InstrumentId,
891    ) -> Result<(), BitmexWsError> {
892        self.subscribe_instrument(instrument_id).await
893    }
894
895    /// Subscribe to funding rate updates for the specified instrument.
896    ///
897    /// # Errors
898    ///
899    /// Returns an error if the WebSocket is not connected or if the subscription fails.
900    pub async fn subscribe_funding_rates(
901        &self,
902        instrument_id: InstrumentId,
903    ) -> Result<(), BitmexWsError> {
904        let topic = BitmexWsTopic::Funding;
905        let symbol = instrument_id.symbol.inner();
906        self.subscribe(vec![format!("{topic}:{symbol}")]).await
907    }
908
909    /// Subscribe to bar updates for the specified bar type.
910    ///
911    /// # Errors
912    ///
913    /// Returns an error if the WebSocket is not connected or if the subscription fails.
914    pub async fn subscribe_bars(&self, bar_type: BarType) -> Result<(), BitmexWsError> {
915        let topic = topic_from_bar_spec(bar_type.spec());
916        let symbol = bar_type.instrument_id().symbol.to_string();
917        self.subscribe(vec![format!("{topic}:{symbol}")]).await
918    }
919
920    /// Unsubscribe from instrument updates for all instruments on the venue.
921    ///
922    /// # Errors
923    ///
924    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
925    pub async fn unsubscribe_instruments(&self) -> Result<(), BitmexWsError> {
926        // No-op: instruments are required for proper operation
927        log::debug!(
928            "Instruments subscription maintained for proper operation, skipping unsubscribe"
929        );
930        Ok(())
931    }
932
933    /// Unsubscribe from instrument updates (mark/index prices) for the specified instrument.
934    ///
935    /// # Errors
936    ///
937    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
938    pub async fn unsubscribe_instrument(
939        &self,
940        instrument_id: InstrumentId,
941    ) -> Result<(), BitmexWsError> {
942        // No-op: instruments are required for proper operation
943        log::debug!(
944            "Instruments subscription maintained for proper operation (includes {instrument_id}), skipping unsubscribe"
945        );
946        Ok(())
947    }
948
949    /// Unsubscribe from order book updates for the specified instrument.
950    ///
951    /// # Errors
952    ///
953    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
954    pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
955        let topic = BitmexWsTopic::OrderBookL2;
956        let symbol = instrument_id.symbol.inner();
957        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
958    }
959
960    /// Unsubscribe from order book L2 (25 levels) updates for the specified instrument.
961    ///
962    /// # Errors
963    ///
964    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
965    pub async fn unsubscribe_book_25(
966        &self,
967        instrument_id: InstrumentId,
968    ) -> Result<(), BitmexWsError> {
969        let topic = BitmexWsTopic::OrderBookL2_25;
970        let symbol = instrument_id.symbol.inner();
971        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
972    }
973
974    /// Unsubscribe from order book depth 10 updates for the specified instrument.
975    ///
976    /// # Errors
977    ///
978    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
979    pub async fn unsubscribe_book_depth10(
980        &self,
981        instrument_id: InstrumentId,
982    ) -> Result<(), BitmexWsError> {
983        let topic = BitmexWsTopic::OrderBook10;
984        let symbol = instrument_id.symbol.inner();
985        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
986    }
987
988    /// Unsubscribe from quote updates for the specified instrument.
989    ///
990    /// # Errors
991    ///
992    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
993    pub async fn unsubscribe_quotes(
994        &self,
995        instrument_id: InstrumentId,
996    ) -> Result<(), BitmexWsError> {
997        let symbol = instrument_id.symbol.inner();
998
999        // Index symbols don't have quotes
1000        if is_index_symbol(&symbol) {
1001            return Ok(());
1002        }
1003
1004        let topic = BitmexWsTopic::Quote;
1005        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1006    }
1007
1008    /// Unsubscribe from trade updates for the specified instrument.
1009    ///
1010    /// # Errors
1011    ///
1012    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1013    pub async fn unsubscribe_trades(
1014        &self,
1015        instrument_id: InstrumentId,
1016    ) -> Result<(), BitmexWsError> {
1017        let symbol = instrument_id.symbol.inner();
1018
1019        // Index symbols don't have trades
1020        if is_index_symbol(&symbol) {
1021            return Ok(());
1022        }
1023
1024        let topic = BitmexWsTopic::Trade;
1025        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1026    }
1027
1028    /// Unsubscribe from mark price updates for the specified instrument.
1029    ///
1030    /// # Errors
1031    ///
1032    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1033    pub async fn unsubscribe_mark_prices(
1034        &self,
1035        instrument_id: InstrumentId,
1036    ) -> Result<(), BitmexWsError> {
1037        // No-op: instrument channel shared with index prices
1038        log::debug!(
1039            "Mark prices for {instrument_id} uses shared instrument channel, skipping unsubscribe"
1040        );
1041        Ok(())
1042    }
1043
1044    /// Unsubscribe from index price updates for the specified instrument.
1045    ///
1046    /// # Errors
1047    ///
1048    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1049    pub async fn unsubscribe_index_prices(
1050        &self,
1051        instrument_id: InstrumentId,
1052    ) -> Result<(), BitmexWsError> {
1053        // No-op: instrument channel shared with mark prices
1054        log::debug!(
1055            "Index prices for {instrument_id} uses shared instrument channel, skipping unsubscribe"
1056        );
1057        Ok(())
1058    }
1059
1060    /// Unsubscribe from funding rate updates for the specified instrument.
1061    ///
1062    /// # Errors
1063    ///
1064    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1065    pub async fn unsubscribe_funding_rates(
1066        &self,
1067        instrument_id: InstrumentId,
1068    ) -> Result<(), BitmexWsError> {
1069        // No-op: unsubscribing during shutdown causes race conditions
1070        log::debug!(
1071            "Funding rates for {instrument_id}, skipping unsubscribe to avoid shutdown race"
1072        );
1073        Ok(())
1074    }
1075
1076    /// Unsubscribe from bar updates for the specified bar type.
1077    ///
1078    /// # Errors
1079    ///
1080    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1081    pub async fn unsubscribe_bars(&self, bar_type: BarType) -> Result<(), BitmexWsError> {
1082        let topic = topic_from_bar_spec(bar_type.spec());
1083        let symbol = bar_type.instrument_id().symbol.to_string();
1084        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1085    }
1086
1087    /// Subscribe to order updates for the authenticated account.
1088    ///
1089    /// # Errors
1090    ///
1091    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1092    pub async fn subscribe_orders(&self) -> Result<(), BitmexWsError> {
1093        if self.credential.is_none() {
1094            return Err(BitmexWsError::MissingCredentials);
1095        }
1096        self.subscribe(vec![BitmexWsAuthChannel::Order.to_string()])
1097            .await
1098    }
1099
1100    /// Subscribe to execution updates for the authenticated account.
1101    ///
1102    /// # Errors
1103    ///
1104    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1105    pub async fn subscribe_executions(&self) -> Result<(), BitmexWsError> {
1106        if self.credential.is_none() {
1107            return Err(BitmexWsError::MissingCredentials);
1108        }
1109        self.subscribe(vec![BitmexWsAuthChannel::Execution.to_string()])
1110            .await
1111    }
1112
1113    /// Subscribe to position updates for the authenticated account.
1114    ///
1115    /// # Errors
1116    ///
1117    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1118    pub async fn subscribe_positions(&self) -> Result<(), BitmexWsError> {
1119        if self.credential.is_none() {
1120            return Err(BitmexWsError::MissingCredentials);
1121        }
1122        self.subscribe(vec![BitmexWsAuthChannel::Position.to_string()])
1123            .await
1124    }
1125
1126    /// Subscribe to margin updates for the authenticated account.
1127    ///
1128    /// # Errors
1129    ///
1130    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1131    pub async fn subscribe_margin(&self) -> Result<(), BitmexWsError> {
1132        if self.credential.is_none() {
1133            return Err(BitmexWsError::MissingCredentials);
1134        }
1135        self.subscribe(vec![BitmexWsAuthChannel::Margin.to_string()])
1136            .await
1137    }
1138
1139    /// Subscribe to wallet updates for the authenticated account.
1140    ///
1141    /// # Errors
1142    ///
1143    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1144    pub async fn subscribe_wallet(&self) -> Result<(), BitmexWsError> {
1145        if self.credential.is_none() {
1146            return Err(BitmexWsError::MissingCredentials);
1147        }
1148        self.subscribe(vec![BitmexWsAuthChannel::Wallet.to_string()])
1149            .await
1150    }
1151
1152    /// Unsubscribe from order updates for the authenticated account.
1153    ///
1154    /// # Errors
1155    ///
1156    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1157    pub async fn unsubscribe_orders(&self) -> Result<(), BitmexWsError> {
1158        self.unsubscribe(vec![BitmexWsAuthChannel::Order.to_string()])
1159            .await
1160    }
1161
1162    /// Unsubscribe from execution updates for the authenticated account.
1163    ///
1164    /// # Errors
1165    ///
1166    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1167    pub async fn unsubscribe_executions(&self) -> Result<(), BitmexWsError> {
1168        self.unsubscribe(vec![BitmexWsAuthChannel::Execution.to_string()])
1169            .await
1170    }
1171
1172    /// Unsubscribe from position updates for the authenticated account.
1173    ///
1174    /// # Errors
1175    ///
1176    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1177    pub async fn unsubscribe_positions(&self) -> Result<(), BitmexWsError> {
1178        self.unsubscribe(vec![BitmexWsAuthChannel::Position.to_string()])
1179            .await
1180    }
1181
1182    /// Unsubscribe from margin updates for the authenticated account.
1183    ///
1184    /// # Errors
1185    ///
1186    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1187    pub async fn unsubscribe_margin(&self) -> Result<(), BitmexWsError> {
1188        self.unsubscribe(vec![BitmexWsAuthChannel::Margin.to_string()])
1189            .await
1190    }
1191
1192    /// Unsubscribe from wallet updates for the authenticated account.
1193    ///
1194    /// # Errors
1195    ///
1196    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1197    pub async fn unsubscribe_wallet(&self) -> Result<(), BitmexWsError> {
1198        self.unsubscribe(vec![BitmexWsAuthChannel::Wallet.to_string()])
1199            .await
1200    }
1201
1202    /// Sends a command to the handler.
1203    async fn send_cmd(&self, cmd: HandlerCommand) -> Result<(), BitmexWsError> {
1204        self.cmd_tx
1205            .read()
1206            .await
1207            .send(cmd)
1208            .map_err(|e| BitmexWsError::ClientError(format!("Handler not available: {e}")))
1209    }
1210}
1211
1212////////////////////////////////////////////////////////////////////////////////
1213// Tests
1214////////////////////////////////////////////////////////////////////////////////
1215
1216#[cfg(test)]
1217mod tests {
1218    use ahash::AHashSet;
1219    use rstest::rstest;
1220    use ustr::Ustr;
1221
1222    use super::*;
1223
1224    #[rstest]
1225    fn test_reconnect_topics_restoration_logic() {
1226        // Create real client with credentials
1227        let client = BitmexWebSocketClient::new(
1228            Some("ws://test.com".to_string()),
1229            Some("test_key".to_string()),
1230            Some("test_secret".to_string()),
1231            Some(AccountId::new("BITMEX-TEST")),
1232            None,
1233        )
1234        .unwrap();
1235
1236        // Populate subscriptions like they would be during normal operation
1237        let subs = client.subscriptions.confirmed();
1238        subs.insert(Ustr::from(BitmexWsTopic::Trade.as_ref()), {
1239            let mut set = AHashSet::new();
1240            set.insert(Ustr::from("XBTUSD"));
1241            set.insert(Ustr::from("ETHUSD"));
1242            set
1243        });
1244
1245        subs.insert(Ustr::from(BitmexWsTopic::OrderBookL2.as_ref()), {
1246            let mut set = AHashSet::new();
1247            set.insert(Ustr::from("XBTUSD"));
1248            set
1249        });
1250
1251        // Private channels (no symbols)
1252        subs.insert(Ustr::from(BitmexWsAuthChannel::Order.as_ref()), {
1253            let mut set = AHashSet::new();
1254            set.insert(Ustr::from(""));
1255            set
1256        });
1257        subs.insert(Ustr::from(BitmexWsAuthChannel::Position.as_ref()), {
1258            let mut set = AHashSet::new();
1259            set.insert(Ustr::from(""));
1260            set
1261        });
1262
1263        // Test the actual reconnection topic building logic
1264        let mut topics_to_restore = Vec::new();
1265        for entry in subs.iter() {
1266            let (channel, symbols) = entry.pair();
1267            for symbol in symbols.iter() {
1268                if symbol.is_empty() {
1269                    topics_to_restore.push(channel.to_string());
1270                } else {
1271                    topics_to_restore.push(format!("{channel}:{symbol}"));
1272                }
1273            }
1274        }
1275
1276        // Verify it builds the correct restoration topics
1277        assert!(topics_to_restore.contains(&format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref())));
1278        assert!(topics_to_restore.contains(&format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref())));
1279        assert!(
1280            topics_to_restore.contains(&format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref()))
1281        );
1282        assert!(topics_to_restore.contains(&BitmexWsAuthChannel::Order.as_ref().to_string()));
1283        assert!(topics_to_restore.contains(&BitmexWsAuthChannel::Position.as_ref().to_string()));
1284        assert_eq!(topics_to_restore.len(), 5);
1285    }
1286
1287    #[rstest]
1288    fn test_reconnect_auth_message_building() {
1289        // Test with credentials
1290        let client_with_creds = BitmexWebSocketClient::new(
1291            Some("ws://test.com".to_string()),
1292            Some("test_key".to_string()),
1293            Some("test_secret".to_string()),
1294            Some(AccountId::new("BITMEX-TEST")),
1295            None,
1296        )
1297        .unwrap();
1298
1299        // Test the actual auth message building logic from lines 220-228
1300        if let Some(cred) = &client_with_creds.credential {
1301            let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
1302            let signature = cred.sign("GET", "/realtime", expires, "");
1303
1304            let auth_message = BitmexAuthentication {
1305                op: BitmexWsAuthAction::AuthKeyExpires,
1306                args: (cred.api_key.to_string(), expires, signature),
1307            };
1308
1309            // Verify auth message structure
1310            assert_eq!(auth_message.op, BitmexWsAuthAction::AuthKeyExpires);
1311            assert_eq!(auth_message.args.0, "test_key");
1312            assert!(auth_message.args.1 > 0); // expires should be positive
1313            assert!(!auth_message.args.2.is_empty()); // signature should exist
1314        } else {
1315            panic!("Client should have credentials");
1316        }
1317
1318        // Test without credentials
1319        let client_no_creds = BitmexWebSocketClient::new(
1320            Some("ws://test.com".to_string()),
1321            None,
1322            None,
1323            Some(AccountId::new("BITMEX-TEST")),
1324            None,
1325        )
1326        .unwrap();
1327
1328        assert!(client_no_creds.credential.is_none());
1329    }
1330
1331    #[rstest]
1332    fn test_subscription_state_after_unsubscribe() {
1333        let client = BitmexWebSocketClient::new(
1334            Some("ws://test.com".to_string()),
1335            Some("test_key".to_string()),
1336            Some("test_secret".to_string()),
1337            Some(AccountId::new("BITMEX-TEST")),
1338            None,
1339        )
1340        .unwrap();
1341
1342        // Set up initial subscriptions
1343        let subs = client.subscriptions.confirmed();
1344        subs.insert(Ustr::from(BitmexWsTopic::Trade.as_ref()), {
1345            let mut set = AHashSet::new();
1346            set.insert(Ustr::from("XBTUSD"));
1347            set.insert(Ustr::from("ETHUSD"));
1348            set
1349        });
1350
1351        subs.insert(Ustr::from(BitmexWsTopic::OrderBookL2.as_ref()), {
1352            let mut set = AHashSet::new();
1353            set.insert(Ustr::from("XBTUSD"));
1354            set
1355        });
1356
1357        // Simulate unsubscribe logic (like from unsubscribe() method lines 586-599)
1358        let topic = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
1359        if let Some((channel, symbol)) = topic.split_once(':')
1360            && let Some(mut entry) = subs.get_mut(&Ustr::from(channel))
1361        {
1362            entry.remove(&Ustr::from(symbol));
1363            if entry.is_empty() {
1364                drop(entry);
1365                subs.remove(&Ustr::from(channel));
1366            }
1367        }
1368
1369        // Build restoration topics after unsubscribe
1370        let mut topics_to_restore = Vec::new();
1371        for entry in subs.iter() {
1372            let (channel, symbols) = entry.pair();
1373            for symbol in symbols.iter() {
1374                if symbol.is_empty() {
1375                    topics_to_restore.push(channel.to_string());
1376                } else {
1377                    topics_to_restore.push(format!("{channel}:{symbol}"));
1378                }
1379            }
1380        }
1381
1382        // Should have XBTUSD trade but not ETHUSD trade
1383        let trade_xbt = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
1384        let trade_eth = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
1385        let book_xbt = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
1386
1387        assert!(topics_to_restore.contains(&trade_xbt));
1388        assert!(!topics_to_restore.contains(&trade_eth));
1389        assert!(topics_to_restore.contains(&book_xbt));
1390        assert_eq!(topics_to_restore.len(), 2);
1391    }
1392
1393    #[rstest]
1394    fn test_race_unsubscribe_failure_recovery() {
1395        // Simulates the race condition where venue rejects an unsubscribe request.
1396        // The adapter must perform the 3-step recovery:
1397        // 1. confirm_unsubscribe() - clear pending_unsubscribe
1398        // 2. mark_subscribe() - mark as subscribing again
1399        // 3. confirm_subscribe() - restore to confirmed state
1400        let client = BitmexWebSocketClient::new(
1401            Some("ws://test.com".to_string()),
1402            None,
1403            None,
1404            Some(AccountId::new("BITMEX-TEST")),
1405            None,
1406        )
1407        .unwrap();
1408
1409        let topic = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
1410
1411        // Initial subscribe flow
1412        client.subscriptions.mark_subscribe(&topic);
1413        client.subscriptions.confirm_subscribe(&topic);
1414        assert_eq!(client.subscriptions.len(), 1);
1415
1416        // User unsubscribes
1417        client.subscriptions.mark_unsubscribe(&topic);
1418        assert_eq!(client.subscriptions.len(), 0);
1419        assert_eq!(
1420            client.subscriptions.pending_unsubscribe_topics(),
1421            vec![topic.clone()]
1422        );
1423
1424        // Venue REJECTS the unsubscribe (error message)
1425        // Adapter must perform 3-step recovery (from lines 1884-1891)
1426        client.subscriptions.confirm_unsubscribe(&topic); // Step 1: clear pending_unsubscribe
1427        client.subscriptions.mark_subscribe(&topic); // Step 2: mark as subscribing
1428        client.subscriptions.confirm_subscribe(&topic); // Step 3: confirm subscription
1429
1430        // Verify recovery: topic should be back in confirmed state
1431        assert_eq!(client.subscriptions.len(), 1);
1432        assert!(client.subscriptions.pending_unsubscribe_topics().is_empty());
1433        assert!(client.subscriptions.pending_subscribe_topics().is_empty());
1434
1435        // Verify topic is in all_topics() for reconnect
1436        let all = client.subscriptions.all_topics();
1437        assert_eq!(all.len(), 1);
1438        assert!(all.contains(&topic));
1439    }
1440
1441    #[rstest]
1442    fn test_race_resubscribe_before_unsubscribe_ack() {
1443        // Simulates: User unsubscribes, then immediately resubscribes before
1444        // the unsubscribe ACK arrives from the venue.
1445        // This is the race condition fixed in the subscription tracker.
1446        let client = BitmexWebSocketClient::new(
1447            Some("ws://test.com".to_string()),
1448            None,
1449            None,
1450            Some(AccountId::new("BITMEX-TEST")),
1451            None,
1452        )
1453        .unwrap();
1454
1455        let topic = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
1456
1457        // Initial subscribe
1458        client.subscriptions.mark_subscribe(&topic);
1459        client.subscriptions.confirm_subscribe(&topic);
1460        assert_eq!(client.subscriptions.len(), 1);
1461
1462        // User unsubscribes
1463        client.subscriptions.mark_unsubscribe(&topic);
1464        assert_eq!(client.subscriptions.len(), 0);
1465        assert_eq!(
1466            client.subscriptions.pending_unsubscribe_topics(),
1467            vec![topic.clone()]
1468        );
1469
1470        // User immediately changes mind and resubscribes (before unsubscribe ACK)
1471        client.subscriptions.mark_subscribe(&topic);
1472        assert_eq!(
1473            client.subscriptions.pending_subscribe_topics(),
1474            vec![topic.clone()]
1475        );
1476
1477        // NOW the unsubscribe ACK arrives - should NOT clear pending_subscribe
1478        client.subscriptions.confirm_unsubscribe(&topic);
1479        assert!(client.subscriptions.pending_unsubscribe_topics().is_empty());
1480        assert_eq!(
1481            client.subscriptions.pending_subscribe_topics(),
1482            vec![topic.clone()]
1483        );
1484
1485        // Subscribe ACK arrives
1486        client.subscriptions.confirm_subscribe(&topic);
1487        assert_eq!(client.subscriptions.len(), 1);
1488        assert!(client.subscriptions.pending_subscribe_topics().is_empty());
1489
1490        // Verify final state is correct
1491        let all = client.subscriptions.all_topics();
1492        assert_eq!(all.len(), 1);
1493        assert!(all.contains(&topic));
1494    }
1495
1496    #[rstest]
1497    fn test_race_channel_level_reconnection_with_pending_states() {
1498        // Simulates reconnection with mixed pending states including channel-level subscriptions.
1499        let client = BitmexWebSocketClient::new(
1500            Some("ws://test.com".to_string()),
1501            Some("test_key".to_string()),
1502            Some("test_secret".to_string()),
1503            Some(AccountId::new("BITMEX-TEST")),
1504            None,
1505        )
1506        .unwrap();
1507
1508        // Set up mixed state before reconnection
1509        // Confirmed: trade:XBTUSD
1510        let trade_xbt = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
1511        client.subscriptions.mark_subscribe(&trade_xbt);
1512        client.subscriptions.confirm_subscribe(&trade_xbt);
1513
1514        // Confirmed: order (channel-level, no symbol)
1515        let order_channel = BitmexWsAuthChannel::Order.as_ref();
1516        client.subscriptions.mark_subscribe(order_channel);
1517        client.subscriptions.confirm_subscribe(order_channel);
1518
1519        // Pending subscribe: trade:ETHUSD
1520        let trade_eth = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
1521        client.subscriptions.mark_subscribe(&trade_eth);
1522
1523        // Pending unsubscribe: orderBookL2:XBTUSD (user cancelled)
1524        let book_xbt = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
1525        client.subscriptions.mark_subscribe(&book_xbt);
1526        client.subscriptions.confirm_subscribe(&book_xbt);
1527        client.subscriptions.mark_unsubscribe(&book_xbt);
1528
1529        // Get topics for reconnection
1530        let topics_to_restore = client.subscriptions.all_topics();
1531
1532        // Should include: confirmed + pending_subscribe (NOT pending_unsubscribe)
1533        assert_eq!(topics_to_restore.len(), 3);
1534        assert!(topics_to_restore.contains(&trade_xbt));
1535        assert!(topics_to_restore.contains(&order_channel.to_string()));
1536        assert!(topics_to_restore.contains(&trade_eth));
1537        assert!(!topics_to_restore.contains(&book_xbt)); // Excluded
1538
1539        // Verify channel-level marker is handled correctly
1540        // order channel should not have ':' delimiter
1541        for topic in &topics_to_restore {
1542            if topic == order_channel {
1543                assert!(
1544                    !topic.contains(':'),
1545                    "Channel-level topic should not have delimiter"
1546                );
1547            }
1548        }
1549    }
1550}