nautilus_bitmex/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides the WebSocket client integration for the [BitMEX](https://bitmex.com) WebSocket API.
17//!
18//! This module defines and implements a [`BitmexWebSocketClient`] for
19//! connecting to BitMEX WebSocket streams. It handles authentication (when credentials
20//! are provided), manages subscriptions to market data and account update channels,
21//! and parses incoming messages into structured Nautilus domain objects.
22
23use std::{
24    sync::{
25        Arc,
26        atomic::{AtomicBool, AtomicU8, Ordering},
27    },
28    time::Duration,
29};
30
31use arc_swap::ArcSwap;
32use dashmap::DashMap;
33use futures_util::Stream;
34use nautilus_common::live::runtime::get_runtime;
35use nautilus_core::{
36    consts::NAUTILUS_USER_AGENT,
37    env::{get_env_var, get_or_env_var_opt},
38};
39use nautilus_model::{
40    data::bar::BarType,
41    enums::OrderType,
42    identifiers::{AccountId, ClientOrderId, InstrumentId},
43    instruments::{Instrument, InstrumentAny},
44};
45use nautilus_network::{
46    http::USER_AGENT,
47    mode::ConnectionMode,
48    websocket::{
49        AUTHENTICATION_TIMEOUT_SECS, AuthTracker, PingHandler, SubscriptionState, WebSocketClient,
50        WebSocketConfig, channel_message_handler,
51    },
52};
53use tokio_tungstenite::tungstenite::Message;
54use ustr::Ustr;
55
56use super::{
57    enums::{BitmexWsAuthAction, BitmexWsAuthChannel, BitmexWsOperation, BitmexWsTopic},
58    error::BitmexWsError,
59    handler::{FeedHandler, HandlerCommand},
60    messages::{BitmexAuthentication, BitmexSubscription, NautilusWsMessage},
61    parse::{is_index_symbol, topic_from_bar_spec},
62};
63use crate::common::{
64    consts::{BITMEX_WS_TOPIC_DELIMITER, BITMEX_WS_URL},
65    credential::Credential,
66};
67
68/// Provides a WebSocket client for connecting to the [BitMEX](https://bitmex.com) real-time API.
69///
70/// Key runtime patterns:
71/// - Authentication handshakes are managed by the internal auth tracker, ensuring resubscriptions
72///   occur only after BitMEX acknowledges `authKey` messages.
73/// - The subscription state maintains pending and confirmed topics so reconnection replay is
74///   deterministic and per-topic errors are surfaced.
75#[derive(Clone, Debug)]
76#[cfg_attr(
77    feature = "python",
78    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
79)]
80pub struct BitmexWebSocketClient {
81    url: String,
82    credential: Option<Credential>,
83    heartbeat: Option<u64>,
84    account_id: AccountId,
85    auth_tracker: AuthTracker,
86    signal: Arc<AtomicBool>,
87    connection_mode: Arc<ArcSwap<AtomicU8>>,
88    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
89    out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
90    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
91    subscriptions: SubscriptionState,
92    tracked_subscriptions: Arc<DashMap<String, ()>>,
93    instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
94    order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
95    order_symbol_cache: Arc<DashMap<ClientOrderId, Ustr>>,
96}
97
98impl BitmexWebSocketClient {
99    /// Creates a new [`BitmexWebSocketClient`] instance.
100    ///
101    /// # Errors
102    ///
103    /// Returns an error if only one of `api_key` or `api_secret` is provided (both or neither required).
104    pub fn new(
105        url: Option<String>,
106        api_key: Option<String>,
107        api_secret: Option<String>,
108        account_id: Option<AccountId>,
109        heartbeat: Option<u64>,
110    ) -> anyhow::Result<Self> {
111        let credential = match (api_key, api_secret) {
112            (Some(key), Some(secret)) => Some(Credential::new(key, secret)),
113            (None, None) => None,
114            _ => anyhow::bail!("Both `api_key` and `api_secret` must be provided together"),
115        };
116
117        let account_id = account_id.unwrap_or(AccountId::from("BITMEX-master"));
118
119        let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
120        let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
121
122        // We don't have a handler yet; this placeholder keeps cache_instrument() working,
123        // connect() swaps in the real channel and replays any queued instruments so the
124        // handler sees them once it starts.
125        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
126
127        Ok(Self {
128            url: url.unwrap_or(BITMEX_WS_URL.to_string()),
129            credential,
130            heartbeat,
131            account_id,
132            auth_tracker: AuthTracker::new(),
133            signal: Arc::new(AtomicBool::new(false)),
134            connection_mode,
135            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
136            out_rx: None,
137            task_handle: None,
138            subscriptions: SubscriptionState::new(BITMEX_WS_TOPIC_DELIMITER),
139            tracked_subscriptions: Arc::new(DashMap::new()),
140            instruments_cache: Arc::new(DashMap::new()),
141            order_type_cache: Arc::new(DashMap::new()),
142            order_symbol_cache: Arc::new(DashMap::new()),
143        })
144    }
145
146    /// Creates a new [`BitmexWebSocketClient`] with environment variable credential resolution.
147    ///
148    /// If `api_key` or `api_secret` are not provided, they will be loaded from
149    /// environment variables based on the `testnet` flag:
150    /// - Testnet: `BITMEX_TESTNET_API_KEY`, `BITMEX_TESTNET_API_SECRET`
151    /// - Mainnet: `BITMEX_API_KEY`, `BITMEX_API_SECRET`
152    ///
153    /// # Errors
154    ///
155    /// Returns an error if only one of `api_key` or `api_secret` is provided.
156    pub fn new_with_env(
157        url: Option<String>,
158        api_key: Option<String>,
159        api_secret: Option<String>,
160        account_id: Option<AccountId>,
161        heartbeat: Option<u64>,
162        testnet: bool,
163    ) -> anyhow::Result<Self> {
164        let (api_key_env, api_secret_env) = if testnet {
165            ("BITMEX_TESTNET_API_KEY", "BITMEX_TESTNET_API_SECRET")
166        } else {
167            ("BITMEX_API_KEY", "BITMEX_API_SECRET")
168        };
169
170        let key = get_or_env_var_opt(api_key, api_key_env);
171        let secret = get_or_env_var_opt(api_secret, api_secret_env);
172
173        Self::new(url, key, secret, account_id, heartbeat)
174    }
175
176    /// Creates a new authenticated [`BitmexWebSocketClient`] using environment variables.
177    ///
178    /// # Errors
179    ///
180    /// Returns an error if environment variables are not set or credentials are invalid.
181    pub fn from_env() -> anyhow::Result<Self> {
182        let url = get_env_var("BITMEX_WS_URL")?;
183        let api_key = get_env_var("BITMEX_API_KEY")?;
184        let api_secret = get_env_var("BITMEX_API_SECRET")?;
185
186        Self::new(Some(url), Some(api_key), Some(api_secret), None, None)
187    }
188
189    /// Returns the websocket url being used by the client.
190    #[must_use]
191    pub const fn url(&self) -> &str {
192        self.url.as_str()
193    }
194
195    /// Returns the public API key being used by the client.
196    #[must_use]
197    pub fn api_key(&self) -> Option<&str> {
198        self.credential.as_ref().map(|c| c.api_key.as_str())
199    }
200
201    /// Returns a masked version of the API key for logging purposes.
202    #[must_use]
203    pub fn api_key_masked(&self) -> Option<String> {
204        self.credential.as_ref().map(|c| c.api_key_masked())
205    }
206
207    /// Returns a value indicating whether the client is active.
208    #[must_use]
209    pub fn is_active(&self) -> bool {
210        let connection_mode_arc = self.connection_mode.load();
211        ConnectionMode::from_atomic(&connection_mode_arc).is_active()
212            && !self.signal.load(Ordering::Relaxed)
213    }
214
215    /// Returns a value indicating whether the client is closed.
216    #[must_use]
217    pub fn is_closed(&self) -> bool {
218        let connection_mode_arc = self.connection_mode.load();
219        ConnectionMode::from_atomic(&connection_mode_arc).is_closed()
220            || self.signal.load(Ordering::Relaxed)
221    }
222
223    /// Sets the account ID.
224    pub fn set_account_id(&mut self, account_id: AccountId) {
225        self.account_id = account_id;
226    }
227
228    /// Caches multiple instruments.
229    ///
230    /// Clears the existing cache first, then adds all provided instruments.
231    pub fn cache_instruments(&mut self, instruments: Vec<InstrumentAny>) {
232        self.instruments_cache.clear();
233        let mut count = 0;
234
235        log::debug!("Initializing BitMEX instrument cache");
236
237        for inst in instruments {
238            let symbol = inst.symbol().inner();
239            self.instruments_cache.insert(symbol, inst.clone());
240            log::debug!("Cached instrument: {symbol}");
241            count += 1;
242        }
243
244        log::info!("BitMEX instrument cache initialized with {count} instruments");
245    }
246
247    /// Caches a single instrument.
248    ///
249    /// Any existing instrument with the same symbol will be replaced.
250    pub fn cache_instrument(&self, instrument: InstrumentAny) {
251        self.instruments_cache
252            .insert(instrument.symbol().inner(), instrument.clone());
253
254        // Before connect() the handler isn't running; this send will fail and that's expected
255        // because connect() replays the instruments via InitializeInstruments
256        if let Ok(cmd_tx) = self.cmd_tx.try_read()
257            && let Err(e) = cmd_tx.send(HandlerCommand::UpdateInstrument(instrument))
258        {
259            log::debug!("Failed to send instrument update to handler: {e}");
260        }
261    }
262
263    /// Connect to the BitMEX WebSocket server.
264    ///
265    /// # Errors
266    ///
267    /// Returns an error if the WebSocket connection fails or authentication fails (if credentials provided).
268    ///
269    /// # Panics
270    ///
271    /// Panics if subscription or authentication messages fail to serialize to JSON.
272    pub async fn connect(&mut self) -> Result<(), BitmexWsError> {
273        let (client, raw_rx) = self.connect_inner().await?;
274
275        // Replace connection state so all clones see the underlying WebSocketClient's state
276        self.connection_mode.store(client.connection_mode_atomic());
277
278        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
279        self.out_rx = Some(Arc::new(out_rx));
280
281        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
282        *self.cmd_tx.write().await = cmd_tx.clone();
283
284        // Send WebSocketClient to handler
285        if let Err(e) = cmd_tx.send(HandlerCommand::SetClient(client)) {
286            return Err(BitmexWsError::ClientError(format!(
287                "Failed to send WebSocketClient to handler: {e}"
288            )));
289        }
290
291        // Replay cached instruments to the new handler via the new channel
292        if !self.instruments_cache.is_empty() {
293            let cached_instruments: Vec<InstrumentAny> = self
294                .instruments_cache
295                .iter()
296                .map(|entry| entry.value().clone())
297                .collect();
298            if let Err(e) = cmd_tx.send(HandlerCommand::InitializeInstruments(cached_instruments)) {
299                tracing::error!("Failed to replay instruments to handler: {e}");
300            }
301        }
302
303        let signal = self.signal.clone();
304        let account_id = self.account_id;
305        let credential = self.credential.clone();
306        let auth_tracker = self.auth_tracker.clone();
307        let subscriptions = self.subscriptions.clone();
308        let order_type_cache = self.order_type_cache.clone();
309        let order_symbol_cache = self.order_symbol_cache.clone();
310        let cmd_tx_for_reconnect = cmd_tx.clone();
311
312        let stream_handle = get_runtime().spawn(async move {
313            let mut handler = FeedHandler::new(
314                signal.clone(),
315                cmd_rx,
316                raw_rx,
317                out_tx,
318                account_id,
319                auth_tracker.clone(),
320                subscriptions.clone(),
321                order_type_cache,
322                order_symbol_cache,
323            );
324
325            // Helper closure to resubscribe all tracked subscriptions after reconnection
326            let resubscribe_all = || {
327                // Use SubscriptionState as source of truth for what to restore
328                let topics = subscriptions.all_topics();
329
330                if topics.is_empty() {
331                    return;
332                }
333
334                tracing::debug!(count = topics.len(), "Resubscribing to confirmed subscriptions");
335
336                for topic in &topics {
337                    subscriptions.mark_subscribe(topic.as_str());
338                }
339
340                // Serialize subscription messages
341                let mut payloads = Vec::with_capacity(topics.len());
342                for topic in &topics {
343                    let message = BitmexSubscription {
344                        op: BitmexWsOperation::Subscribe,
345                        args: vec![Ustr::from(topic.as_ref())],
346                    };
347                    if let Ok(payload) = serde_json::to_string(&message) {
348                        payloads.push(payload);
349                    }
350                }
351
352                if let Err(e) = cmd_tx_for_reconnect.send(HandlerCommand::Subscribe { topics: payloads }) {
353                    tracing::error!(error = %e, "Failed to send resubscribe command");
354                }
355            };
356
357            // Run message processing with reconnection handling
358            loop {
359                match handler.next().await {
360                    Some(NautilusWsMessage::Reconnected) => {
361                        if signal.load(Ordering::Relaxed) {
362                            continue;
363                        }
364
365                        log::info!("WebSocket reconnected");
366
367                        // Mark all confirmed subscriptions as failed so they transition to pending state
368                        let confirmed_topics: Vec<String> = {
369                            let confirmed = subscriptions.confirmed();
370                            let mut topics = Vec::new();
371
372                            for entry in confirmed.iter() {
373                                let (channel, symbols) = entry.pair();
374
375                                if *channel == BitmexWsTopic::Instrument.as_ref() {
376                                    continue;
377                                }
378
379                                for symbol in symbols {
380                                    if symbol.is_empty() {
381                                        topics.push(channel.to_string());
382                                    } else {
383                                        topics.push(format!("{channel}:{symbol}"));
384                                    }
385                                }
386                            }
387
388                            topics
389                        };
390
391                        if !confirmed_topics.is_empty() {
392                            tracing::debug!(count = confirmed_topics.len(), "Marking confirmed subscriptions as pending for replay");
393                            for topic in confirmed_topics {
394                                subscriptions.mark_failure(&topic);
395                            }
396                        }
397
398                        if let Some(cred) = &credential {
399                            tracing::debug!("Re-authenticating after reconnection");
400
401                            let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
402                            let signature = cred.sign("GET", "/realtime", expires, "");
403
404                            let auth_message = BitmexAuthentication {
405                                op: BitmexWsAuthAction::AuthKeyExpires,
406                                args: (cred.api_key.to_string(), expires, signature),
407                            };
408
409                            if let Ok(payload) = serde_json::to_string(&auth_message) {
410                                if let Err(e) = cmd_tx_for_reconnect.send(HandlerCommand::Authenticate { payload }) {
411                                    tracing::error!(error = %e, "Failed to send reconnection auth command");
412                                }
413                            } else {
414                                tracing::error!("Failed to serialize reconnection auth message");
415                            }
416                        }
417
418                        // Unauthenticated sessions resubscribe immediately after reconnection,
419                        // authenticated sessions wait for Authenticated message
420                        if credential.is_none() {
421                            tracing::debug!("No authentication required, resubscribing immediately");
422                            resubscribe_all();
423                        }
424
425                        // TODO: Implement proper Reconnected event forwarding to consumers.
426                        // Currently intercepted for internal housekeeping only. Will add new
427                        // message type from WebSocketClient to notify consumers of reconnections.
428
429                        continue;
430                    }
431                    Some(NautilusWsMessage::Authenticated) => {
432                        tracing::debug!("Authenticated after reconnection, resubscribing");
433                        resubscribe_all();
434                        continue;
435                    }
436                    Some(msg) => {
437                        if handler.send(msg).is_err() {
438                            tracing::error!("Failed to send message (receiver dropped)");
439                            break;
440                        }
441                    }
442                    None => {
443                        // Stream ended - check if it's a stop signal
444                        if handler.is_stopped() {
445                            tracing::debug!("Stop signal received, ending message processing");
446                            break;
447                        }
448                        // Otherwise it's an unexpected stream end
449                        tracing::warn!("WebSocket stream ended unexpectedly");
450                        break;
451                    }
452                }
453            }
454
455            tracing::debug!("Handler task exiting");
456        });
457
458        self.task_handle = Some(Arc::new(stream_handle));
459
460        if self.credential.is_some()
461            && let Err(e) = self.authenticate().await
462        {
463            return Err(e);
464        }
465
466        // Subscribe to instrument topic
467        let instrument_topic = BitmexWsTopic::Instrument.as_ref().to_string();
468        self.subscriptions.mark_subscribe(&instrument_topic);
469        self.tracked_subscriptions.insert(instrument_topic, ());
470
471        let subscribe_msg = BitmexSubscription {
472            op: BitmexWsOperation::Subscribe,
473            args: vec![Ustr::from(BitmexWsTopic::Instrument.as_ref())],
474        };
475
476        match serde_json::to_string(&subscribe_msg) {
477            Ok(subscribe_json) => {
478                if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Subscribe {
479                    topics: vec![subscribe_json],
480                }) {
481                    log::error!("Failed to send subscribe command for instruments: {e}");
482                } else {
483                    log::debug!("Subscribed to all instruments");
484                }
485            }
486            Err(e) => {
487                tracing::error!(error = %e, "Failed to serialize subscribe message");
488            }
489        }
490
491        Ok(())
492    }
493
494    /// Connect to the WebSocket and return a message receiver.
495    ///
496    /// # Errors
497    ///
498    /// Returns an error if the WebSocket connection fails or if authentication fails (when credentials are provided).
499    async fn connect_inner(
500        &mut self,
501    ) -> Result<
502        (
503            WebSocketClient,
504            tokio::sync::mpsc::UnboundedReceiver<Message>,
505        ),
506        BitmexWsError,
507    > {
508        let (message_handler, rx) = channel_message_handler();
509
510        // No-op ping handler: handler owns the WebSocketClient and responds to pings directly
511        // in the message loop for minimal latency (see handler.rs pong response)
512        let ping_handler: PingHandler = Arc::new(move |_payload: Vec<u8>| {
513            // Handler responds to pings internally via select! loop
514        });
515
516        let config = WebSocketConfig {
517            url: self.url.clone(),
518            headers: vec![(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())],
519            heartbeat: self.heartbeat,
520            heartbeat_msg: None,
521            message_handler: Some(message_handler),
522            ping_handler: Some(ping_handler),
523            reconnect_timeout_ms: Some(5_000),
524            reconnect_delay_initial_ms: None, // Use default
525            reconnect_delay_max_ms: None,     // Use default
526            reconnect_backoff_factor: None,   // Use default
527            reconnect_jitter_ms: None,        // Use default
528            reconnect_max_attempts: None,
529        };
530
531        let keyed_quotas = vec![];
532        let client = WebSocketClient::connect(
533            config,
534            None, // post_reconnection
535            keyed_quotas,
536            None, // default_quota
537        )
538        .await
539        .map_err(|e| BitmexWsError::ClientError(e.to_string()))?;
540
541        Ok((client, rx))
542    }
543
544    /// Authenticate the WebSocket connection using the provided credentials.
545    ///
546    /// # Errors
547    ///
548    /// Returns an error if the WebSocket is not connected, if authentication fails,
549    /// or if credentials are not available.
550    async fn authenticate(&self) -> Result<(), BitmexWsError> {
551        let credential = match &self.credential {
552            Some(credential) => credential,
553            None => {
554                return Err(BitmexWsError::AuthenticationError(
555                    "API credentials not available to authenticate".to_string(),
556                ));
557            }
558        };
559
560        let receiver = self.auth_tracker.begin();
561
562        let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
563        let signature = credential.sign("GET", "/realtime", expires, "");
564
565        let auth_message = BitmexAuthentication {
566            op: BitmexWsAuthAction::AuthKeyExpires,
567            args: (credential.api_key.to_string(), expires, signature),
568        };
569
570        let auth_json = serde_json::to_string(&auth_message).map_err(|e| {
571            let msg = format!("Failed to serialize auth message: {e}");
572            self.auth_tracker.fail(msg.clone());
573            BitmexWsError::AuthenticationError(msg)
574        })?;
575
576        // Send Authenticate command to handler
577        self.cmd_tx
578            .read()
579            .await
580            .send(HandlerCommand::Authenticate { payload: auth_json })
581            .map_err(|e| {
582                let msg = format!("Failed to send authenticate command: {e}");
583                self.auth_tracker.fail(msg.clone());
584                BitmexWsError::AuthenticationError(msg)
585            })?;
586
587        self.auth_tracker
588            .wait_for_result::<BitmexWsError>(
589                Duration::from_secs(AUTHENTICATION_TIMEOUT_SECS),
590                receiver,
591            )
592            .await
593    }
594
595    /// Wait until the WebSocket connection is active.
596    ///
597    /// # Errors
598    ///
599    /// Returns an error if the connection times out.
600    pub async fn wait_until_active(&self, timeout_secs: f64) -> Result<(), BitmexWsError> {
601        let timeout = Duration::from_secs_f64(timeout_secs);
602
603        tokio::time::timeout(timeout, async {
604            while !self.is_active() {
605                tokio::time::sleep(Duration::from_millis(10)).await;
606            }
607        })
608        .await
609        .map_err(|_| {
610            BitmexWsError::ClientError(format!(
611                "WebSocket connection timeout after {timeout_secs} seconds"
612            ))
613        })?;
614
615        Ok(())
616    }
617
618    /// Provides the internal stream as a channel-based stream.
619    ///
620    /// # Panics
621    ///
622    /// This function panics:
623    /// - If the websocket is not connected.
624    /// - If `stream` has already been called somewhere else (stream receiver is then taken).
625    pub fn stream(&mut self) -> impl Stream<Item = NautilusWsMessage> + use<> {
626        let rx = self
627            .out_rx
628            .take()
629            .expect("Stream receiver already taken or not connected");
630        let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
631        async_stream::stream! {
632            while let Some(msg) = rx.recv().await {
633                yield msg;
634            }
635        }
636    }
637
638    /// Closes the client.
639    ///
640    /// # Errors
641    ///
642    /// Returns an error if the WebSocket is not connected or if closing fails.
643    ///
644    /// # Panics
645    ///
646    /// Panics if the task handle cannot be unwrapped (should never happen in normal usage).
647    pub async fn close(&mut self) -> Result<(), BitmexWsError> {
648        log::debug!("Starting close process");
649
650        self.signal.store(true, Ordering::Relaxed);
651
652        // Send Disconnect command to handler
653        if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
654            log::debug!(
655                "Failed to send disconnect command (handler may already be shut down): {e}"
656            );
657        }
658
659        // Clean up task handle with timeout
660        if let Some(task_handle) = self.task_handle.take() {
661            match Arc::try_unwrap(task_handle) {
662                Ok(handle) => {
663                    log::debug!("Waiting for task handle to complete");
664                    match tokio::time::timeout(Duration::from_secs(2), handle).await {
665                        Ok(Ok(())) => log::debug!("Task handle completed successfully"),
666                        Ok(Err(e)) => log::error!("Task handle encountered an error: {e:?}"),
667                        Err(_) => {
668                            log::warn!(
669                                "Timeout waiting for task handle, task may still be running"
670                            );
671                            // The task will be dropped and should clean up automatically
672                        }
673                    }
674                }
675                Err(arc_handle) => {
676                    log::debug!(
677                        "Cannot take ownership of task handle - other references exist, aborting task"
678                    );
679                    arc_handle.abort();
680                }
681            }
682        } else {
683            log::debug!("No task handle to await");
684        }
685
686        log::debug!("Closed");
687
688        Ok(())
689    }
690
691    /// Subscribe to the specified topics.
692    ///
693    /// # Errors
694    ///
695    /// Returns an error if the WebSocket is not connected or if sending the subscription message fails.
696    ///
697    /// # Panics
698    ///
699    /// Panics if serialization of WebSocket messages fails (should never happen).
700    pub async fn subscribe(&self, topics: Vec<String>) -> Result<(), BitmexWsError> {
701        log::debug!("Subscribing to topics: {topics:?}");
702
703        for topic in &topics {
704            self.subscriptions.mark_subscribe(topic.as_str());
705            self.tracked_subscriptions.insert(topic.clone(), ());
706        }
707
708        // Serialize subscription messages
709        let mut payloads = Vec::with_capacity(topics.len());
710        for topic in &topics {
711            let message = BitmexSubscription {
712                op: BitmexWsOperation::Subscribe,
713                args: vec![Ustr::from(topic.as_ref())],
714            };
715            let payload = serde_json::to_string(&message).map_err(|e| {
716                BitmexWsError::SubscriptionError(format!("Failed to serialize subscription: {e}"))
717            })?;
718            payloads.push(payload);
719        }
720
721        // Send Subscribe command to handler
722        let cmd = HandlerCommand::Subscribe { topics: payloads };
723
724        self.send_cmd(cmd).await.map_err(|e| {
725            BitmexWsError::SubscriptionError(format!("Failed to send subscribe command: {e}"))
726        })
727    }
728
729    /// Unsubscribe from the specified topics.
730    ///
731    /// # Errors
732    ///
733    /// Returns an error if the WebSocket is not connected or if sending the unsubscription message fails.
734    async fn unsubscribe(&self, topics: Vec<String>) -> Result<(), BitmexWsError> {
735        log::debug!("Attempting to unsubscribe from topics: {topics:?}");
736
737        if self.signal.load(Ordering::Relaxed) {
738            log::debug!("Shutdown signal detected, skipping unsubscribe");
739            return Ok(());
740        }
741
742        for topic in &topics {
743            self.subscriptions.mark_unsubscribe(topic.as_str());
744            self.tracked_subscriptions.remove(topic);
745        }
746
747        // Serialize unsubscription messages
748        let mut payloads = Vec::with_capacity(topics.len());
749        for topic in &topics {
750            let message = BitmexSubscription {
751                op: BitmexWsOperation::Unsubscribe,
752                args: vec![Ustr::from(topic.as_ref())],
753            };
754            if let Ok(payload) = serde_json::to_string(&message) {
755                payloads.push(payload);
756            }
757        }
758
759        // Send Unsubscribe command to handler
760        let cmd = HandlerCommand::Unsubscribe { topics: payloads };
761
762        if let Err(e) = self.send_cmd(cmd).await {
763            tracing::debug!(error = %e, "Failed to send unsubscribe command");
764        }
765
766        Ok(())
767    }
768
769    /// Get the current number of active subscriptions.
770    #[must_use]
771    pub fn subscription_count(&self) -> usize {
772        self.subscriptions.len()
773    }
774
775    pub fn get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
776        let symbol = instrument_id.symbol.inner();
777        let confirmed = self.subscriptions.confirmed();
778        let mut channels = Vec::with_capacity(confirmed.len());
779
780        for entry in confirmed.iter() {
781            let (channel, symbols) = entry.pair();
782            if symbols.contains(&symbol) {
783                // Return the full topic string (e.g., "orderBookL2:XBTUSD")
784                channels.push(format!("{channel}:{symbol}"));
785            } else {
786                let has_channel_marker = symbols.iter().any(|s| s.is_empty());
787                if has_channel_marker
788                    && (*channel == BitmexWsAuthChannel::Execution.as_ref()
789                        || *channel == BitmexWsAuthChannel::Order.as_ref())
790                {
791                    // These are account-level subscriptions without symbols
792                    channels.push(channel.to_string());
793                }
794            }
795        }
796
797        channels
798    }
799
800    /// Subscribe to instrument updates for all instruments on the venue.
801    ///
802    /// # Errors
803    ///
804    /// Returns an error if the WebSocket is not connected or if the subscription fails.
805    pub async fn subscribe_instruments(&self) -> Result<(), BitmexWsError> {
806        // Already subscribed automatically on connection
807        log::debug!("Already subscribed to all instruments on connection, skipping");
808        Ok(())
809    }
810
811    /// Subscribe to instrument updates (mark/index prices) for the specified instrument.
812    ///
813    /// # Errors
814    ///
815    /// Returns an error if the WebSocket is not connected or if the subscription fails.
816    pub async fn subscribe_instrument(
817        &self,
818        instrument_id: InstrumentId,
819    ) -> Result<(), BitmexWsError> {
820        // Already subscribed to all instruments on connection
821        log::debug!(
822            "Already subscribed to all instruments on connection (includes {instrument_id}), skipping"
823        );
824        Ok(())
825    }
826
827    /// Subscribe to order book updates for the specified instrument.
828    ///
829    /// # Errors
830    ///
831    /// Returns an error if the WebSocket is not connected or if the subscription fails.
832    pub async fn subscribe_book(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
833        let topic = BitmexWsTopic::OrderBookL2;
834        let symbol = instrument_id.symbol.inner();
835        self.subscribe(vec![format!("{topic}:{symbol}")]).await
836    }
837
838    /// Subscribe to order book L2 (25 levels) updates for the specified instrument.
839    ///
840    /// # Errors
841    ///
842    /// Returns an error if the WebSocket is not connected or if the subscription fails.
843    pub async fn subscribe_book_25(
844        &self,
845        instrument_id: InstrumentId,
846    ) -> Result<(), BitmexWsError> {
847        let topic = BitmexWsTopic::OrderBookL2_25;
848        let symbol = instrument_id.symbol.inner();
849        self.subscribe(vec![format!("{topic}:{symbol}")]).await
850    }
851
852    /// Subscribe to order book depth 10 updates for the specified instrument.
853    ///
854    /// # Errors
855    ///
856    /// Returns an error if the WebSocket is not connected or if the subscription fails.
857    pub async fn subscribe_book_depth10(
858        &self,
859        instrument_id: InstrumentId,
860    ) -> Result<(), BitmexWsError> {
861        let topic = BitmexWsTopic::OrderBook10;
862        let symbol = instrument_id.symbol.inner();
863        self.subscribe(vec![format!("{topic}:{symbol}")]).await
864    }
865
866    /// Subscribe to quote updates for the specified instrument.
867    ///
868    /// Note: Index symbols (starting with '.') do not have quotes and will be silently ignored.
869    ///
870    /// # Errors
871    ///
872    /// Returns an error if the WebSocket is not connected or if the subscription fails.
873    pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
874        let symbol = instrument_id.symbol.inner();
875
876        // Index symbols don't have quotes (bid/ask), only a single price
877        if is_index_symbol(&instrument_id.symbol.inner()) {
878            tracing::warn!("Ignoring quote subscription for index symbol: {symbol}");
879            return Ok(());
880        }
881
882        let topic = BitmexWsTopic::Quote;
883        self.subscribe(vec![format!("{topic}:{symbol}")]).await
884    }
885
886    /// Subscribe to trade updates for the specified instrument.
887    ///
888    /// Note: Index symbols (starting with '.') do not have trades and will be silently ignored.
889    ///
890    /// # Errors
891    ///
892    /// Returns an error if the WebSocket is not connected or if the subscription fails.
893    pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
894        let symbol = instrument_id.symbol.inner();
895
896        // Index symbols don't have trades
897        if is_index_symbol(&symbol) {
898            tracing::warn!("Ignoring trade subscription for index symbol: {symbol}");
899            return Ok(());
900        }
901
902        let topic = BitmexWsTopic::Trade;
903        self.subscribe(vec![format!("{topic}:{symbol}")]).await
904    }
905
906    /// Subscribe to mark price updates for the specified instrument.
907    ///
908    /// # Errors
909    ///
910    /// Returns an error if the WebSocket is not connected or if the subscription fails.
911    pub async fn subscribe_mark_prices(
912        &self,
913        instrument_id: InstrumentId,
914    ) -> Result<(), BitmexWsError> {
915        self.subscribe_instrument(instrument_id).await
916    }
917
918    /// Subscribe to index price updates for the specified instrument.
919    ///
920    /// # Errors
921    ///
922    /// Returns an error if the WebSocket is not connected or if the subscription fails.
923    pub async fn subscribe_index_prices(
924        &self,
925        instrument_id: InstrumentId,
926    ) -> Result<(), BitmexWsError> {
927        self.subscribe_instrument(instrument_id).await
928    }
929
930    /// Subscribe to funding rate updates for the specified instrument.
931    ///
932    /// # Errors
933    ///
934    /// Returns an error if the WebSocket is not connected or if the subscription fails.
935    pub async fn subscribe_funding_rates(
936        &self,
937        instrument_id: InstrumentId,
938    ) -> Result<(), BitmexWsError> {
939        let topic = BitmexWsTopic::Funding;
940        let symbol = instrument_id.symbol.inner();
941        self.subscribe(vec![format!("{topic}:{symbol}")]).await
942    }
943
944    /// Subscribe to bar updates for the specified bar type.
945    ///
946    /// # Errors
947    ///
948    /// Returns an error if the WebSocket is not connected or if the subscription fails.
949    pub async fn subscribe_bars(&self, bar_type: BarType) -> Result<(), BitmexWsError> {
950        let topic = topic_from_bar_spec(bar_type.spec());
951        let symbol = bar_type.instrument_id().symbol.inner();
952        self.subscribe(vec![format!("{topic}:{symbol}")]).await
953    }
954
955    /// Unsubscribe from instrument updates for all instruments on the venue.
956    ///
957    /// # Errors
958    ///
959    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
960    pub async fn unsubscribe_instruments(&self) -> Result<(), BitmexWsError> {
961        // No-op: instruments are required for proper operation
962        log::debug!(
963            "Instruments subscription maintained for proper operation, skipping unsubscribe"
964        );
965        Ok(())
966    }
967
968    /// Unsubscribe from instrument updates (mark/index prices) for the specified instrument.
969    ///
970    /// # Errors
971    ///
972    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
973    pub async fn unsubscribe_instrument(
974        &self,
975        instrument_id: InstrumentId,
976    ) -> Result<(), BitmexWsError> {
977        // No-op: instruments are required for proper operation
978        log::debug!(
979            "Instruments subscription maintained for proper operation (includes {instrument_id}), skipping unsubscribe"
980        );
981        Ok(())
982    }
983
984    /// Unsubscribe from order book updates for the specified instrument.
985    ///
986    /// # Errors
987    ///
988    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
989    pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
990        let topic = BitmexWsTopic::OrderBookL2;
991        let symbol = instrument_id.symbol.inner();
992        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
993    }
994
995    /// Unsubscribe from order book L2 (25 levels) updates for the specified instrument.
996    ///
997    /// # Errors
998    ///
999    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1000    pub async fn unsubscribe_book_25(
1001        &self,
1002        instrument_id: InstrumentId,
1003    ) -> Result<(), BitmexWsError> {
1004        let topic = BitmexWsTopic::OrderBookL2_25;
1005        let symbol = instrument_id.symbol.inner();
1006        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1007    }
1008
1009    /// Unsubscribe from order book depth 10 updates for the specified instrument.
1010    ///
1011    /// # Errors
1012    ///
1013    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1014    pub async fn unsubscribe_book_depth10(
1015        &self,
1016        instrument_id: InstrumentId,
1017    ) -> Result<(), BitmexWsError> {
1018        let topic = BitmexWsTopic::OrderBook10;
1019        let symbol = instrument_id.symbol.inner();
1020        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1021    }
1022
1023    /// Unsubscribe from quote updates for the specified instrument.
1024    ///
1025    /// # Errors
1026    ///
1027    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1028    pub async fn unsubscribe_quotes(
1029        &self,
1030        instrument_id: InstrumentId,
1031    ) -> Result<(), BitmexWsError> {
1032        let symbol = instrument_id.symbol.inner();
1033
1034        // Index symbols don't have quotes
1035        if is_index_symbol(&symbol) {
1036            return Ok(());
1037        }
1038
1039        let topic = BitmexWsTopic::Quote;
1040        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1041    }
1042
1043    /// Unsubscribe from trade updates for the specified instrument.
1044    ///
1045    /// # Errors
1046    ///
1047    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1048    pub async fn unsubscribe_trades(
1049        &self,
1050        instrument_id: InstrumentId,
1051    ) -> Result<(), BitmexWsError> {
1052        let symbol = instrument_id.symbol.inner();
1053
1054        // Index symbols don't have trades
1055        if is_index_symbol(&symbol) {
1056            return Ok(());
1057        }
1058
1059        let topic = BitmexWsTopic::Trade;
1060        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1061    }
1062
1063    /// Unsubscribe from mark price updates for the specified instrument.
1064    ///
1065    /// # Errors
1066    ///
1067    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1068    pub async fn unsubscribe_mark_prices(
1069        &self,
1070        instrument_id: InstrumentId,
1071    ) -> Result<(), BitmexWsError> {
1072        // No-op: instrument channel shared with index prices
1073        log::debug!(
1074            "Mark prices for {instrument_id} uses shared instrument channel, skipping unsubscribe"
1075        );
1076        Ok(())
1077    }
1078
1079    /// Unsubscribe from index price updates for the specified instrument.
1080    ///
1081    /// # Errors
1082    ///
1083    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1084    pub async fn unsubscribe_index_prices(
1085        &self,
1086        instrument_id: InstrumentId,
1087    ) -> Result<(), BitmexWsError> {
1088        // No-op: instrument channel shared with mark prices
1089        log::debug!(
1090            "Index prices for {instrument_id} uses shared instrument channel, skipping unsubscribe"
1091        );
1092        Ok(())
1093    }
1094
1095    /// Unsubscribe from funding rate updates for the specified instrument.
1096    ///
1097    /// # Errors
1098    ///
1099    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1100    pub async fn unsubscribe_funding_rates(
1101        &self,
1102        instrument_id: InstrumentId,
1103    ) -> Result<(), BitmexWsError> {
1104        // No-op: unsubscribing during shutdown causes race conditions
1105        log::debug!(
1106            "Funding rates for {instrument_id}, skipping unsubscribe to avoid shutdown race"
1107        );
1108        Ok(())
1109    }
1110
1111    /// Unsubscribe from bar updates for the specified bar type.
1112    ///
1113    /// # Errors
1114    ///
1115    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1116    pub async fn unsubscribe_bars(&self, bar_type: BarType) -> Result<(), BitmexWsError> {
1117        let topic = topic_from_bar_spec(bar_type.spec());
1118        let symbol = bar_type.instrument_id().symbol.inner();
1119        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1120    }
1121
1122    /// Subscribe to order updates for the authenticated account.
1123    ///
1124    /// # Errors
1125    ///
1126    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1127    pub async fn subscribe_orders(&self) -> Result<(), BitmexWsError> {
1128        if self.credential.is_none() {
1129            return Err(BitmexWsError::MissingCredentials);
1130        }
1131        self.subscribe(vec![BitmexWsAuthChannel::Order.to_string()])
1132            .await
1133    }
1134
1135    /// Subscribe to execution updates for the authenticated account.
1136    ///
1137    /// # Errors
1138    ///
1139    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1140    pub async fn subscribe_executions(&self) -> Result<(), BitmexWsError> {
1141        if self.credential.is_none() {
1142            return Err(BitmexWsError::MissingCredentials);
1143        }
1144        self.subscribe(vec![BitmexWsAuthChannel::Execution.to_string()])
1145            .await
1146    }
1147
1148    /// Subscribe to position updates for the authenticated account.
1149    ///
1150    /// # Errors
1151    ///
1152    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1153    pub async fn subscribe_positions(&self) -> Result<(), BitmexWsError> {
1154        if self.credential.is_none() {
1155            return Err(BitmexWsError::MissingCredentials);
1156        }
1157        self.subscribe(vec![BitmexWsAuthChannel::Position.to_string()])
1158            .await
1159    }
1160
1161    /// Subscribe to margin updates for the authenticated account.
1162    ///
1163    /// # Errors
1164    ///
1165    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1166    pub async fn subscribe_margin(&self) -> Result<(), BitmexWsError> {
1167        if self.credential.is_none() {
1168            return Err(BitmexWsError::MissingCredentials);
1169        }
1170        self.subscribe(vec![BitmexWsAuthChannel::Margin.to_string()])
1171            .await
1172    }
1173
1174    /// Subscribe to wallet updates for the authenticated account.
1175    ///
1176    /// # Errors
1177    ///
1178    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1179    pub async fn subscribe_wallet(&self) -> Result<(), BitmexWsError> {
1180        if self.credential.is_none() {
1181            return Err(BitmexWsError::MissingCredentials);
1182        }
1183        self.subscribe(vec![BitmexWsAuthChannel::Wallet.to_string()])
1184            .await
1185    }
1186
1187    /// Unsubscribe from order updates for the authenticated account.
1188    ///
1189    /// # Errors
1190    ///
1191    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1192    pub async fn unsubscribe_orders(&self) -> Result<(), BitmexWsError> {
1193        self.unsubscribe(vec![BitmexWsAuthChannel::Order.to_string()])
1194            .await
1195    }
1196
1197    /// Unsubscribe from execution updates for the authenticated account.
1198    ///
1199    /// # Errors
1200    ///
1201    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1202    pub async fn unsubscribe_executions(&self) -> Result<(), BitmexWsError> {
1203        self.unsubscribe(vec![BitmexWsAuthChannel::Execution.to_string()])
1204            .await
1205    }
1206
1207    /// Unsubscribe from position updates for the authenticated account.
1208    ///
1209    /// # Errors
1210    ///
1211    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1212    pub async fn unsubscribe_positions(&self) -> Result<(), BitmexWsError> {
1213        self.unsubscribe(vec![BitmexWsAuthChannel::Position.to_string()])
1214            .await
1215    }
1216
1217    /// Unsubscribe from margin updates for the authenticated account.
1218    ///
1219    /// # Errors
1220    ///
1221    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1222    pub async fn unsubscribe_margin(&self) -> Result<(), BitmexWsError> {
1223        self.unsubscribe(vec![BitmexWsAuthChannel::Margin.to_string()])
1224            .await
1225    }
1226
1227    /// Unsubscribe from wallet updates for the authenticated account.
1228    ///
1229    /// # Errors
1230    ///
1231    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1232    pub async fn unsubscribe_wallet(&self) -> Result<(), BitmexWsError> {
1233        self.unsubscribe(vec![BitmexWsAuthChannel::Wallet.to_string()])
1234            .await
1235    }
1236
1237    /// Sends a command to the handler.
1238    async fn send_cmd(&self, cmd: HandlerCommand) -> Result<(), BitmexWsError> {
1239        self.cmd_tx
1240            .read()
1241            .await
1242            .send(cmd)
1243            .map_err(|e| BitmexWsError::ClientError(format!("Handler not available: {e}")))
1244    }
1245}
1246
1247#[cfg(test)]
1248mod tests {
1249    use ahash::AHashSet;
1250    use rstest::rstest;
1251    use ustr::Ustr;
1252
1253    use super::*;
1254
1255    #[rstest]
1256    fn test_reconnect_topics_restoration_logic() {
1257        // Create real client with credentials
1258        let client = BitmexWebSocketClient::new(
1259            Some("ws://test.com".to_string()),
1260            Some("test_key".to_string()),
1261            Some("test_secret".to_string()),
1262            Some(AccountId::new("BITMEX-TEST")),
1263            None,
1264        )
1265        .unwrap();
1266
1267        // Populate subscriptions like they would be during normal operation
1268        let subs = client.subscriptions.confirmed();
1269        subs.insert(Ustr::from(BitmexWsTopic::Trade.as_ref()), {
1270            let mut set = AHashSet::new();
1271            set.insert(Ustr::from("XBTUSD"));
1272            set.insert(Ustr::from("ETHUSD"));
1273            set
1274        });
1275
1276        subs.insert(Ustr::from(BitmexWsTopic::OrderBookL2.as_ref()), {
1277            let mut set = AHashSet::new();
1278            set.insert(Ustr::from("XBTUSD"));
1279            set
1280        });
1281
1282        // Private channels (no symbols)
1283        subs.insert(Ustr::from(BitmexWsAuthChannel::Order.as_ref()), {
1284            let mut set = AHashSet::new();
1285            set.insert(Ustr::from(""));
1286            set
1287        });
1288        subs.insert(Ustr::from(BitmexWsAuthChannel::Position.as_ref()), {
1289            let mut set = AHashSet::new();
1290            set.insert(Ustr::from(""));
1291            set
1292        });
1293
1294        // Test the actual reconnection topic building logic
1295        let mut topics_to_restore = Vec::new();
1296        for entry in subs.iter() {
1297            let (channel, symbols) = entry.pair();
1298            for symbol in symbols {
1299                if symbol.is_empty() {
1300                    topics_to_restore.push(channel.to_string());
1301                } else {
1302                    topics_to_restore.push(format!("{channel}:{symbol}"));
1303                }
1304            }
1305        }
1306
1307        // Verify it builds the correct restoration topics
1308        assert!(topics_to_restore.contains(&format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref())));
1309        assert!(topics_to_restore.contains(&format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref())));
1310        assert!(
1311            topics_to_restore.contains(&format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref()))
1312        );
1313        assert!(topics_to_restore.contains(&BitmexWsAuthChannel::Order.as_ref().to_string()));
1314        assert!(topics_to_restore.contains(&BitmexWsAuthChannel::Position.as_ref().to_string()));
1315        assert_eq!(topics_to_restore.len(), 5);
1316    }
1317
1318    #[rstest]
1319    fn test_reconnect_auth_message_building() {
1320        // Test with credentials
1321        let client_with_creds = BitmexWebSocketClient::new(
1322            Some("ws://test.com".to_string()),
1323            Some("test_key".to_string()),
1324            Some("test_secret".to_string()),
1325            Some(AccountId::new("BITMEX-TEST")),
1326            None,
1327        )
1328        .unwrap();
1329
1330        // Test the actual auth message building logic from lines 220-228
1331        if let Some(cred) = &client_with_creds.credential {
1332            let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
1333            let signature = cred.sign("GET", "/realtime", expires, "");
1334
1335            let auth_message = BitmexAuthentication {
1336                op: BitmexWsAuthAction::AuthKeyExpires,
1337                args: (cred.api_key.to_string(), expires, signature),
1338            };
1339
1340            // Verify auth message structure
1341            assert_eq!(auth_message.op, BitmexWsAuthAction::AuthKeyExpires);
1342            assert_eq!(auth_message.args.0, "test_key");
1343            assert!(auth_message.args.1 > 0); // expires should be positive
1344            assert!(!auth_message.args.2.is_empty()); // signature should exist
1345        } else {
1346            panic!("Client should have credentials");
1347        }
1348
1349        // Test without credentials
1350        let client_no_creds = BitmexWebSocketClient::new(
1351            Some("ws://test.com".to_string()),
1352            None,
1353            None,
1354            Some(AccountId::new("BITMEX-TEST")),
1355            None,
1356        )
1357        .unwrap();
1358
1359        assert!(client_no_creds.credential.is_none());
1360    }
1361
1362    #[rstest]
1363    fn test_subscription_state_after_unsubscribe() {
1364        let client = BitmexWebSocketClient::new(
1365            Some("ws://test.com".to_string()),
1366            Some("test_key".to_string()),
1367            Some("test_secret".to_string()),
1368            Some(AccountId::new("BITMEX-TEST")),
1369            None,
1370        )
1371        .unwrap();
1372
1373        // Set up initial subscriptions
1374        let subs = client.subscriptions.confirmed();
1375        subs.insert(Ustr::from(BitmexWsTopic::Trade.as_ref()), {
1376            let mut set = AHashSet::new();
1377            set.insert(Ustr::from("XBTUSD"));
1378            set.insert(Ustr::from("ETHUSD"));
1379            set
1380        });
1381
1382        subs.insert(Ustr::from(BitmexWsTopic::OrderBookL2.as_ref()), {
1383            let mut set = AHashSet::new();
1384            set.insert(Ustr::from("XBTUSD"));
1385            set
1386        });
1387
1388        // Simulate unsubscribe logic (like from unsubscribe() method lines 586-599)
1389        let topic = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
1390        if let Some((channel, symbol)) = topic.split_once(':')
1391            && let Some(mut entry) = subs.get_mut(&Ustr::from(channel))
1392        {
1393            entry.remove(&Ustr::from(symbol));
1394            if entry.is_empty() {
1395                drop(entry);
1396                subs.remove(&Ustr::from(channel));
1397            }
1398        }
1399
1400        // Build restoration topics after unsubscribe
1401        let mut topics_to_restore = Vec::new();
1402        for entry in subs.iter() {
1403            let (channel, symbols) = entry.pair();
1404            for symbol in symbols {
1405                if symbol.is_empty() {
1406                    topics_to_restore.push(channel.to_string());
1407                } else {
1408                    topics_to_restore.push(format!("{channel}:{symbol}"));
1409                }
1410            }
1411        }
1412
1413        // Should have XBTUSD trade but not ETHUSD trade
1414        let trade_xbt = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
1415        let trade_eth = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
1416        let book_xbt = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
1417
1418        assert!(topics_to_restore.contains(&trade_xbt));
1419        assert!(!topics_to_restore.contains(&trade_eth));
1420        assert!(topics_to_restore.contains(&book_xbt));
1421        assert_eq!(topics_to_restore.len(), 2);
1422    }
1423
1424    #[rstest]
1425    fn test_race_unsubscribe_failure_recovery() {
1426        // Simulates the race condition where venue rejects an unsubscribe request.
1427        // The adapter must perform the 3-step recovery:
1428        // 1. confirm_unsubscribe() - clear pending_unsubscribe
1429        // 2. mark_subscribe() - mark as subscribing again
1430        // 3. confirm_subscribe() - restore to confirmed state
1431        let client = BitmexWebSocketClient::new(
1432            Some("ws://test.com".to_string()),
1433            None,
1434            None,
1435            Some(AccountId::new("BITMEX-TEST")),
1436            None,
1437        )
1438        .unwrap();
1439
1440        let topic = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
1441
1442        // Initial subscribe flow
1443        client.subscriptions.mark_subscribe(&topic);
1444        client.subscriptions.confirm_subscribe(&topic);
1445        assert_eq!(client.subscriptions.len(), 1);
1446
1447        // User unsubscribes
1448        client.subscriptions.mark_unsubscribe(&topic);
1449        assert_eq!(client.subscriptions.len(), 0);
1450        assert_eq!(
1451            client.subscriptions.pending_unsubscribe_topics(),
1452            vec![topic.clone()]
1453        );
1454
1455        // Venue REJECTS the unsubscribe (error message)
1456        // Adapter must perform 3-step recovery (from lines 1884-1891)
1457        client.subscriptions.confirm_unsubscribe(&topic); // Step 1: clear pending_unsubscribe
1458        client.subscriptions.mark_subscribe(&topic); // Step 2: mark as subscribing
1459        client.subscriptions.confirm_subscribe(&topic); // Step 3: confirm subscription
1460
1461        // Verify recovery: topic should be back in confirmed state
1462        assert_eq!(client.subscriptions.len(), 1);
1463        assert!(client.subscriptions.pending_unsubscribe_topics().is_empty());
1464        assert!(client.subscriptions.pending_subscribe_topics().is_empty());
1465
1466        // Verify topic is in all_topics() for reconnect
1467        let all = client.subscriptions.all_topics();
1468        assert_eq!(all.len(), 1);
1469        assert!(all.contains(&topic));
1470    }
1471
1472    #[rstest]
1473    fn test_race_resubscribe_before_unsubscribe_ack() {
1474        // Simulates: User unsubscribes, then immediately resubscribes before
1475        // the unsubscribe ACK arrives from the venue.
1476        // This is the race condition fixed in the subscription tracker.
1477        let client = BitmexWebSocketClient::new(
1478            Some("ws://test.com".to_string()),
1479            None,
1480            None,
1481            Some(AccountId::new("BITMEX-TEST")),
1482            None,
1483        )
1484        .unwrap();
1485
1486        let topic = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
1487
1488        // Initial subscribe
1489        client.subscriptions.mark_subscribe(&topic);
1490        client.subscriptions.confirm_subscribe(&topic);
1491        assert_eq!(client.subscriptions.len(), 1);
1492
1493        // User unsubscribes
1494        client.subscriptions.mark_unsubscribe(&topic);
1495        assert_eq!(client.subscriptions.len(), 0);
1496        assert_eq!(
1497            client.subscriptions.pending_unsubscribe_topics(),
1498            vec![topic.clone()]
1499        );
1500
1501        // User immediately changes mind and resubscribes (before unsubscribe ACK)
1502        client.subscriptions.mark_subscribe(&topic);
1503        assert_eq!(
1504            client.subscriptions.pending_subscribe_topics(),
1505            vec![topic.clone()]
1506        );
1507
1508        // NOW the unsubscribe ACK arrives - should NOT clear pending_subscribe
1509        client.subscriptions.confirm_unsubscribe(&topic);
1510        assert!(client.subscriptions.pending_unsubscribe_topics().is_empty());
1511        assert_eq!(
1512            client.subscriptions.pending_subscribe_topics(),
1513            vec![topic.clone()]
1514        );
1515
1516        // Subscribe ACK arrives
1517        client.subscriptions.confirm_subscribe(&topic);
1518        assert_eq!(client.subscriptions.len(), 1);
1519        assert!(client.subscriptions.pending_subscribe_topics().is_empty());
1520
1521        // Verify final state is correct
1522        let all = client.subscriptions.all_topics();
1523        assert_eq!(all.len(), 1);
1524        assert!(all.contains(&topic));
1525    }
1526
1527    #[rstest]
1528    fn test_race_channel_level_reconnection_with_pending_states() {
1529        // Simulates reconnection with mixed pending states including channel-level subscriptions.
1530        let client = BitmexWebSocketClient::new(
1531            Some("ws://test.com".to_string()),
1532            Some("test_key".to_string()),
1533            Some("test_secret".to_string()),
1534            Some(AccountId::new("BITMEX-TEST")),
1535            None,
1536        )
1537        .unwrap();
1538
1539        // Set up mixed state before reconnection
1540        // Confirmed: trade:XBTUSD
1541        let trade_xbt = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
1542        client.subscriptions.mark_subscribe(&trade_xbt);
1543        client.subscriptions.confirm_subscribe(&trade_xbt);
1544
1545        // Confirmed: order (channel-level, no symbol)
1546        let order_channel = BitmexWsAuthChannel::Order.as_ref();
1547        client.subscriptions.mark_subscribe(order_channel);
1548        client.subscriptions.confirm_subscribe(order_channel);
1549
1550        // Pending subscribe: trade:ETHUSD
1551        let trade_eth = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
1552        client.subscriptions.mark_subscribe(&trade_eth);
1553
1554        // Pending unsubscribe: orderBookL2:XBTUSD (user cancelled)
1555        let book_xbt = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
1556        client.subscriptions.mark_subscribe(&book_xbt);
1557        client.subscriptions.confirm_subscribe(&book_xbt);
1558        client.subscriptions.mark_unsubscribe(&book_xbt);
1559
1560        // Get topics for reconnection
1561        let topics_to_restore = client.subscriptions.all_topics();
1562
1563        // Should include: confirmed + pending_subscribe (NOT pending_unsubscribe)
1564        assert_eq!(topics_to_restore.len(), 3);
1565        assert!(topics_to_restore.contains(&trade_xbt));
1566        assert!(topics_to_restore.contains(&order_channel.to_string()));
1567        assert!(topics_to_restore.contains(&trade_eth));
1568        assert!(!topics_to_restore.contains(&book_xbt)); // Excluded
1569
1570        // Verify channel-level marker is handled correctly
1571        // order channel should not have ':' delimiter
1572        for topic in &topics_to_restore {
1573            if topic == order_channel {
1574                assert!(
1575                    !topic.contains(':'),
1576                    "Channel-level topic should not have delimiter"
1577                );
1578            }
1579        }
1580    }
1581}