Skip to main content

nautilus_bitmex/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides the WebSocket client integration for the [BitMEX](https://bitmex.com) WebSocket API.
17//!
18//! This module defines and implements a [`BitmexWebSocketClient`] for
19//! connecting to BitMEX WebSocket streams. It handles authentication (when credentials
20//! are provided), manages subscriptions to market data and account update channels,
21//! and parses incoming messages into structured Nautilus domain objects.
22
23use std::{
24    sync::{
25        Arc,
26        atomic::{AtomicBool, AtomicU8, Ordering},
27    },
28    time::Duration,
29};
30
31use arc_swap::ArcSwap;
32use dashmap::DashMap;
33use futures_util::Stream;
34use nautilus_common::live::get_runtime;
35use nautilus_core::{
36    consts::NAUTILUS_USER_AGENT,
37    env::{get_env_var, get_or_env_var_opt},
38};
39use nautilus_model::{
40    data::bar::BarType,
41    enums::OrderType,
42    identifiers::{AccountId, ClientOrderId, InstrumentId},
43    instruments::{Instrument, InstrumentAny},
44};
45use nautilus_network::{
46    http::USER_AGENT,
47    mode::ConnectionMode,
48    websocket::{
49        AUTHENTICATION_TIMEOUT_SECS, AuthTracker, PingHandler, SubscriptionState, WebSocketClient,
50        WebSocketConfig, channel_message_handler,
51    },
52};
53use tokio_tungstenite::tungstenite::Message;
54use ustr::Ustr;
55
56use super::{
57    enums::{BitmexWsAuthAction, BitmexWsAuthChannel, BitmexWsOperation, BitmexWsTopic},
58    error::BitmexWsError,
59    handler::{FeedHandler, HandlerCommand},
60    messages::{BitmexAuthentication, BitmexSubscription, NautilusWsMessage},
61    parse::{is_index_symbol, topic_from_bar_spec},
62};
63use crate::common::{
64    consts::{BITMEX_WS_TOPIC_DELIMITER, BITMEX_WS_URL},
65    credential::Credential,
66};
67
/// Provides a WebSocket client for connecting to the [BitMEX](https://bitmex.com) real-time API.
///
/// Key runtime patterns:
/// - Authentication handshakes are managed by the internal auth tracker, ensuring resubscriptions
///   occur only after BitMEX acknowledges `authKey` messages.
/// - The subscription state maintains pending and confirmed topics so reconnection replay is
///   deterministic and per-topic errors are surfaced.
///
/// The struct is `Clone`; most state lives behind `Arc`s so clones share the same caches,
/// shutdown signal, command channel and connection mode.
#[derive(Clone, Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.bitmex", from_py_object)
)]
pub struct BitmexWebSocketClient {
    /// WebSocket endpoint URL; `new` falls back to `BITMEX_WS_URL` when none is supplied.
    url: String,
    /// API credentials; `None` means the session runs unauthenticated.
    credential: Option<Credential>,
    /// Optional heartbeat setting forwarded to `WebSocketConfig::heartbeat`
    /// (units are not visible here - presumably seconds; confirm against `WebSocketConfig`).
    heartbeat: Option<u64>,
    /// Account identifier handed to the feed handler; `new` defaults it to `BITMEX-master`.
    account_id: AccountId,
    /// Tracks the outcome of the authentication handshake started by `authenticate`.
    auth_tracker: AuthTracker,
    /// Shutdown flag shared with the handler task: set by `close` (and by `connect` when
    /// authentication fails), cleared at the start of a successful `connect`.
    signal: Arc<AtomicBool>,
    /// Connection state; `connect` swaps in the underlying `WebSocketClient`'s atomic so
    /// every clone observes the live connection mode.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    /// Command channel to the handler task; holds a placeholder sender until `connect`
    /// installs the real one.
    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
    /// Receiver of parsed messages; populated by `connect` and taken by `stream`.
    out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
    /// Handle of the spawned handler task; set by `connect` and consumed by `close`.
    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
    /// Subscription state: topics are marked on subscribe/unsubscribe and replayed after
    /// a reconnect.
    subscriptions: SubscriptionState,
    /// Topic strings inserted on subscribe and removed on unsubscribe.
    tracked_subscriptions: Arc<DashMap<String, ()>>,
    /// Instruments keyed by symbol; replayed to the handler on `connect`.
    instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Order types keyed by client order ID; shared with the feed handler.
    order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
    /// Order symbols keyed by client order ID; shared with the feed handler.
    order_symbol_cache: Arc<DashMap<ClientOrderId, Ustr>>,
}
97
98impl BitmexWebSocketClient {
99    /// Creates a new [`BitmexWebSocketClient`] instance.
100    ///
101    /// # Errors
102    ///
103    /// Returns an error if only one of `api_key` or `api_secret` is provided (both or neither required).
104    pub fn new(
105        url: Option<String>,
106        api_key: Option<String>,
107        api_secret: Option<String>,
108        account_id: Option<AccountId>,
109        heartbeat: Option<u64>,
110    ) -> anyhow::Result<Self> {
111        let credential = match (api_key, api_secret) {
112            (Some(key), Some(secret)) => Some(Credential::new(key, secret)),
113            (None, None) => None,
114            _ => anyhow::bail!("Both `api_key` and `api_secret` must be provided together"),
115        };
116
117        let account_id = account_id.unwrap_or(AccountId::from("BITMEX-master"));
118
119        let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
120        let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
121
122        // We don't have a handler yet; this placeholder keeps cache_instrument() working,
123        // connect() swaps in the real channel and replays any queued instruments so the
124        // handler sees them once it starts.
125        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
126
127        Ok(Self {
128            url: url.unwrap_or(BITMEX_WS_URL.to_string()),
129            credential,
130            heartbeat,
131            account_id,
132            auth_tracker: AuthTracker::new(),
133            signal: Arc::new(AtomicBool::new(false)),
134            connection_mode,
135            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
136            out_rx: None,
137            task_handle: None,
138            subscriptions: SubscriptionState::new(BITMEX_WS_TOPIC_DELIMITER),
139            tracked_subscriptions: Arc::new(DashMap::new()),
140            instruments_cache: Arc::new(DashMap::new()),
141            order_type_cache: Arc::new(DashMap::new()),
142            order_symbol_cache: Arc::new(DashMap::new()),
143        })
144    }
145
146    /// Creates a new [`BitmexWebSocketClient`] with environment variable credential resolution.
147    ///
148    /// If `api_key` or `api_secret` are not provided, they will be loaded from
149    /// environment variables based on the `testnet` flag:
150    /// - Testnet: `BITMEX_TESTNET_API_KEY`, `BITMEX_TESTNET_API_SECRET`
151    /// - Mainnet: `BITMEX_API_KEY`, `BITMEX_API_SECRET`
152    ///
153    /// # Errors
154    ///
155    /// Returns an error if only one of `api_key` or `api_secret` is provided.
156    pub fn new_with_env(
157        url: Option<String>,
158        api_key: Option<String>,
159        api_secret: Option<String>,
160        account_id: Option<AccountId>,
161        heartbeat: Option<u64>,
162        testnet: bool,
163    ) -> anyhow::Result<Self> {
164        let (api_key_env, api_secret_env) = if testnet {
165            ("BITMEX_TESTNET_API_KEY", "BITMEX_TESTNET_API_SECRET")
166        } else {
167            ("BITMEX_API_KEY", "BITMEX_API_SECRET")
168        };
169
170        let key = get_or_env_var_opt(api_key, api_key_env);
171        let secret = get_or_env_var_opt(api_secret, api_secret_env);
172
173        Self::new(url, key, secret, account_id, heartbeat)
174    }
175
176    /// Creates a new authenticated [`BitmexWebSocketClient`] using environment variables.
177    ///
178    /// # Errors
179    ///
180    /// Returns an error if environment variables are not set or credentials are invalid.
181    pub fn from_env() -> anyhow::Result<Self> {
182        let url = get_env_var("BITMEX_WS_URL")?;
183        let api_key = get_env_var("BITMEX_API_KEY")?;
184        let api_secret = get_env_var("BITMEX_API_SECRET")?;
185
186        Self::new(Some(url), Some(api_key), Some(api_secret), None, None)
187    }
188
189    /// Returns the websocket url being used by the client.
190    #[must_use]
191    pub const fn url(&self) -> &str {
192        self.url.as_str()
193    }
194
195    /// Returns the public API key being used by the client.
196    #[must_use]
197    pub fn api_key(&self) -> Option<&str> {
198        self.credential.as_ref().map(|c| c.api_key.as_str())
199    }
200
201    /// Returns a masked version of the API key for logging purposes.
202    #[must_use]
203    pub fn api_key_masked(&self) -> Option<String> {
204        self.credential.as_ref().map(|c| c.api_key_masked())
205    }
206
207    /// Returns a value indicating whether the client is active.
208    #[must_use]
209    pub fn is_active(&self) -> bool {
210        let connection_mode_arc = self.connection_mode.load();
211        ConnectionMode::from_atomic(&connection_mode_arc).is_active()
212            && !self.signal.load(Ordering::Relaxed)
213    }
214
215    /// Returns a value indicating whether the client is closed.
216    #[must_use]
217    pub fn is_closed(&self) -> bool {
218        let connection_mode_arc = self.connection_mode.load();
219        ConnectionMode::from_atomic(&connection_mode_arc).is_closed()
220            || self.signal.load(Ordering::Relaxed)
221    }
222
223    /// Sets the account ID.
224    pub fn set_account_id(&mut self, account_id: AccountId) {
225        self.account_id = account_id;
226    }
227
228    /// Caches multiple instruments.
229    ///
230    /// Clears the existing cache first, then adds all provided instruments.
231    pub fn cache_instruments(&mut self, instruments: Vec<InstrumentAny>) {
232        self.instruments_cache.clear();
233        let mut count = 0;
234
235        log::debug!("Initializing BitMEX instrument cache");
236
237        for inst in instruments {
238            let symbol = inst.symbol().inner();
239            self.instruments_cache.insert(symbol, inst.clone());
240            log::debug!("Cached instrument: {symbol}");
241            count += 1;
242        }
243
244        log::info!("BitMEX instrument cache initialized with {count} instruments");
245    }
246
247    /// Caches a single instrument.
248    ///
249    /// Any existing instrument with the same symbol will be replaced.
250    pub fn cache_instrument(&self, instrument: InstrumentAny) {
251        self.instruments_cache
252            .insert(instrument.symbol().inner(), instrument.clone());
253
254        // Before connect() the handler isn't running; this send will fail and that's expected
255        // because connect() replays the instruments via InitializeInstruments
256        if let Ok(cmd_tx) = self.cmd_tx.try_read()
257            && let Err(e) = cmd_tx.send(HandlerCommand::UpdateInstrument(instrument))
258        {
259            log::debug!("Failed to send instrument update to handler: {e}");
260        }
261    }
262
263    /// Connect to the BitMEX WebSocket server.
264    ///
265    /// # Errors
266    ///
267    /// Returns an error if the WebSocket connection fails or authentication fails (if credentials provided).
268    ///
269    /// # Panics
270    ///
271    /// Panics if subscription or authentication messages fail to serialize to JSON.
272    pub async fn connect(&mut self) -> Result<(), BitmexWsError> {
273        let (client, raw_rx) = self.connect_inner().await?;
274
275        // Reset shutdown signal so is_active() works after close+reconnect
276        self.signal.store(false, Ordering::Relaxed);
277
278        // Replace connection state so all clones see the underlying WebSocketClient's state
279        self.connection_mode.store(client.connection_mode_atomic());
280
281        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
282        self.out_rx = Some(Arc::new(out_rx));
283
284        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
285        *self.cmd_tx.write().await = cmd_tx.clone();
286
287        // Send WebSocketClient to handler
288        if let Err(e) = cmd_tx.send(HandlerCommand::SetClient(client)) {
289            return Err(BitmexWsError::ClientError(format!(
290                "Failed to send WebSocketClient to handler: {e}"
291            )));
292        }
293
294        // Replay cached instruments to the new handler via the new channel
295        if !self.instruments_cache.is_empty() {
296            let cached_instruments: Vec<InstrumentAny> = self
297                .instruments_cache
298                .iter()
299                .map(|entry| entry.value().clone())
300                .collect();
301            if let Err(e) = cmd_tx.send(HandlerCommand::InitializeInstruments(cached_instruments)) {
302                log::error!("Failed to replay instruments to handler: {e}");
303            }
304        }
305
306        let signal = self.signal.clone();
307        let account_id = self.account_id;
308        let credential = self.credential.clone();
309        let auth_tracker = self.auth_tracker.clone();
310        let subscriptions = self.subscriptions.clone();
311        let order_type_cache = self.order_type_cache.clone();
312        let order_symbol_cache = self.order_symbol_cache.clone();
313        let cmd_tx_for_reconnect = cmd_tx.clone();
314
315        let stream_handle = get_runtime().spawn(async move {
316            let mut handler = FeedHandler::new(
317                signal.clone(),
318                cmd_rx,
319                raw_rx,
320                out_tx,
321                account_id,
322                auth_tracker.clone(),
323                subscriptions.clone(),
324                order_type_cache,
325                order_symbol_cache,
326            );
327
328            // Helper closure to resubscribe all tracked subscriptions after reconnection
329            let resubscribe_all = || {
330                // Use SubscriptionState as source of truth for what to restore
331                let topics = subscriptions.all_topics();
332
333                if topics.is_empty() {
334                    return;
335                }
336
337                log::debug!(
338                    "Resubscribing to confirmed subscriptions: count={}",
339                    topics.len()
340                );
341
342                for topic in &topics {
343                    subscriptions.mark_subscribe(topic.as_str());
344                }
345
346                // Serialize subscription messages
347                let mut payloads = Vec::with_capacity(topics.len());
348                for topic in &topics {
349                    let message = BitmexSubscription {
350                        op: BitmexWsOperation::Subscribe,
351                        args: vec![Ustr::from(topic.as_ref())],
352                    };
353                    if let Ok(payload) = serde_json::to_string(&message) {
354                        payloads.push(payload);
355                    }
356                }
357
358                if let Err(e) =
359                    cmd_tx_for_reconnect.send(HandlerCommand::Subscribe { topics: payloads })
360                {
361                    log::error!("Failed to send resubscribe command: {e}");
362                }
363            };
364
365            // Run message processing with reconnection handling
366            loop {
367                match handler.next().await {
368                    Some(NautilusWsMessage::Reconnected) => {
369                        if signal.load(Ordering::Relaxed) {
370                            continue;
371                        }
372
373                        log::info!("WebSocket reconnected");
374
375                        // Mark all confirmed subscriptions as failed so they transition to pending state
376                        let confirmed_topics: Vec<String> = {
377                            let confirmed = subscriptions.confirmed();
378                            let mut topics = Vec::new();
379
380                            for entry in confirmed.iter() {
381                                let (channel, symbols) = entry.pair();
382
383                                if *channel == BitmexWsTopic::Instrument.as_ref() {
384                                    continue;
385                                }
386
387                                for symbol in symbols {
388                                    if symbol.is_empty() {
389                                        topics.push(channel.to_string());
390                                    } else {
391                                        topics.push(format!("{channel}:{symbol}"));
392                                    }
393                                }
394                            }
395
396                            topics
397                        };
398
399                        if !confirmed_topics.is_empty() {
400                            log::debug!(
401                                "Marking confirmed subscriptions as pending for replay: count={}",
402                                confirmed_topics.len()
403                            );
404                            for topic in confirmed_topics {
405                                subscriptions.mark_failure(&topic);
406                            }
407                        }
408
409                        if let Some(cred) = &credential {
410                            log::debug!("Re-authenticating after reconnection");
411
412                            let expires =
413                                (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
414                            let signature = cred.sign("GET", "/realtime", expires, "");
415
416                            let auth_message = BitmexAuthentication {
417                                op: BitmexWsAuthAction::AuthKeyExpires,
418                                args: (cred.api_key.to_string(), expires, signature),
419                            };
420
421                            if let Ok(payload) = serde_json::to_string(&auth_message) {
422                                if let Err(e) = cmd_tx_for_reconnect
423                                    .send(HandlerCommand::Authenticate { payload })
424                                {
425                                    log::error!("Failed to send reconnection auth command: {e}");
426                                }
427                            } else {
428                                log::error!("Failed to serialize reconnection auth message");
429                            }
430                        }
431
432                        // Unauthenticated sessions resubscribe immediately after reconnection,
433                        // authenticated sessions wait for Authenticated message
434                        if credential.is_none() {
435                            log::debug!("No authentication required, resubscribing immediately");
436                            resubscribe_all();
437                        }
438
439                        if handler.send(NautilusWsMessage::Reconnected).is_err() {
440                            log::error!("Failed to forward reconnect event (receiver dropped)");
441                            break;
442                        }
443
444                        continue;
445                    }
446                    Some(NautilusWsMessage::Authenticated) => {
447                        log::debug!("Authenticated after reconnection, resubscribing");
448                        resubscribe_all();
449                        continue;
450                    }
451                    Some(msg) => {
452                        if handler.send(msg).is_err() {
453                            log::error!("Failed to send message (receiver dropped)");
454                            break;
455                        }
456                    }
457                    None => {
458                        // Stream ended - check if it's a stop signal
459                        if handler.is_stopped() {
460                            log::debug!("Stop signal received, ending message processing");
461                            break;
462                        }
463                        // Otherwise it's an unexpected stream end
464                        log::warn!("WebSocket stream ended unexpectedly");
465                        break;
466                    }
467                }
468            }
469
470            log::debug!("Handler task exiting");
471        });
472
473        self.task_handle = Some(Arc::new(stream_handle));
474
475        if self.credential.is_some()
476            && let Err(e) = self.authenticate().await
477        {
478            if let Some(handle) = self.task_handle.take() {
479                handle.abort();
480            }
481            self.signal.store(true, Ordering::Relaxed);
482            return Err(e);
483        }
484
485        // Subscribe to instrument topic
486        let instrument_topic = BitmexWsTopic::Instrument.as_ref().to_string();
487        self.subscriptions.mark_subscribe(&instrument_topic);
488        self.tracked_subscriptions.insert(instrument_topic, ());
489
490        let subscribe_msg = BitmexSubscription {
491            op: BitmexWsOperation::Subscribe,
492            args: vec![Ustr::from(BitmexWsTopic::Instrument.as_ref())],
493        };
494
495        match serde_json::to_string(&subscribe_msg) {
496            Ok(subscribe_json) => {
497                if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Subscribe {
498                    topics: vec![subscribe_json],
499                }) {
500                    log::error!("Failed to send subscribe command for instruments: {e}");
501                } else {
502                    log::debug!("Subscribed to all instruments");
503                }
504            }
505            Err(e) => {
506                log::error!("Failed to serialize subscribe message: {e}");
507            }
508        }
509
510        Ok(())
511    }
512
513    /// Connect to the WebSocket and return a message receiver.
514    ///
515    /// # Errors
516    ///
517    /// Returns an error if the WebSocket connection fails or if authentication fails (when credentials are provided).
518    async fn connect_inner(
519        &mut self,
520    ) -> Result<
521        (
522            WebSocketClient,
523            tokio::sync::mpsc::UnboundedReceiver<Message>,
524        ),
525        BitmexWsError,
526    > {
527        let (message_handler, rx) = channel_message_handler();
528
529        // No-op ping handler: handler owns the WebSocketClient and responds to pings directly
530        // in the message loop for minimal latency (see handler.rs pong response)
531        let ping_handler: PingHandler = Arc::new(move |_payload: Vec<u8>| {
532            // Handler responds to pings internally via select! loop
533        });
534
535        let config = WebSocketConfig {
536            url: self.url.clone(),
537            headers: vec![(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())],
538            heartbeat: self.heartbeat,
539            heartbeat_msg: None,
540            reconnect_timeout_ms: Some(5_000),
541            reconnect_delay_initial_ms: None, // Use default
542            reconnect_delay_max_ms: None,     // Use default
543            reconnect_backoff_factor: None,   // Use default
544            reconnect_jitter_ms: None,        // Use default
545            reconnect_max_attempts: None,
546        };
547
548        let keyed_quotas = vec![];
549        let client = WebSocketClient::connect(
550            config,
551            Some(message_handler),
552            Some(ping_handler),
553            None, // post_reconnection
554            keyed_quotas,
555            None, // default_quota
556        )
557        .await
558        .map_err(|e| BitmexWsError::ClientError(e.to_string()))?;
559
560        Ok((client, rx))
561    }
562
563    /// Authenticate the WebSocket connection using the provided credentials.
564    ///
565    /// # Errors
566    ///
567    /// Returns an error if the WebSocket is not connected, if authentication fails,
568    /// or if credentials are not available.
569    async fn authenticate(&self) -> Result<(), BitmexWsError> {
570        let credential = match &self.credential {
571            Some(credential) => credential,
572            None => {
573                return Err(BitmexWsError::AuthenticationError(
574                    "API credentials not available to authenticate".to_string(),
575                ));
576            }
577        };
578
579        let receiver = self.auth_tracker.begin();
580
581        let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
582        let signature = credential.sign("GET", "/realtime", expires, "");
583
584        let auth_message = BitmexAuthentication {
585            op: BitmexWsAuthAction::AuthKeyExpires,
586            args: (credential.api_key.to_string(), expires, signature),
587        };
588
589        let auth_json = serde_json::to_string(&auth_message).map_err(|e| {
590            let msg = format!("Failed to serialize auth message: {e}");
591            self.auth_tracker.fail(msg.clone());
592            BitmexWsError::AuthenticationError(msg)
593        })?;
594
595        // Send Authenticate command to handler
596        self.cmd_tx
597            .read()
598            .await
599            .send(HandlerCommand::Authenticate { payload: auth_json })
600            .map_err(|e| {
601                let msg = format!("Failed to send authenticate command: {e}");
602                self.auth_tracker.fail(msg.clone());
603                BitmexWsError::AuthenticationError(msg)
604            })?;
605
606        self.auth_tracker
607            .wait_for_result::<BitmexWsError>(
608                Duration::from_secs(AUTHENTICATION_TIMEOUT_SECS),
609                receiver,
610            )
611            .await
612    }
613
614    /// Wait until the WebSocket connection is active.
615    ///
616    /// # Errors
617    ///
618    /// Returns an error if the connection times out.
619    pub async fn wait_until_active(&self, timeout_secs: f64) -> Result<(), BitmexWsError> {
620        let timeout = Duration::from_secs_f64(timeout_secs);
621
622        tokio::time::timeout(timeout, async {
623            while !self.is_active() {
624                tokio::time::sleep(Duration::from_millis(10)).await;
625            }
626        })
627        .await
628        .map_err(|_| {
629            BitmexWsError::ClientError(format!(
630                "WebSocket connection timeout after {timeout_secs} seconds"
631            ))
632        })?;
633
634        Ok(())
635    }
636
637    /// Provides the internal stream as a channel-based stream.
638    ///
639    /// # Panics
640    ///
641    /// This function panics:
642    /// - If the websocket is not connected.
643    /// - If `stream` has already been called somewhere else (stream receiver is then taken).
644    pub fn stream(&mut self) -> impl Stream<Item = NautilusWsMessage> + use<> {
645        let rx = self
646            .out_rx
647            .take()
648            .expect("Stream receiver already taken or not connected");
649        let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
650        async_stream::stream! {
651            while let Some(msg) = rx.recv().await {
652                yield msg;
653            }
654        }
655    }
656
657    /// Closes the client.
658    ///
659    /// # Errors
660    ///
661    /// Returns an error if the WebSocket is not connected or if closing fails.
662    ///
663    /// # Panics
664    ///
665    /// Panics if the task handle cannot be unwrapped (should never happen in normal usage).
666    pub async fn close(&mut self) -> Result<(), BitmexWsError> {
667        log::debug!("Starting close process");
668
669        self.signal.store(true, Ordering::Relaxed);
670
671        // Send Disconnect command to handler
672        if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
673            log::debug!(
674                "Failed to send disconnect command (handler may already be shut down): {e}"
675            );
676        }
677
678        // Clean up task handle with timeout
679        if let Some(task_handle) = self.task_handle.take() {
680            match Arc::try_unwrap(task_handle) {
681                Ok(handle) => {
682                    log::debug!("Waiting for task handle to complete");
683                    match tokio::time::timeout(Duration::from_secs(2), handle).await {
684                        Ok(Ok(())) => log::debug!("Task handle completed successfully"),
685                        Ok(Err(e)) => log::error!("Task handle encountered an error: {e:?}"),
686                        Err(_) => {
687                            log::warn!(
688                                "Timeout waiting for task handle, task may still be running"
689                            );
690                            // The task will be dropped and should clean up automatically
691                        }
692                    }
693                }
694                Err(arc_handle) => {
695                    log::debug!(
696                        "Cannot take ownership of task handle - other references exist, aborting task"
697                    );
698                    arc_handle.abort();
699                }
700            }
701        } else {
702            log::debug!("No task handle to await");
703        }
704
705        log::debug!("Closed");
706
707        Ok(())
708    }
709
710    /// Subscribe to the specified topics.
711    ///
712    /// # Errors
713    ///
714    /// Returns an error if the WebSocket is not connected or if sending the subscription message fails.
715    ///
716    /// # Panics
717    ///
718    /// Panics if serialization of WebSocket messages fails (should never happen).
719    pub async fn subscribe(&self, topics: Vec<String>) -> Result<(), BitmexWsError> {
720        log::debug!("Subscribing to topics: {topics:?}");
721
722        for topic in &topics {
723            self.subscriptions.mark_subscribe(topic.as_str());
724            self.tracked_subscriptions.insert(topic.clone(), ());
725        }
726
727        // Serialize subscription messages
728        let mut payloads = Vec::with_capacity(topics.len());
729        for topic in &topics {
730            let message = BitmexSubscription {
731                op: BitmexWsOperation::Subscribe,
732                args: vec![Ustr::from(topic.as_ref())],
733            };
734            let payload = serde_json::to_string(&message).map_err(|e| {
735                BitmexWsError::SubscriptionError(format!("Failed to serialize subscription: {e}"))
736            })?;
737            payloads.push(payload);
738        }
739
740        // Send Subscribe command to handler
741        let cmd = HandlerCommand::Subscribe { topics: payloads };
742
743        self.send_cmd(cmd).await.map_err(|e| {
744            BitmexWsError::SubscriptionError(format!("Failed to send subscribe command: {e}"))
745        })
746    }
747
748    /// Unsubscribe from the specified topics.
749    ///
750    /// # Errors
751    ///
752    /// Returns an error if the WebSocket is not connected or if sending the unsubscription message fails.
753    async fn unsubscribe(&self, topics: Vec<String>) -> Result<(), BitmexWsError> {
754        log::debug!("Attempting to unsubscribe from topics: {topics:?}");
755
756        if self.signal.load(Ordering::Relaxed) {
757            log::debug!("Shutdown signal detected, skipping unsubscribe");
758            return Ok(());
759        }
760
761        for topic in &topics {
762            self.subscriptions.mark_unsubscribe(topic.as_str());
763            self.tracked_subscriptions.remove(topic);
764        }
765
766        // Serialize unsubscription messages
767        let mut payloads = Vec::with_capacity(topics.len());
768        for topic in &topics {
769            let message = BitmexSubscription {
770                op: BitmexWsOperation::Unsubscribe,
771                args: vec![Ustr::from(topic.as_ref())],
772            };
773            if let Ok(payload) = serde_json::to_string(&message) {
774                payloads.push(payload);
775            }
776        }
777
778        // Send Unsubscribe command to handler
779        let cmd = HandlerCommand::Unsubscribe { topics: payloads };
780
781        if let Err(e) = self.send_cmd(cmd).await {
782            log::debug!("Failed to send unsubscribe command: {e}");
783        }
784
785        Ok(())
786    }
787
788    /// Get the current number of active subscriptions.
789    #[must_use]
790    pub fn subscription_count(&self) -> usize {
791        self.subscriptions.len()
792    }
793
794    pub fn get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
795        let symbol = instrument_id.symbol.inner();
796        let confirmed = self.subscriptions.confirmed();
797        let mut channels = Vec::with_capacity(confirmed.len());
798
799        for entry in confirmed.iter() {
800            let (channel, symbols) = entry.pair();
801            if symbols.contains(&symbol) {
802                // Return the full topic string (e.g., "orderBookL2:XBTUSD")
803                channels.push(format!("{channel}:{symbol}"));
804            } else {
805                let has_channel_marker = symbols.iter().any(|s| s.is_empty());
806                if has_channel_marker
807                    && (*channel == BitmexWsAuthChannel::Execution.as_ref()
808                        || *channel == BitmexWsAuthChannel::Order.as_ref())
809                {
810                    // These are account-level subscriptions without symbols
811                    channels.push(channel.to_string());
812                }
813            }
814        }
815
816        channels
817    }
818
819    /// Subscribe to instrument updates for all instruments on the venue.
820    ///
821    /// # Errors
822    ///
823    /// Returns an error if the WebSocket is not connected or if the subscription fails.
824    pub async fn subscribe_instruments(&self) -> Result<(), BitmexWsError> {
825        // Already subscribed automatically on connection
826        log::debug!("Already subscribed to all instruments on connection, skipping");
827        Ok(())
828    }
829
830    /// Subscribe to instrument updates (mark/index prices) for the specified instrument.
831    ///
832    /// # Errors
833    ///
834    /// Returns an error if the WebSocket is not connected or if the subscription fails.
835    pub async fn subscribe_instrument(
836        &self,
837        instrument_id: InstrumentId,
838    ) -> Result<(), BitmexWsError> {
839        // Already subscribed to all instruments on connection
840        log::debug!(
841            "Already subscribed to all instruments on connection (includes {instrument_id}), skipping"
842        );
843        Ok(())
844    }
845
846    /// Subscribe to order book updates for the specified instrument.
847    ///
848    /// # Errors
849    ///
850    /// Returns an error if the WebSocket is not connected or if the subscription fails.
851    pub async fn subscribe_book(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
852        let topic = BitmexWsTopic::OrderBookL2;
853        let symbol = instrument_id.symbol.inner();
854        self.subscribe(vec![format!("{topic}:{symbol}")]).await
855    }
856
857    /// Subscribe to order book L2 (25 levels) updates for the specified instrument.
858    ///
859    /// # Errors
860    ///
861    /// Returns an error if the WebSocket is not connected or if the subscription fails.
862    pub async fn subscribe_book_25(
863        &self,
864        instrument_id: InstrumentId,
865    ) -> Result<(), BitmexWsError> {
866        let topic = BitmexWsTopic::OrderBookL2_25;
867        let symbol = instrument_id.symbol.inner();
868        self.subscribe(vec![format!("{topic}:{symbol}")]).await
869    }
870
871    /// Subscribe to order book depth 10 updates for the specified instrument.
872    ///
873    /// # Errors
874    ///
875    /// Returns an error if the WebSocket is not connected or if the subscription fails.
876    pub async fn subscribe_book_depth10(
877        &self,
878        instrument_id: InstrumentId,
879    ) -> Result<(), BitmexWsError> {
880        let topic = BitmexWsTopic::OrderBook10;
881        let symbol = instrument_id.symbol.inner();
882        self.subscribe(vec![format!("{topic}:{symbol}")]).await
883    }
884
885    /// Subscribe to quote updates for the specified instrument.
886    ///
887    /// Note: Index symbols (starting with '.') do not have quotes and will be silently ignored.
888    ///
889    /// # Errors
890    ///
891    /// Returns an error if the WebSocket is not connected or if the subscription fails.
892    pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
893        let symbol = instrument_id.symbol.inner();
894
895        // Index symbols don't have quotes (bid/ask), only a single price
896        if is_index_symbol(&instrument_id.symbol.inner()) {
897            log::warn!("Ignoring quote subscription for index symbol: {symbol}");
898            return Ok(());
899        }
900
901        let topic = BitmexWsTopic::Quote;
902        self.subscribe(vec![format!("{topic}:{symbol}")]).await
903    }
904
905    /// Subscribe to trade updates for the specified instrument.
906    ///
907    /// Note: Index symbols (starting with '.') do not have trades and will be silently ignored.
908    ///
909    /// # Errors
910    ///
911    /// Returns an error if the WebSocket is not connected or if the subscription fails.
912    pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
913        let symbol = instrument_id.symbol.inner();
914
915        // Index symbols don't have trades
916        if is_index_symbol(&symbol) {
917            log::warn!("Ignoring trade subscription for index symbol: {symbol}");
918            return Ok(());
919        }
920
921        let topic = BitmexWsTopic::Trade;
922        self.subscribe(vec![format!("{topic}:{symbol}")]).await
923    }
924
925    /// Subscribe to mark price updates for the specified instrument.
926    ///
927    /// # Errors
928    ///
929    /// Returns an error if the WebSocket is not connected or if the subscription fails.
930    pub async fn subscribe_mark_prices(
931        &self,
932        instrument_id: InstrumentId,
933    ) -> Result<(), BitmexWsError> {
934        self.subscribe_instrument(instrument_id).await
935    }
936
937    /// Subscribe to index price updates for the specified instrument.
938    ///
939    /// # Errors
940    ///
941    /// Returns an error if the WebSocket is not connected or if the subscription fails.
942    pub async fn subscribe_index_prices(
943        &self,
944        instrument_id: InstrumentId,
945    ) -> Result<(), BitmexWsError> {
946        self.subscribe_instrument(instrument_id).await
947    }
948
949    /// Subscribe to funding rate updates for the specified instrument.
950    ///
951    /// # Errors
952    ///
953    /// Returns an error if the WebSocket is not connected or if the subscription fails.
954    pub async fn subscribe_funding_rates(
955        &self,
956        instrument_id: InstrumentId,
957    ) -> Result<(), BitmexWsError> {
958        let topic = BitmexWsTopic::Funding;
959        let symbol = instrument_id.symbol.inner();
960        self.subscribe(vec![format!("{topic}:{symbol}")]).await
961    }
962
963    /// Subscribe to bar updates for the specified bar type.
964    ///
965    /// # Errors
966    ///
967    /// Returns an error if the WebSocket is not connected or if the subscription fails.
968    pub async fn subscribe_bars(&self, bar_type: BarType) -> Result<(), BitmexWsError> {
969        let topic = topic_from_bar_spec(bar_type.spec());
970        let symbol = bar_type.instrument_id().symbol.inner();
971        self.subscribe(vec![format!("{topic}:{symbol}")]).await
972    }
973
974    /// Unsubscribe from instrument updates for all instruments on the venue.
975    ///
976    /// # Errors
977    ///
978    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
979    pub async fn unsubscribe_instruments(&self) -> Result<(), BitmexWsError> {
980        // No-op: instruments are required for proper operation
981        log::debug!(
982            "Instruments subscription maintained for proper operation, skipping unsubscribe"
983        );
984        Ok(())
985    }
986
987    /// Unsubscribe from instrument updates (mark/index prices) for the specified instrument.
988    ///
989    /// # Errors
990    ///
991    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
992    pub async fn unsubscribe_instrument(
993        &self,
994        instrument_id: InstrumentId,
995    ) -> Result<(), BitmexWsError> {
996        // No-op: instruments are required for proper operation
997        log::debug!(
998            "Instruments subscription maintained for proper operation (includes {instrument_id}), skipping unsubscribe"
999        );
1000        Ok(())
1001    }
1002
1003    /// Unsubscribe from order book updates for the specified instrument.
1004    ///
1005    /// # Errors
1006    ///
1007    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1008    pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
1009        let topic = BitmexWsTopic::OrderBookL2;
1010        let symbol = instrument_id.symbol.inner();
1011        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1012    }
1013
1014    /// Unsubscribe from order book L2 (25 levels) updates for the specified instrument.
1015    ///
1016    /// # Errors
1017    ///
1018    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1019    pub async fn unsubscribe_book_25(
1020        &self,
1021        instrument_id: InstrumentId,
1022    ) -> Result<(), BitmexWsError> {
1023        let topic = BitmexWsTopic::OrderBookL2_25;
1024        let symbol = instrument_id.symbol.inner();
1025        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1026    }
1027
1028    /// Unsubscribe from order book depth 10 updates for the specified instrument.
1029    ///
1030    /// # Errors
1031    ///
1032    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1033    pub async fn unsubscribe_book_depth10(
1034        &self,
1035        instrument_id: InstrumentId,
1036    ) -> Result<(), BitmexWsError> {
1037        let topic = BitmexWsTopic::OrderBook10;
1038        let symbol = instrument_id.symbol.inner();
1039        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1040    }
1041
1042    /// Unsubscribe from quote updates for the specified instrument.
1043    ///
1044    /// # Errors
1045    ///
1046    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1047    pub async fn unsubscribe_quotes(
1048        &self,
1049        instrument_id: InstrumentId,
1050    ) -> Result<(), BitmexWsError> {
1051        let symbol = instrument_id.symbol.inner();
1052
1053        // Index symbols don't have quotes
1054        if is_index_symbol(&symbol) {
1055            return Ok(());
1056        }
1057
1058        let topic = BitmexWsTopic::Quote;
1059        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1060    }
1061
1062    /// Unsubscribe from trade updates for the specified instrument.
1063    ///
1064    /// # Errors
1065    ///
1066    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1067    pub async fn unsubscribe_trades(
1068        &self,
1069        instrument_id: InstrumentId,
1070    ) -> Result<(), BitmexWsError> {
1071        let symbol = instrument_id.symbol.inner();
1072
1073        // Index symbols don't have trades
1074        if is_index_symbol(&symbol) {
1075            return Ok(());
1076        }
1077
1078        let topic = BitmexWsTopic::Trade;
1079        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1080    }
1081
1082    /// Unsubscribe from mark price updates for the specified instrument.
1083    ///
1084    /// # Errors
1085    ///
1086    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1087    pub async fn unsubscribe_mark_prices(
1088        &self,
1089        instrument_id: InstrumentId,
1090    ) -> Result<(), BitmexWsError> {
1091        // No-op: instrument channel shared with index prices
1092        log::debug!(
1093            "Mark prices for {instrument_id} uses shared instrument channel, skipping unsubscribe"
1094        );
1095        Ok(())
1096    }
1097
1098    /// Unsubscribe from index price updates for the specified instrument.
1099    ///
1100    /// # Errors
1101    ///
1102    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1103    pub async fn unsubscribe_index_prices(
1104        &self,
1105        instrument_id: InstrumentId,
1106    ) -> Result<(), BitmexWsError> {
1107        // No-op: instrument channel shared with mark prices
1108        log::debug!(
1109            "Index prices for {instrument_id} uses shared instrument channel, skipping unsubscribe"
1110        );
1111        Ok(())
1112    }
1113
1114    /// Unsubscribe from funding rate updates for the specified instrument.
1115    ///
1116    /// # Errors
1117    ///
1118    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1119    pub async fn unsubscribe_funding_rates(
1120        &self,
1121        instrument_id: InstrumentId,
1122    ) -> Result<(), BitmexWsError> {
1123        // No-op: unsubscribing during shutdown causes race conditions
1124        log::debug!(
1125            "Funding rates for {instrument_id}, skipping unsubscribe to avoid shutdown race"
1126        );
1127        Ok(())
1128    }
1129
1130    /// Unsubscribe from bar updates for the specified bar type.
1131    ///
1132    /// # Errors
1133    ///
1134    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1135    pub async fn unsubscribe_bars(&self, bar_type: BarType) -> Result<(), BitmexWsError> {
1136        let topic = topic_from_bar_spec(bar_type.spec());
1137        let symbol = bar_type.instrument_id().symbol.inner();
1138        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1139    }
1140
1141    /// Subscribe to order updates for the authenticated account.
1142    ///
1143    /// # Errors
1144    ///
1145    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1146    pub async fn subscribe_orders(&self) -> Result<(), BitmexWsError> {
1147        if self.credential.is_none() {
1148            return Err(BitmexWsError::MissingCredentials);
1149        }
1150        self.subscribe(vec![BitmexWsAuthChannel::Order.to_string()])
1151            .await
1152    }
1153
1154    /// Subscribe to execution updates for the authenticated account.
1155    ///
1156    /// # Errors
1157    ///
1158    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1159    pub async fn subscribe_executions(&self) -> Result<(), BitmexWsError> {
1160        if self.credential.is_none() {
1161            return Err(BitmexWsError::MissingCredentials);
1162        }
1163        self.subscribe(vec![BitmexWsAuthChannel::Execution.to_string()])
1164            .await
1165    }
1166
1167    /// Subscribe to position updates for the authenticated account.
1168    ///
1169    /// # Errors
1170    ///
1171    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1172    pub async fn subscribe_positions(&self) -> Result<(), BitmexWsError> {
1173        if self.credential.is_none() {
1174            return Err(BitmexWsError::MissingCredentials);
1175        }
1176        self.subscribe(vec![BitmexWsAuthChannel::Position.to_string()])
1177            .await
1178    }
1179
1180    /// Subscribe to margin updates for the authenticated account.
1181    ///
1182    /// # Errors
1183    ///
1184    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1185    pub async fn subscribe_margin(&self) -> Result<(), BitmexWsError> {
1186        if self.credential.is_none() {
1187            return Err(BitmexWsError::MissingCredentials);
1188        }
1189        self.subscribe(vec![BitmexWsAuthChannel::Margin.to_string()])
1190            .await
1191    }
1192
1193    /// Subscribe to wallet updates for the authenticated account.
1194    ///
1195    /// # Errors
1196    ///
1197    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1198    pub async fn subscribe_wallet(&self) -> Result<(), BitmexWsError> {
1199        if self.credential.is_none() {
1200            return Err(BitmexWsError::MissingCredentials);
1201        }
1202        self.subscribe(vec![BitmexWsAuthChannel::Wallet.to_string()])
1203            .await
1204    }
1205
1206    /// Unsubscribe from order updates for the authenticated account.
1207    ///
1208    /// # Errors
1209    ///
1210    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1211    pub async fn unsubscribe_orders(&self) -> Result<(), BitmexWsError> {
1212        self.unsubscribe(vec![BitmexWsAuthChannel::Order.to_string()])
1213            .await
1214    }
1215
1216    /// Unsubscribe from execution updates for the authenticated account.
1217    ///
1218    /// # Errors
1219    ///
1220    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1221    pub async fn unsubscribe_executions(&self) -> Result<(), BitmexWsError> {
1222        self.unsubscribe(vec![BitmexWsAuthChannel::Execution.to_string()])
1223            .await
1224    }
1225
1226    /// Unsubscribe from position updates for the authenticated account.
1227    ///
1228    /// # Errors
1229    ///
1230    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1231    pub async fn unsubscribe_positions(&self) -> Result<(), BitmexWsError> {
1232        self.unsubscribe(vec![BitmexWsAuthChannel::Position.to_string()])
1233            .await
1234    }
1235
1236    /// Unsubscribe from margin updates for the authenticated account.
1237    ///
1238    /// # Errors
1239    ///
1240    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1241    pub async fn unsubscribe_margin(&self) -> Result<(), BitmexWsError> {
1242        self.unsubscribe(vec![BitmexWsAuthChannel::Margin.to_string()])
1243            .await
1244    }
1245
1246    /// Unsubscribe from wallet updates for the authenticated account.
1247    ///
1248    /// # Errors
1249    ///
1250    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1251    pub async fn unsubscribe_wallet(&self) -> Result<(), BitmexWsError> {
1252        self.unsubscribe(vec![BitmexWsAuthChannel::Wallet.to_string()])
1253            .await
1254    }
1255
1256    /// Sends a command to the handler.
1257    async fn send_cmd(&self, cmd: HandlerCommand) -> Result<(), BitmexWsError> {
1258        self.cmd_tx
1259            .read()
1260            .await
1261            .send(cmd)
1262            .map_err(|e| BitmexWsError::ClientError(format!("Handler not available: {e}")))
1263    }
1264}
1265
1266#[cfg(test)]
1267mod tests {
1268    use ahash::AHashSet;
1269    use rstest::rstest;
1270    use ustr::Ustr;
1271
1272    use super::*;
1273
1274    #[rstest]
1275    fn test_reconnect_topics_restoration_logic() {
1276        // Create real client with credentials
1277        let client = BitmexWebSocketClient::new(
1278            Some("ws://test.com".to_string()),
1279            Some("test_key".to_string()),
1280            Some("test_secret".to_string()),
1281            Some(AccountId::new("BITMEX-TEST")),
1282            None,
1283        )
1284        .unwrap();
1285
1286        // Populate subscriptions like they would be during normal operation
1287        let subs = client.subscriptions.confirmed();
1288        subs.insert(Ustr::from(BitmexWsTopic::Trade.as_ref()), {
1289            let mut set = AHashSet::new();
1290            set.insert(Ustr::from("XBTUSD"));
1291            set.insert(Ustr::from("ETHUSD"));
1292            set
1293        });
1294
1295        subs.insert(Ustr::from(BitmexWsTopic::OrderBookL2.as_ref()), {
1296            let mut set = AHashSet::new();
1297            set.insert(Ustr::from("XBTUSD"));
1298            set
1299        });
1300
1301        // Private channels (no symbols)
1302        subs.insert(Ustr::from(BitmexWsAuthChannel::Order.as_ref()), {
1303            let mut set = AHashSet::new();
1304            set.insert(Ustr::from(""));
1305            set
1306        });
1307        subs.insert(Ustr::from(BitmexWsAuthChannel::Position.as_ref()), {
1308            let mut set = AHashSet::new();
1309            set.insert(Ustr::from(""));
1310            set
1311        });
1312
1313        // Test the actual reconnection topic building logic
1314        let mut topics_to_restore = Vec::new();
1315        for entry in subs.iter() {
1316            let (channel, symbols) = entry.pair();
1317            for symbol in symbols {
1318                if symbol.is_empty() {
1319                    topics_to_restore.push(channel.to_string());
1320                } else {
1321                    topics_to_restore.push(format!("{channel}:{symbol}"));
1322                }
1323            }
1324        }
1325
1326        // Verify it builds the correct restoration topics
1327        assert!(topics_to_restore.contains(&format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref())));
1328        assert!(topics_to_restore.contains(&format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref())));
1329        assert!(
1330            topics_to_restore.contains(&format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref()))
1331        );
1332        assert!(topics_to_restore.contains(&BitmexWsAuthChannel::Order.as_ref().to_string()));
1333        assert!(topics_to_restore.contains(&BitmexWsAuthChannel::Position.as_ref().to_string()));
1334        assert_eq!(topics_to_restore.len(), 5);
1335    }
1336
1337    #[rstest]
1338    fn test_reconnect_auth_message_building() {
1339        // Test with credentials
1340        let client_with_creds = BitmexWebSocketClient::new(
1341            Some("ws://test.com".to_string()),
1342            Some("test_key".to_string()),
1343            Some("test_secret".to_string()),
1344            Some(AccountId::new("BITMEX-TEST")),
1345            None,
1346        )
1347        .unwrap();
1348
1349        // Test the actual auth message building logic from lines 220-228
1350        if let Some(cred) = &client_with_creds.credential {
1351            let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
1352            let signature = cred.sign("GET", "/realtime", expires, "");
1353
1354            let auth_message = BitmexAuthentication {
1355                op: BitmexWsAuthAction::AuthKeyExpires,
1356                args: (cred.api_key.to_string(), expires, signature),
1357            };
1358
1359            // Verify auth message structure
1360            assert_eq!(auth_message.op, BitmexWsAuthAction::AuthKeyExpires);
1361            assert_eq!(auth_message.args.0, "test_key");
1362            assert!(auth_message.args.1 > 0); // expires should be positive
1363            assert!(!auth_message.args.2.is_empty()); // signature should exist
1364        } else {
1365            panic!("Client should have credentials");
1366        }
1367
1368        // Test without credentials
1369        let client_no_creds = BitmexWebSocketClient::new(
1370            Some("ws://test.com".to_string()),
1371            None,
1372            None,
1373            Some(AccountId::new("BITMEX-TEST")),
1374            None,
1375        )
1376        .unwrap();
1377
1378        assert!(client_no_creds.credential.is_none());
1379    }
1380
1381    #[rstest]
1382    fn test_subscription_state_after_unsubscribe() {
1383        let client = BitmexWebSocketClient::new(
1384            Some("ws://test.com".to_string()),
1385            Some("test_key".to_string()),
1386            Some("test_secret".to_string()),
1387            Some(AccountId::new("BITMEX-TEST")),
1388            None,
1389        )
1390        .unwrap();
1391
1392        // Set up initial subscriptions
1393        let subs = client.subscriptions.confirmed();
1394        subs.insert(Ustr::from(BitmexWsTopic::Trade.as_ref()), {
1395            let mut set = AHashSet::new();
1396            set.insert(Ustr::from("XBTUSD"));
1397            set.insert(Ustr::from("ETHUSD"));
1398            set
1399        });
1400
1401        subs.insert(Ustr::from(BitmexWsTopic::OrderBookL2.as_ref()), {
1402            let mut set = AHashSet::new();
1403            set.insert(Ustr::from("XBTUSD"));
1404            set
1405        });
1406
1407        // Simulate unsubscribe logic (like from unsubscribe() method lines 586-599)
1408        let topic = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
1409        if let Some((channel, symbol)) = topic.split_once(':')
1410            && let Some(mut entry) = subs.get_mut(&Ustr::from(channel))
1411        {
1412            entry.remove(&Ustr::from(symbol));
1413            if entry.is_empty() {
1414                drop(entry);
1415                subs.remove(&Ustr::from(channel));
1416            }
1417        }
1418
1419        // Build restoration topics after unsubscribe
1420        let mut topics_to_restore = Vec::new();
1421        for entry in subs.iter() {
1422            let (channel, symbols) = entry.pair();
1423            for symbol in symbols {
1424                if symbol.is_empty() {
1425                    topics_to_restore.push(channel.to_string());
1426                } else {
1427                    topics_to_restore.push(format!("{channel}:{symbol}"));
1428                }
1429            }
1430        }
1431
1432        // Should have XBTUSD trade but not ETHUSD trade
1433        let trade_xbt = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
1434        let trade_eth = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
1435        let book_xbt = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
1436
1437        assert!(topics_to_restore.contains(&trade_xbt));
1438        assert!(!topics_to_restore.contains(&trade_eth));
1439        assert!(topics_to_restore.contains(&book_xbt));
1440        assert_eq!(topics_to_restore.len(), 2);
1441    }
1442
1443    #[rstest]
1444    fn test_race_unsubscribe_failure_recovery() {
1445        // Simulates the race condition where venue rejects an unsubscribe request.
1446        // The adapter must perform the 3-step recovery:
1447        // 1. confirm_unsubscribe() - clear pending_unsubscribe
1448        // 2. mark_subscribe() - mark as subscribing again
1449        // 3. confirm_subscribe() - restore to confirmed state
1450        let client = BitmexWebSocketClient::new(
1451            Some("ws://test.com".to_string()),
1452            None,
1453            None,
1454            Some(AccountId::new("BITMEX-TEST")),
1455            None,
1456        )
1457        .unwrap();
1458
1459        let topic = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
1460
1461        // Initial subscribe flow
1462        client.subscriptions.mark_subscribe(&topic);
1463        client.subscriptions.confirm_subscribe(&topic);
1464        assert_eq!(client.subscriptions.len(), 1);
1465
1466        // User unsubscribes
1467        client.subscriptions.mark_unsubscribe(&topic);
1468        assert_eq!(client.subscriptions.len(), 0);
1469        assert_eq!(
1470            client.subscriptions.pending_unsubscribe_topics(),
1471            vec![topic.clone()]
1472        );
1473
1474        // Venue REJECTS the unsubscribe (error message)
1475        // Adapter must perform 3-step recovery (from lines 1884-1891)
1476        client.subscriptions.confirm_unsubscribe(&topic); // Step 1: clear pending_unsubscribe
1477        client.subscriptions.mark_subscribe(&topic); // Step 2: mark as subscribing
1478        client.subscriptions.confirm_subscribe(&topic); // Step 3: confirm subscription
1479
1480        // Verify recovery: topic should be back in confirmed state
1481        assert_eq!(client.subscriptions.len(), 1);
1482        assert!(client.subscriptions.pending_unsubscribe_topics().is_empty());
1483        assert!(client.subscriptions.pending_subscribe_topics().is_empty());
1484
1485        // Verify topic is in all_topics() for reconnect
1486        let all = client.subscriptions.all_topics();
1487        assert_eq!(all.len(), 1);
1488        assert!(all.contains(&topic));
1489    }
1490
1491    #[rstest]
1492    fn test_race_resubscribe_before_unsubscribe_ack() {
1493        // Simulates: User unsubscribes, then immediately resubscribes before
1494        // the unsubscribe ACK arrives from the venue.
1495        // This is the race condition fixed in the subscription tracker.
1496        let client = BitmexWebSocketClient::new(
1497            Some("ws://test.com".to_string()),
1498            None,
1499            None,
1500            Some(AccountId::new("BITMEX-TEST")),
1501            None,
1502        )
1503        .unwrap();
1504
1505        let topic = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
1506
1507        // Initial subscribe
1508        client.subscriptions.mark_subscribe(&topic);
1509        client.subscriptions.confirm_subscribe(&topic);
1510        assert_eq!(client.subscriptions.len(), 1);
1511
1512        // User unsubscribes
1513        client.subscriptions.mark_unsubscribe(&topic);
1514        assert_eq!(client.subscriptions.len(), 0);
1515        assert_eq!(
1516            client.subscriptions.pending_unsubscribe_topics(),
1517            vec![topic.clone()]
1518        );
1519
1520        // User immediately changes mind and resubscribes (before unsubscribe ACK)
1521        client.subscriptions.mark_subscribe(&topic);
1522        assert_eq!(
1523            client.subscriptions.pending_subscribe_topics(),
1524            vec![topic.clone()]
1525        );
1526
1527        // NOW the unsubscribe ACK arrives - should NOT clear pending_subscribe
1528        client.subscriptions.confirm_unsubscribe(&topic);
1529        assert!(client.subscriptions.pending_unsubscribe_topics().is_empty());
1530        assert_eq!(
1531            client.subscriptions.pending_subscribe_topics(),
1532            vec![topic.clone()]
1533        );
1534
1535        // Subscribe ACK arrives
1536        client.subscriptions.confirm_subscribe(&topic);
1537        assert_eq!(client.subscriptions.len(), 1);
1538        assert!(client.subscriptions.pending_subscribe_topics().is_empty());
1539
1540        // Verify final state is correct
1541        let all = client.subscriptions.all_topics();
1542        assert_eq!(all.len(), 1);
1543        assert!(all.contains(&topic));
1544    }
1545
1546    #[rstest]
1547    fn test_race_channel_level_reconnection_with_pending_states() {
1548        // Simulates reconnection with mixed pending states including channel-level subscriptions.
1549        let client = BitmexWebSocketClient::new(
1550            Some("ws://test.com".to_string()),
1551            Some("test_key".to_string()),
1552            Some("test_secret".to_string()),
1553            Some(AccountId::new("BITMEX-TEST")),
1554            None,
1555        )
1556        .unwrap();
1557
1558        // Set up mixed state before reconnection
1559        // Confirmed: trade:XBTUSD
1560        let trade_xbt = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
1561        client.subscriptions.mark_subscribe(&trade_xbt);
1562        client.subscriptions.confirm_subscribe(&trade_xbt);
1563
1564        // Confirmed: order (channel-level, no symbol)
1565        let order_channel = BitmexWsAuthChannel::Order.as_ref();
1566        client.subscriptions.mark_subscribe(order_channel);
1567        client.subscriptions.confirm_subscribe(order_channel);
1568
1569        // Pending subscribe: trade:ETHUSD
1570        let trade_eth = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
1571        client.subscriptions.mark_subscribe(&trade_eth);
1572
1573        // Pending unsubscribe: orderBookL2:XBTUSD (user cancelled)
1574        let book_xbt = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
1575        client.subscriptions.mark_subscribe(&book_xbt);
1576        client.subscriptions.confirm_subscribe(&book_xbt);
1577        client.subscriptions.mark_unsubscribe(&book_xbt);
1578
1579        // Get topics for reconnection
1580        let topics_to_restore = client.subscriptions.all_topics();
1581
1582        // Should include: confirmed + pending_subscribe (NOT pending_unsubscribe)
1583        assert_eq!(topics_to_restore.len(), 3);
1584        assert!(topics_to_restore.contains(&trade_xbt));
1585        assert!(topics_to_restore.contains(&order_channel.to_string()));
1586        assert!(topics_to_restore.contains(&trade_eth));
1587        assert!(!topics_to_restore.contains(&book_xbt)); // Excluded
1588
1589        // Verify channel-level marker is handled correctly
1590        // order channel should not have ':' delimiter
1591        for topic in &topics_to_restore {
1592            if topic == order_channel {
1593                assert!(
1594                    !topic.contains(':'),
1595                    "Channel-level topic should not have delimiter"
1596                );
1597            }
1598        }
1599    }
1600}