nautilus_bitmex/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides the WebSocket client integration for the [BitMEX](https://bitmex.com) WebSocket API.
17//!
18//! This module defines and implements a [`BitmexWebSocketClient`] for
19//! connecting to BitMEX WebSocket streams. It handles authentication (when credentials
20//! are provided), manages subscriptions to market data and account update channels,
21//! and parses incoming messages into structured Nautilus domain objects.
22
23use std::{
24    sync::{
25        Arc,
26        atomic::{AtomicBool, AtomicU8, Ordering},
27    },
28    time::Duration,
29};
30
31use arc_swap::ArcSwap;
32use dashmap::DashMap;
33use futures_util::Stream;
34use nautilus_common::live::get_runtime;
35use nautilus_core::{
36    consts::NAUTILUS_USER_AGENT,
37    env::{get_env_var, get_or_env_var_opt},
38};
39use nautilus_model::{
40    data::bar::BarType,
41    enums::OrderType,
42    identifiers::{AccountId, ClientOrderId, InstrumentId},
43    instruments::{Instrument, InstrumentAny},
44};
45use nautilus_network::{
46    http::USER_AGENT,
47    mode::ConnectionMode,
48    websocket::{
49        AUTHENTICATION_TIMEOUT_SECS, AuthTracker, PingHandler, SubscriptionState, WebSocketClient,
50        WebSocketConfig, channel_message_handler,
51    },
52};
53use tokio_tungstenite::tungstenite::Message;
54use ustr::Ustr;
55
56use super::{
57    enums::{BitmexWsAuthAction, BitmexWsAuthChannel, BitmexWsOperation, BitmexWsTopic},
58    error::BitmexWsError,
59    handler::{FeedHandler, HandlerCommand},
60    messages::{BitmexAuthentication, BitmexSubscription, NautilusWsMessage},
61    parse::{is_index_symbol, topic_from_bar_spec},
62};
63use crate::common::{
64    consts::{BITMEX_WS_TOPIC_DELIMITER, BITMEX_WS_URL},
65    credential::Credential,
66};
67
68/// Provides a WebSocket client for connecting to the [BitMEX](https://bitmex.com) real-time API.
69///
70/// Key runtime patterns:
71/// - Authentication handshakes are managed by the internal auth tracker, ensuring resubscriptions
72///   occur only after BitMEX acknowledges `authKey` messages.
73/// - The subscription state maintains pending and confirmed topics so reconnection replay is
74///   deterministic and per-topic errors are surfaced.
75#[derive(Clone, Debug)]
76#[cfg_attr(
77    feature = "python",
78    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.bitmex")
79)]
80pub struct BitmexWebSocketClient {
81    url: String,
82    credential: Option<Credential>,
83    heartbeat: Option<u64>,
84    account_id: AccountId,
85    auth_tracker: AuthTracker,
86    signal: Arc<AtomicBool>,
87    connection_mode: Arc<ArcSwap<AtomicU8>>,
88    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
89    out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
90    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
91    subscriptions: SubscriptionState,
92    tracked_subscriptions: Arc<DashMap<String, ()>>,
93    instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
94    order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
95    order_symbol_cache: Arc<DashMap<ClientOrderId, Ustr>>,
96}
97
98impl BitmexWebSocketClient {
99    /// Creates a new [`BitmexWebSocketClient`] instance.
100    ///
101    /// # Errors
102    ///
103    /// Returns an error if only one of `api_key` or `api_secret` is provided (both or neither required).
104    pub fn new(
105        url: Option<String>,
106        api_key: Option<String>,
107        api_secret: Option<String>,
108        account_id: Option<AccountId>,
109        heartbeat: Option<u64>,
110    ) -> anyhow::Result<Self> {
111        let credential = match (api_key, api_secret) {
112            (Some(key), Some(secret)) => Some(Credential::new(key, secret)),
113            (None, None) => None,
114            _ => anyhow::bail!("Both `api_key` and `api_secret` must be provided together"),
115        };
116
117        let account_id = account_id.unwrap_or(AccountId::from("BITMEX-master"));
118
119        let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
120        let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
121
122        // We don't have a handler yet; this placeholder keeps cache_instrument() working,
123        // connect() swaps in the real channel and replays any queued instruments so the
124        // handler sees them once it starts.
125        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
126
127        Ok(Self {
128            url: url.unwrap_or(BITMEX_WS_URL.to_string()),
129            credential,
130            heartbeat,
131            account_id,
132            auth_tracker: AuthTracker::new(),
133            signal: Arc::new(AtomicBool::new(false)),
134            connection_mode,
135            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
136            out_rx: None,
137            task_handle: None,
138            subscriptions: SubscriptionState::new(BITMEX_WS_TOPIC_DELIMITER),
139            tracked_subscriptions: Arc::new(DashMap::new()),
140            instruments_cache: Arc::new(DashMap::new()),
141            order_type_cache: Arc::new(DashMap::new()),
142            order_symbol_cache: Arc::new(DashMap::new()),
143        })
144    }
145
146    /// Creates a new [`BitmexWebSocketClient`] with environment variable credential resolution.
147    ///
148    /// If `api_key` or `api_secret` are not provided, they will be loaded from
149    /// environment variables based on the `testnet` flag:
150    /// - Testnet: `BITMEX_TESTNET_API_KEY`, `BITMEX_TESTNET_API_SECRET`
151    /// - Mainnet: `BITMEX_API_KEY`, `BITMEX_API_SECRET`
152    ///
153    /// # Errors
154    ///
155    /// Returns an error if only one of `api_key` or `api_secret` is provided.
156    pub fn new_with_env(
157        url: Option<String>,
158        api_key: Option<String>,
159        api_secret: Option<String>,
160        account_id: Option<AccountId>,
161        heartbeat: Option<u64>,
162        testnet: bool,
163    ) -> anyhow::Result<Self> {
164        let (api_key_env, api_secret_env) = if testnet {
165            ("BITMEX_TESTNET_API_KEY", "BITMEX_TESTNET_API_SECRET")
166        } else {
167            ("BITMEX_API_KEY", "BITMEX_API_SECRET")
168        };
169
170        let key = get_or_env_var_opt(api_key, api_key_env);
171        let secret = get_or_env_var_opt(api_secret, api_secret_env);
172
173        Self::new(url, key, secret, account_id, heartbeat)
174    }
175
176    /// Creates a new authenticated [`BitmexWebSocketClient`] using environment variables.
177    ///
178    /// # Errors
179    ///
180    /// Returns an error if environment variables are not set or credentials are invalid.
181    pub fn from_env() -> anyhow::Result<Self> {
182        let url = get_env_var("BITMEX_WS_URL")?;
183        let api_key = get_env_var("BITMEX_API_KEY")?;
184        let api_secret = get_env_var("BITMEX_API_SECRET")?;
185
186        Self::new(Some(url), Some(api_key), Some(api_secret), None, None)
187    }
188
189    /// Returns the websocket url being used by the client.
190    #[must_use]
191    pub const fn url(&self) -> &str {
192        self.url.as_str()
193    }
194
195    /// Returns the public API key being used by the client.
196    #[must_use]
197    pub fn api_key(&self) -> Option<&str> {
198        self.credential.as_ref().map(|c| c.api_key.as_str())
199    }
200
201    /// Returns a masked version of the API key for logging purposes.
202    #[must_use]
203    pub fn api_key_masked(&self) -> Option<String> {
204        self.credential.as_ref().map(|c| c.api_key_masked())
205    }
206
207    /// Returns a value indicating whether the client is active.
208    #[must_use]
209    pub fn is_active(&self) -> bool {
210        let connection_mode_arc = self.connection_mode.load();
211        ConnectionMode::from_atomic(&connection_mode_arc).is_active()
212            && !self.signal.load(Ordering::Relaxed)
213    }
214
215    /// Returns a value indicating whether the client is closed.
216    #[must_use]
217    pub fn is_closed(&self) -> bool {
218        let connection_mode_arc = self.connection_mode.load();
219        ConnectionMode::from_atomic(&connection_mode_arc).is_closed()
220            || self.signal.load(Ordering::Relaxed)
221    }
222
223    /// Sets the account ID.
224    pub fn set_account_id(&mut self, account_id: AccountId) {
225        self.account_id = account_id;
226    }
227
228    /// Caches multiple instruments.
229    ///
230    /// Clears the existing cache first, then adds all provided instruments.
231    pub fn cache_instruments(&mut self, instruments: Vec<InstrumentAny>) {
232        self.instruments_cache.clear();
233        let mut count = 0;
234
235        log::debug!("Initializing BitMEX instrument cache");
236
237        for inst in instruments {
238            let symbol = inst.symbol().inner();
239            self.instruments_cache.insert(symbol, inst.clone());
240            log::debug!("Cached instrument: {symbol}");
241            count += 1;
242        }
243
244        log::info!("BitMEX instrument cache initialized with {count} instruments");
245    }
246
247    /// Caches a single instrument.
248    ///
249    /// Any existing instrument with the same symbol will be replaced.
250    pub fn cache_instrument(&self, instrument: InstrumentAny) {
251        self.instruments_cache
252            .insert(instrument.symbol().inner(), instrument.clone());
253
254        // Before connect() the handler isn't running; this send will fail and that's expected
255        // because connect() replays the instruments via InitializeInstruments
256        if let Ok(cmd_tx) = self.cmd_tx.try_read()
257            && let Err(e) = cmd_tx.send(HandlerCommand::UpdateInstrument(instrument))
258        {
259            log::debug!("Failed to send instrument update to handler: {e}");
260        }
261    }
262
263    /// Connect to the BitMEX WebSocket server.
264    ///
265    /// # Errors
266    ///
267    /// Returns an error if the WebSocket connection fails or authentication fails (if credentials provided).
268    ///
269    /// # Panics
270    ///
271    /// Panics if subscription or authentication messages fail to serialize to JSON.
272    pub async fn connect(&mut self) -> Result<(), BitmexWsError> {
273        let (client, raw_rx) = self.connect_inner().await?;
274
275        // Replace connection state so all clones see the underlying WebSocketClient's state
276        self.connection_mode.store(client.connection_mode_atomic());
277
278        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
279        self.out_rx = Some(Arc::new(out_rx));
280
281        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
282        *self.cmd_tx.write().await = cmd_tx.clone();
283
284        // Send WebSocketClient to handler
285        if let Err(e) = cmd_tx.send(HandlerCommand::SetClient(client)) {
286            return Err(BitmexWsError::ClientError(format!(
287                "Failed to send WebSocketClient to handler: {e}"
288            )));
289        }
290
291        // Replay cached instruments to the new handler via the new channel
292        if !self.instruments_cache.is_empty() {
293            let cached_instruments: Vec<InstrumentAny> = self
294                .instruments_cache
295                .iter()
296                .map(|entry| entry.value().clone())
297                .collect();
298            if let Err(e) = cmd_tx.send(HandlerCommand::InitializeInstruments(cached_instruments)) {
299                log::error!("Failed to replay instruments to handler: {e}");
300            }
301        }
302
303        let signal = self.signal.clone();
304        let account_id = self.account_id;
305        let credential = self.credential.clone();
306        let auth_tracker = self.auth_tracker.clone();
307        let subscriptions = self.subscriptions.clone();
308        let order_type_cache = self.order_type_cache.clone();
309        let order_symbol_cache = self.order_symbol_cache.clone();
310        let cmd_tx_for_reconnect = cmd_tx.clone();
311
312        let stream_handle = get_runtime().spawn(async move {
313            let mut handler = FeedHandler::new(
314                signal.clone(),
315                cmd_rx,
316                raw_rx,
317                out_tx,
318                account_id,
319                auth_tracker.clone(),
320                subscriptions.clone(),
321                order_type_cache,
322                order_symbol_cache,
323            );
324
325            // Helper closure to resubscribe all tracked subscriptions after reconnection
326            let resubscribe_all = || {
327                // Use SubscriptionState as source of truth for what to restore
328                let topics = subscriptions.all_topics();
329
330                if topics.is_empty() {
331                    return;
332                }
333
334                log::debug!(
335                    "Resubscribing to confirmed subscriptions: count={}",
336                    topics.len()
337                );
338
339                for topic in &topics {
340                    subscriptions.mark_subscribe(topic.as_str());
341                }
342
343                // Serialize subscription messages
344                let mut payloads = Vec::with_capacity(topics.len());
345                for topic in &topics {
346                    let message = BitmexSubscription {
347                        op: BitmexWsOperation::Subscribe,
348                        args: vec![Ustr::from(topic.as_ref())],
349                    };
350                    if let Ok(payload) = serde_json::to_string(&message) {
351                        payloads.push(payload);
352                    }
353                }
354
355                if let Err(e) =
356                    cmd_tx_for_reconnect.send(HandlerCommand::Subscribe { topics: payloads })
357                {
358                    log::error!("Failed to send resubscribe command: {e}");
359                }
360            };
361
362            // Run message processing with reconnection handling
363            loop {
364                match handler.next().await {
365                    Some(NautilusWsMessage::Reconnected) => {
366                        if signal.load(Ordering::Relaxed) {
367                            continue;
368                        }
369
370                        log::info!("WebSocket reconnected");
371
372                        // Mark all confirmed subscriptions as failed so they transition to pending state
373                        let confirmed_topics: Vec<String> = {
374                            let confirmed = subscriptions.confirmed();
375                            let mut topics = Vec::new();
376
377                            for entry in confirmed.iter() {
378                                let (channel, symbols) = entry.pair();
379
380                                if *channel == BitmexWsTopic::Instrument.as_ref() {
381                                    continue;
382                                }
383
384                                for symbol in symbols {
385                                    if symbol.is_empty() {
386                                        topics.push(channel.to_string());
387                                    } else {
388                                        topics.push(format!("{channel}:{symbol}"));
389                                    }
390                                }
391                            }
392
393                            topics
394                        };
395
396                        if !confirmed_topics.is_empty() {
397                            log::debug!(
398                                "Marking confirmed subscriptions as pending for replay: count={}",
399                                confirmed_topics.len()
400                            );
401                            for topic in confirmed_topics {
402                                subscriptions.mark_failure(&topic);
403                            }
404                        }
405
406                        if let Some(cred) = &credential {
407                            log::debug!("Re-authenticating after reconnection");
408
409                            let expires =
410                                (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
411                            let signature = cred.sign("GET", "/realtime", expires, "");
412
413                            let auth_message = BitmexAuthentication {
414                                op: BitmexWsAuthAction::AuthKeyExpires,
415                                args: (cred.api_key.to_string(), expires, signature),
416                            };
417
418                            if let Ok(payload) = serde_json::to_string(&auth_message) {
419                                if let Err(e) = cmd_tx_for_reconnect
420                                    .send(HandlerCommand::Authenticate { payload })
421                                {
422                                    log::error!("Failed to send reconnection auth command: {e}");
423                                }
424                            } else {
425                                log::error!("Failed to serialize reconnection auth message");
426                            }
427                        }
428
429                        // Unauthenticated sessions resubscribe immediately after reconnection,
430                        // authenticated sessions wait for Authenticated message
431                        if credential.is_none() {
432                            log::debug!("No authentication required, resubscribing immediately");
433                            resubscribe_all();
434                        }
435
436                        // TODO: Implement proper Reconnected event forwarding to consumers.
437                        // Currently intercepted for internal housekeeping only. Will add new
438                        // message type from WebSocketClient to notify consumers of reconnections.
439
440                        continue;
441                    }
442                    Some(NautilusWsMessage::Authenticated) => {
443                        log::debug!("Authenticated after reconnection, resubscribing");
444                        resubscribe_all();
445                        continue;
446                    }
447                    Some(msg) => {
448                        if handler.send(msg).is_err() {
449                            log::error!("Failed to send message (receiver dropped)");
450                            break;
451                        }
452                    }
453                    None => {
454                        // Stream ended - check if it's a stop signal
455                        if handler.is_stopped() {
456                            log::debug!("Stop signal received, ending message processing");
457                            break;
458                        }
459                        // Otherwise it's an unexpected stream end
460                        log::warn!("WebSocket stream ended unexpectedly");
461                        break;
462                    }
463                }
464            }
465
466            log::debug!("Handler task exiting");
467        });
468
469        self.task_handle = Some(Arc::new(stream_handle));
470
471        if self.credential.is_some()
472            && let Err(e) = self.authenticate().await
473        {
474            return Err(e);
475        }
476
477        // Subscribe to instrument topic
478        let instrument_topic = BitmexWsTopic::Instrument.as_ref().to_string();
479        self.subscriptions.mark_subscribe(&instrument_topic);
480        self.tracked_subscriptions.insert(instrument_topic, ());
481
482        let subscribe_msg = BitmexSubscription {
483            op: BitmexWsOperation::Subscribe,
484            args: vec![Ustr::from(BitmexWsTopic::Instrument.as_ref())],
485        };
486
487        match serde_json::to_string(&subscribe_msg) {
488            Ok(subscribe_json) => {
489                if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Subscribe {
490                    topics: vec![subscribe_json],
491                }) {
492                    log::error!("Failed to send subscribe command for instruments: {e}");
493                } else {
494                    log::debug!("Subscribed to all instruments");
495                }
496            }
497            Err(e) => {
498                log::error!("Failed to serialize subscribe message: {e}");
499            }
500        }
501
502        Ok(())
503    }
504
505    /// Connect to the WebSocket and return a message receiver.
506    ///
507    /// # Errors
508    ///
509    /// Returns an error if the WebSocket connection fails or if authentication fails (when credentials are provided).
510    async fn connect_inner(
511        &mut self,
512    ) -> Result<
513        (
514            WebSocketClient,
515            tokio::sync::mpsc::UnboundedReceiver<Message>,
516        ),
517        BitmexWsError,
518    > {
519        let (message_handler, rx) = channel_message_handler();
520
521        // No-op ping handler: handler owns the WebSocketClient and responds to pings directly
522        // in the message loop for minimal latency (see handler.rs pong response)
523        let ping_handler: PingHandler = Arc::new(move |_payload: Vec<u8>| {
524            // Handler responds to pings internally via select! loop
525        });
526
527        let config = WebSocketConfig {
528            url: self.url.clone(),
529            headers: vec![(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())],
530            heartbeat: self.heartbeat,
531            heartbeat_msg: None,
532            reconnect_timeout_ms: Some(5_000),
533            reconnect_delay_initial_ms: None, // Use default
534            reconnect_delay_max_ms: None,     // Use default
535            reconnect_backoff_factor: None,   // Use default
536            reconnect_jitter_ms: None,        // Use default
537            reconnect_max_attempts: None,
538        };
539
540        let keyed_quotas = vec![];
541        let client = WebSocketClient::connect(
542            config,
543            Some(message_handler),
544            Some(ping_handler),
545            None, // post_reconnection
546            keyed_quotas,
547            None, // default_quota
548        )
549        .await
550        .map_err(|e| BitmexWsError::ClientError(e.to_string()))?;
551
552        Ok((client, rx))
553    }
554
555    /// Authenticate the WebSocket connection using the provided credentials.
556    ///
557    /// # Errors
558    ///
559    /// Returns an error if the WebSocket is not connected, if authentication fails,
560    /// or if credentials are not available.
561    async fn authenticate(&self) -> Result<(), BitmexWsError> {
562        let credential = match &self.credential {
563            Some(credential) => credential,
564            None => {
565                return Err(BitmexWsError::AuthenticationError(
566                    "API credentials not available to authenticate".to_string(),
567                ));
568            }
569        };
570
571        let receiver = self.auth_tracker.begin();
572
573        let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
574        let signature = credential.sign("GET", "/realtime", expires, "");
575
576        let auth_message = BitmexAuthentication {
577            op: BitmexWsAuthAction::AuthKeyExpires,
578            args: (credential.api_key.to_string(), expires, signature),
579        };
580
581        let auth_json = serde_json::to_string(&auth_message).map_err(|e| {
582            let msg = format!("Failed to serialize auth message: {e}");
583            self.auth_tracker.fail(msg.clone());
584            BitmexWsError::AuthenticationError(msg)
585        })?;
586
587        // Send Authenticate command to handler
588        self.cmd_tx
589            .read()
590            .await
591            .send(HandlerCommand::Authenticate { payload: auth_json })
592            .map_err(|e| {
593                let msg = format!("Failed to send authenticate command: {e}");
594                self.auth_tracker.fail(msg.clone());
595                BitmexWsError::AuthenticationError(msg)
596            })?;
597
598        self.auth_tracker
599            .wait_for_result::<BitmexWsError>(
600                Duration::from_secs(AUTHENTICATION_TIMEOUT_SECS),
601                receiver,
602            )
603            .await
604    }
605
606    /// Wait until the WebSocket connection is active.
607    ///
608    /// # Errors
609    ///
610    /// Returns an error if the connection times out.
611    pub async fn wait_until_active(&self, timeout_secs: f64) -> Result<(), BitmexWsError> {
612        let timeout = Duration::from_secs_f64(timeout_secs);
613
614        tokio::time::timeout(timeout, async {
615            while !self.is_active() {
616                tokio::time::sleep(Duration::from_millis(10)).await;
617            }
618        })
619        .await
620        .map_err(|_| {
621            BitmexWsError::ClientError(format!(
622                "WebSocket connection timeout after {timeout_secs} seconds"
623            ))
624        })?;
625
626        Ok(())
627    }
628
629    /// Provides the internal stream as a channel-based stream.
630    ///
631    /// # Panics
632    ///
633    /// This function panics:
634    /// - If the websocket is not connected.
635    /// - If `stream` has already been called somewhere else (stream receiver is then taken).
636    pub fn stream(&mut self) -> impl Stream<Item = NautilusWsMessage> + use<> {
637        let rx = self
638            .out_rx
639            .take()
640            .expect("Stream receiver already taken or not connected");
641        let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
642        async_stream::stream! {
643            while let Some(msg) = rx.recv().await {
644                yield msg;
645            }
646        }
647    }
648
649    /// Closes the client.
650    ///
651    /// # Errors
652    ///
653    /// Returns an error if the WebSocket is not connected or if closing fails.
654    ///
655    /// # Panics
656    ///
657    /// Panics if the task handle cannot be unwrapped (should never happen in normal usage).
658    pub async fn close(&mut self) -> Result<(), BitmexWsError> {
659        log::debug!("Starting close process");
660
661        self.signal.store(true, Ordering::Relaxed);
662
663        // Send Disconnect command to handler
664        if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
665            log::debug!(
666                "Failed to send disconnect command (handler may already be shut down): {e}"
667            );
668        }
669
670        // Clean up task handle with timeout
671        if let Some(task_handle) = self.task_handle.take() {
672            match Arc::try_unwrap(task_handle) {
673                Ok(handle) => {
674                    log::debug!("Waiting for task handle to complete");
675                    match tokio::time::timeout(Duration::from_secs(2), handle).await {
676                        Ok(Ok(())) => log::debug!("Task handle completed successfully"),
677                        Ok(Err(e)) => log::error!("Task handle encountered an error: {e:?}"),
678                        Err(_) => {
679                            log::warn!(
680                                "Timeout waiting for task handle, task may still be running"
681                            );
682                            // The task will be dropped and should clean up automatically
683                        }
684                    }
685                }
686                Err(arc_handle) => {
687                    log::debug!(
688                        "Cannot take ownership of task handle - other references exist, aborting task"
689                    );
690                    arc_handle.abort();
691                }
692            }
693        } else {
694            log::debug!("No task handle to await");
695        }
696
697        log::debug!("Closed");
698
699        Ok(())
700    }
701
702    /// Subscribe to the specified topics.
703    ///
704    /// # Errors
705    ///
706    /// Returns an error if the WebSocket is not connected or if sending the subscription message fails.
707    ///
708    /// # Panics
709    ///
710    /// Panics if serialization of WebSocket messages fails (should never happen).
711    pub async fn subscribe(&self, topics: Vec<String>) -> Result<(), BitmexWsError> {
712        log::debug!("Subscribing to topics: {topics:?}");
713
714        for topic in &topics {
715            self.subscriptions.mark_subscribe(topic.as_str());
716            self.tracked_subscriptions.insert(topic.clone(), ());
717        }
718
719        // Serialize subscription messages
720        let mut payloads = Vec::with_capacity(topics.len());
721        for topic in &topics {
722            let message = BitmexSubscription {
723                op: BitmexWsOperation::Subscribe,
724                args: vec![Ustr::from(topic.as_ref())],
725            };
726            let payload = serde_json::to_string(&message).map_err(|e| {
727                BitmexWsError::SubscriptionError(format!("Failed to serialize subscription: {e}"))
728            })?;
729            payloads.push(payload);
730        }
731
732        // Send Subscribe command to handler
733        let cmd = HandlerCommand::Subscribe { topics: payloads };
734
735        self.send_cmd(cmd).await.map_err(|e| {
736            BitmexWsError::SubscriptionError(format!("Failed to send subscribe command: {e}"))
737        })
738    }
739
740    /// Unsubscribe from the specified topics.
741    ///
742    /// # Errors
743    ///
744    /// Returns an error if the WebSocket is not connected or if sending the unsubscription message fails.
745    async fn unsubscribe(&self, topics: Vec<String>) -> Result<(), BitmexWsError> {
746        log::debug!("Attempting to unsubscribe from topics: {topics:?}");
747
748        if self.signal.load(Ordering::Relaxed) {
749            log::debug!("Shutdown signal detected, skipping unsubscribe");
750            return Ok(());
751        }
752
753        for topic in &topics {
754            self.subscriptions.mark_unsubscribe(topic.as_str());
755            self.tracked_subscriptions.remove(topic);
756        }
757
758        // Serialize unsubscription messages
759        let mut payloads = Vec::with_capacity(topics.len());
760        for topic in &topics {
761            let message = BitmexSubscription {
762                op: BitmexWsOperation::Unsubscribe,
763                args: vec![Ustr::from(topic.as_ref())],
764            };
765            if let Ok(payload) = serde_json::to_string(&message) {
766                payloads.push(payload);
767            }
768        }
769
770        // Send Unsubscribe command to handler
771        let cmd = HandlerCommand::Unsubscribe { topics: payloads };
772
773        if let Err(e) = self.send_cmd(cmd).await {
774            log::debug!("Failed to send unsubscribe command: {e}");
775        }
776
777        Ok(())
778    }
779
780    /// Get the current number of active subscriptions.
781    #[must_use]
782    pub fn subscription_count(&self) -> usize {
783        self.subscriptions.len()
784    }
785
786    pub fn get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
787        let symbol = instrument_id.symbol.inner();
788        let confirmed = self.subscriptions.confirmed();
789        let mut channels = Vec::with_capacity(confirmed.len());
790
791        for entry in confirmed.iter() {
792            let (channel, symbols) = entry.pair();
793            if symbols.contains(&symbol) {
794                // Return the full topic string (e.g., "orderBookL2:XBTUSD")
795                channels.push(format!("{channel}:{symbol}"));
796            } else {
797                let has_channel_marker = symbols.iter().any(|s| s.is_empty());
798                if has_channel_marker
799                    && (*channel == BitmexWsAuthChannel::Execution.as_ref()
800                        || *channel == BitmexWsAuthChannel::Order.as_ref())
801                {
802                    // These are account-level subscriptions without symbols
803                    channels.push(channel.to_string());
804                }
805            }
806        }
807
808        channels
809    }
810
811    /// Subscribe to instrument updates for all instruments on the venue.
812    ///
813    /// # Errors
814    ///
815    /// Returns an error if the WebSocket is not connected or if the subscription fails.
816    pub async fn subscribe_instruments(&self) -> Result<(), BitmexWsError> {
817        // Already subscribed automatically on connection
818        log::debug!("Already subscribed to all instruments on connection, skipping");
819        Ok(())
820    }
821
822    /// Subscribe to instrument updates (mark/index prices) for the specified instrument.
823    ///
824    /// # Errors
825    ///
826    /// Returns an error if the WebSocket is not connected or if the subscription fails.
827    pub async fn subscribe_instrument(
828        &self,
829        instrument_id: InstrumentId,
830    ) -> Result<(), BitmexWsError> {
831        // Already subscribed to all instruments on connection
832        log::debug!(
833            "Already subscribed to all instruments on connection (includes {instrument_id}), skipping"
834        );
835        Ok(())
836    }
837
838    /// Subscribe to order book updates for the specified instrument.
839    ///
840    /// # Errors
841    ///
842    /// Returns an error if the WebSocket is not connected or if the subscription fails.
843    pub async fn subscribe_book(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
844        let topic = BitmexWsTopic::OrderBookL2;
845        let symbol = instrument_id.symbol.inner();
846        self.subscribe(vec![format!("{topic}:{symbol}")]).await
847    }
848
849    /// Subscribe to order book L2 (25 levels) updates for the specified instrument.
850    ///
851    /// # Errors
852    ///
853    /// Returns an error if the WebSocket is not connected or if the subscription fails.
854    pub async fn subscribe_book_25(
855        &self,
856        instrument_id: InstrumentId,
857    ) -> Result<(), BitmexWsError> {
858        let topic = BitmexWsTopic::OrderBookL2_25;
859        let symbol = instrument_id.symbol.inner();
860        self.subscribe(vec![format!("{topic}:{symbol}")]).await
861    }
862
863    /// Subscribe to order book depth 10 updates for the specified instrument.
864    ///
865    /// # Errors
866    ///
867    /// Returns an error if the WebSocket is not connected or if the subscription fails.
868    pub async fn subscribe_book_depth10(
869        &self,
870        instrument_id: InstrumentId,
871    ) -> Result<(), BitmexWsError> {
872        let topic = BitmexWsTopic::OrderBook10;
873        let symbol = instrument_id.symbol.inner();
874        self.subscribe(vec![format!("{topic}:{symbol}")]).await
875    }
876
877    /// Subscribe to quote updates for the specified instrument.
878    ///
879    /// Note: Index symbols (starting with '.') do not have quotes and will be silently ignored.
880    ///
881    /// # Errors
882    ///
883    /// Returns an error if the WebSocket is not connected or if the subscription fails.
884    pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
885        let symbol = instrument_id.symbol.inner();
886
887        // Index symbols don't have quotes (bid/ask), only a single price
888        if is_index_symbol(&instrument_id.symbol.inner()) {
889            log::warn!("Ignoring quote subscription for index symbol: {symbol}");
890            return Ok(());
891        }
892
893        let topic = BitmexWsTopic::Quote;
894        self.subscribe(vec![format!("{topic}:{symbol}")]).await
895    }
896
897    /// Subscribe to trade updates for the specified instrument.
898    ///
899    /// Note: Index symbols (starting with '.') do not have trades and will be silently ignored.
900    ///
901    /// # Errors
902    ///
903    /// Returns an error if the WebSocket is not connected or if the subscription fails.
904    pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
905        let symbol = instrument_id.symbol.inner();
906
907        // Index symbols don't have trades
908        if is_index_symbol(&symbol) {
909            log::warn!("Ignoring trade subscription for index symbol: {symbol}");
910            return Ok(());
911        }
912
913        let topic = BitmexWsTopic::Trade;
914        self.subscribe(vec![format!("{topic}:{symbol}")]).await
915    }
916
917    /// Subscribe to mark price updates for the specified instrument.
918    ///
919    /// # Errors
920    ///
921    /// Returns an error if the WebSocket is not connected or if the subscription fails.
922    pub async fn subscribe_mark_prices(
923        &self,
924        instrument_id: InstrumentId,
925    ) -> Result<(), BitmexWsError> {
926        self.subscribe_instrument(instrument_id).await
927    }
928
929    /// Subscribe to index price updates for the specified instrument.
930    ///
931    /// # Errors
932    ///
933    /// Returns an error if the WebSocket is not connected or if the subscription fails.
934    pub async fn subscribe_index_prices(
935        &self,
936        instrument_id: InstrumentId,
937    ) -> Result<(), BitmexWsError> {
938        self.subscribe_instrument(instrument_id).await
939    }
940
941    /// Subscribe to funding rate updates for the specified instrument.
942    ///
943    /// # Errors
944    ///
945    /// Returns an error if the WebSocket is not connected or if the subscription fails.
946    pub async fn subscribe_funding_rates(
947        &self,
948        instrument_id: InstrumentId,
949    ) -> Result<(), BitmexWsError> {
950        let topic = BitmexWsTopic::Funding;
951        let symbol = instrument_id.symbol.inner();
952        self.subscribe(vec![format!("{topic}:{symbol}")]).await
953    }
954
955    /// Subscribe to bar updates for the specified bar type.
956    ///
957    /// # Errors
958    ///
959    /// Returns an error if the WebSocket is not connected or if the subscription fails.
960    pub async fn subscribe_bars(&self, bar_type: BarType) -> Result<(), BitmexWsError> {
961        let topic = topic_from_bar_spec(bar_type.spec());
962        let symbol = bar_type.instrument_id().symbol.inner();
963        self.subscribe(vec![format!("{topic}:{symbol}")]).await
964    }
965
966    /// Unsubscribe from instrument updates for all instruments on the venue.
967    ///
968    /// # Errors
969    ///
970    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
971    pub async fn unsubscribe_instruments(&self) -> Result<(), BitmexWsError> {
972        // No-op: instruments are required for proper operation
973        log::debug!(
974            "Instruments subscription maintained for proper operation, skipping unsubscribe"
975        );
976        Ok(())
977    }
978
979    /// Unsubscribe from instrument updates (mark/index prices) for the specified instrument.
980    ///
981    /// # Errors
982    ///
983    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
984    pub async fn unsubscribe_instrument(
985        &self,
986        instrument_id: InstrumentId,
987    ) -> Result<(), BitmexWsError> {
988        // No-op: instruments are required for proper operation
989        log::debug!(
990            "Instruments subscription maintained for proper operation (includes {instrument_id}), skipping unsubscribe"
991        );
992        Ok(())
993    }
994
995    /// Unsubscribe from order book updates for the specified instrument.
996    ///
997    /// # Errors
998    ///
999    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1000    pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
1001        let topic = BitmexWsTopic::OrderBookL2;
1002        let symbol = instrument_id.symbol.inner();
1003        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1004    }
1005
1006    /// Unsubscribe from order book L2 (25 levels) updates for the specified instrument.
1007    ///
1008    /// # Errors
1009    ///
1010    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1011    pub async fn unsubscribe_book_25(
1012        &self,
1013        instrument_id: InstrumentId,
1014    ) -> Result<(), BitmexWsError> {
1015        let topic = BitmexWsTopic::OrderBookL2_25;
1016        let symbol = instrument_id.symbol.inner();
1017        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1018    }
1019
1020    /// Unsubscribe from order book depth 10 updates for the specified instrument.
1021    ///
1022    /// # Errors
1023    ///
1024    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1025    pub async fn unsubscribe_book_depth10(
1026        &self,
1027        instrument_id: InstrumentId,
1028    ) -> Result<(), BitmexWsError> {
1029        let topic = BitmexWsTopic::OrderBook10;
1030        let symbol = instrument_id.symbol.inner();
1031        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1032    }
1033
1034    /// Unsubscribe from quote updates for the specified instrument.
1035    ///
1036    /// # Errors
1037    ///
1038    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1039    pub async fn unsubscribe_quotes(
1040        &self,
1041        instrument_id: InstrumentId,
1042    ) -> Result<(), BitmexWsError> {
1043        let symbol = instrument_id.symbol.inner();
1044
1045        // Index symbols don't have quotes
1046        if is_index_symbol(&symbol) {
1047            return Ok(());
1048        }
1049
1050        let topic = BitmexWsTopic::Quote;
1051        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1052    }
1053
1054    /// Unsubscribe from trade updates for the specified instrument.
1055    ///
1056    /// # Errors
1057    ///
1058    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1059    pub async fn unsubscribe_trades(
1060        &self,
1061        instrument_id: InstrumentId,
1062    ) -> Result<(), BitmexWsError> {
1063        let symbol = instrument_id.symbol.inner();
1064
1065        // Index symbols don't have trades
1066        if is_index_symbol(&symbol) {
1067            return Ok(());
1068        }
1069
1070        let topic = BitmexWsTopic::Trade;
1071        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1072    }
1073
1074    /// Unsubscribe from mark price updates for the specified instrument.
1075    ///
1076    /// # Errors
1077    ///
1078    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1079    pub async fn unsubscribe_mark_prices(
1080        &self,
1081        instrument_id: InstrumentId,
1082    ) -> Result<(), BitmexWsError> {
1083        // No-op: instrument channel shared with index prices
1084        log::debug!(
1085            "Mark prices for {instrument_id} uses shared instrument channel, skipping unsubscribe"
1086        );
1087        Ok(())
1088    }
1089
1090    /// Unsubscribe from index price updates for the specified instrument.
1091    ///
1092    /// # Errors
1093    ///
1094    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1095    pub async fn unsubscribe_index_prices(
1096        &self,
1097        instrument_id: InstrumentId,
1098    ) -> Result<(), BitmexWsError> {
1099        // No-op: instrument channel shared with mark prices
1100        log::debug!(
1101            "Index prices for {instrument_id} uses shared instrument channel, skipping unsubscribe"
1102        );
1103        Ok(())
1104    }
1105
1106    /// Unsubscribe from funding rate updates for the specified instrument.
1107    ///
1108    /// # Errors
1109    ///
1110    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1111    pub async fn unsubscribe_funding_rates(
1112        &self,
1113        instrument_id: InstrumentId,
1114    ) -> Result<(), BitmexWsError> {
1115        // No-op: unsubscribing during shutdown causes race conditions
1116        log::debug!(
1117            "Funding rates for {instrument_id}, skipping unsubscribe to avoid shutdown race"
1118        );
1119        Ok(())
1120    }
1121
1122    /// Unsubscribe from bar updates for the specified bar type.
1123    ///
1124    /// # Errors
1125    ///
1126    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1127    pub async fn unsubscribe_bars(&self, bar_type: BarType) -> Result<(), BitmexWsError> {
1128        let topic = topic_from_bar_spec(bar_type.spec());
1129        let symbol = bar_type.instrument_id().symbol.inner();
1130        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1131    }
1132
1133    /// Subscribe to order updates for the authenticated account.
1134    ///
1135    /// # Errors
1136    ///
1137    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1138    pub async fn subscribe_orders(&self) -> Result<(), BitmexWsError> {
1139        if self.credential.is_none() {
1140            return Err(BitmexWsError::MissingCredentials);
1141        }
1142        self.subscribe(vec![BitmexWsAuthChannel::Order.to_string()])
1143            .await
1144    }
1145
1146    /// Subscribe to execution updates for the authenticated account.
1147    ///
1148    /// # Errors
1149    ///
1150    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1151    pub async fn subscribe_executions(&self) -> Result<(), BitmexWsError> {
1152        if self.credential.is_none() {
1153            return Err(BitmexWsError::MissingCredentials);
1154        }
1155        self.subscribe(vec![BitmexWsAuthChannel::Execution.to_string()])
1156            .await
1157    }
1158
1159    /// Subscribe to position updates for the authenticated account.
1160    ///
1161    /// # Errors
1162    ///
1163    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1164    pub async fn subscribe_positions(&self) -> Result<(), BitmexWsError> {
1165        if self.credential.is_none() {
1166            return Err(BitmexWsError::MissingCredentials);
1167        }
1168        self.subscribe(vec![BitmexWsAuthChannel::Position.to_string()])
1169            .await
1170    }
1171
1172    /// Subscribe to margin updates for the authenticated account.
1173    ///
1174    /// # Errors
1175    ///
1176    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1177    pub async fn subscribe_margin(&self) -> Result<(), BitmexWsError> {
1178        if self.credential.is_none() {
1179            return Err(BitmexWsError::MissingCredentials);
1180        }
1181        self.subscribe(vec![BitmexWsAuthChannel::Margin.to_string()])
1182            .await
1183    }
1184
1185    /// Subscribe to wallet updates for the authenticated account.
1186    ///
1187    /// # Errors
1188    ///
1189    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1190    pub async fn subscribe_wallet(&self) -> Result<(), BitmexWsError> {
1191        if self.credential.is_none() {
1192            return Err(BitmexWsError::MissingCredentials);
1193        }
1194        self.subscribe(vec![BitmexWsAuthChannel::Wallet.to_string()])
1195            .await
1196    }
1197
1198    /// Unsubscribe from order updates for the authenticated account.
1199    ///
1200    /// # Errors
1201    ///
1202    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1203    pub async fn unsubscribe_orders(&self) -> Result<(), BitmexWsError> {
1204        self.unsubscribe(vec![BitmexWsAuthChannel::Order.to_string()])
1205            .await
1206    }
1207
1208    /// Unsubscribe from execution updates for the authenticated account.
1209    ///
1210    /// # Errors
1211    ///
1212    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1213    pub async fn unsubscribe_executions(&self) -> Result<(), BitmexWsError> {
1214        self.unsubscribe(vec![BitmexWsAuthChannel::Execution.to_string()])
1215            .await
1216    }
1217
1218    /// Unsubscribe from position updates for the authenticated account.
1219    ///
1220    /// # Errors
1221    ///
1222    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1223    pub async fn unsubscribe_positions(&self) -> Result<(), BitmexWsError> {
1224        self.unsubscribe(vec![BitmexWsAuthChannel::Position.to_string()])
1225            .await
1226    }
1227
1228    /// Unsubscribe from margin updates for the authenticated account.
1229    ///
1230    /// # Errors
1231    ///
1232    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1233    pub async fn unsubscribe_margin(&self) -> Result<(), BitmexWsError> {
1234        self.unsubscribe(vec![BitmexWsAuthChannel::Margin.to_string()])
1235            .await
1236    }
1237
1238    /// Unsubscribe from wallet updates for the authenticated account.
1239    ///
1240    /// # Errors
1241    ///
1242    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1243    pub async fn unsubscribe_wallet(&self) -> Result<(), BitmexWsError> {
1244        self.unsubscribe(vec![BitmexWsAuthChannel::Wallet.to_string()])
1245            .await
1246    }
1247
1248    /// Sends a command to the handler.
1249    async fn send_cmd(&self, cmd: HandlerCommand) -> Result<(), BitmexWsError> {
1250        self.cmd_tx
1251            .read()
1252            .await
1253            .send(cmd)
1254            .map_err(|e| BitmexWsError::ClientError(format!("Handler not available: {e}")))
1255    }
1256}
1257
1258#[cfg(test)]
1259mod tests {
1260    use ahash::AHashSet;
1261    use rstest::rstest;
1262    use ustr::Ustr;
1263
1264    use super::*;
1265
1266    #[rstest]
1267    fn test_reconnect_topics_restoration_logic() {
1268        // Create real client with credentials
1269        let client = BitmexWebSocketClient::new(
1270            Some("ws://test.com".to_string()),
1271            Some("test_key".to_string()),
1272            Some("test_secret".to_string()),
1273            Some(AccountId::new("BITMEX-TEST")),
1274            None,
1275        )
1276        .unwrap();
1277
1278        // Populate subscriptions like they would be during normal operation
1279        let subs = client.subscriptions.confirmed();
1280        subs.insert(Ustr::from(BitmexWsTopic::Trade.as_ref()), {
1281            let mut set = AHashSet::new();
1282            set.insert(Ustr::from("XBTUSD"));
1283            set.insert(Ustr::from("ETHUSD"));
1284            set
1285        });
1286
1287        subs.insert(Ustr::from(BitmexWsTopic::OrderBookL2.as_ref()), {
1288            let mut set = AHashSet::new();
1289            set.insert(Ustr::from("XBTUSD"));
1290            set
1291        });
1292
1293        // Private channels (no symbols)
1294        subs.insert(Ustr::from(BitmexWsAuthChannel::Order.as_ref()), {
1295            let mut set = AHashSet::new();
1296            set.insert(Ustr::from(""));
1297            set
1298        });
1299        subs.insert(Ustr::from(BitmexWsAuthChannel::Position.as_ref()), {
1300            let mut set = AHashSet::new();
1301            set.insert(Ustr::from(""));
1302            set
1303        });
1304
1305        // Test the actual reconnection topic building logic
1306        let mut topics_to_restore = Vec::new();
1307        for entry in subs.iter() {
1308            let (channel, symbols) = entry.pair();
1309            for symbol in symbols {
1310                if symbol.is_empty() {
1311                    topics_to_restore.push(channel.to_string());
1312                } else {
1313                    topics_to_restore.push(format!("{channel}:{symbol}"));
1314                }
1315            }
1316        }
1317
1318        // Verify it builds the correct restoration topics
1319        assert!(topics_to_restore.contains(&format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref())));
1320        assert!(topics_to_restore.contains(&format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref())));
1321        assert!(
1322            topics_to_restore.contains(&format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref()))
1323        );
1324        assert!(topics_to_restore.contains(&BitmexWsAuthChannel::Order.as_ref().to_string()));
1325        assert!(topics_to_restore.contains(&BitmexWsAuthChannel::Position.as_ref().to_string()));
1326        assert_eq!(topics_to_restore.len(), 5);
1327    }
1328
1329    #[rstest]
1330    fn test_reconnect_auth_message_building() {
1331        // Test with credentials
1332        let client_with_creds = BitmexWebSocketClient::new(
1333            Some("ws://test.com".to_string()),
1334            Some("test_key".to_string()),
1335            Some("test_secret".to_string()),
1336            Some(AccountId::new("BITMEX-TEST")),
1337            None,
1338        )
1339        .unwrap();
1340
1341        // Test the actual auth message building logic from lines 220-228
1342        if let Some(cred) = &client_with_creds.credential {
1343            let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
1344            let signature = cred.sign("GET", "/realtime", expires, "");
1345
1346            let auth_message = BitmexAuthentication {
1347                op: BitmexWsAuthAction::AuthKeyExpires,
1348                args: (cred.api_key.to_string(), expires, signature),
1349            };
1350
1351            // Verify auth message structure
1352            assert_eq!(auth_message.op, BitmexWsAuthAction::AuthKeyExpires);
1353            assert_eq!(auth_message.args.0, "test_key");
1354            assert!(auth_message.args.1 > 0); // expires should be positive
1355            assert!(!auth_message.args.2.is_empty()); // signature should exist
1356        } else {
1357            panic!("Client should have credentials");
1358        }
1359
1360        // Test without credentials
1361        let client_no_creds = BitmexWebSocketClient::new(
1362            Some("ws://test.com".to_string()),
1363            None,
1364            None,
1365            Some(AccountId::new("BITMEX-TEST")),
1366            None,
1367        )
1368        .unwrap();
1369
1370        assert!(client_no_creds.credential.is_none());
1371    }
1372
1373    #[rstest]
1374    fn test_subscription_state_after_unsubscribe() {
1375        let client = BitmexWebSocketClient::new(
1376            Some("ws://test.com".to_string()),
1377            Some("test_key".to_string()),
1378            Some("test_secret".to_string()),
1379            Some(AccountId::new("BITMEX-TEST")),
1380            None,
1381        )
1382        .unwrap();
1383
1384        // Set up initial subscriptions
1385        let subs = client.subscriptions.confirmed();
1386        subs.insert(Ustr::from(BitmexWsTopic::Trade.as_ref()), {
1387            let mut set = AHashSet::new();
1388            set.insert(Ustr::from("XBTUSD"));
1389            set.insert(Ustr::from("ETHUSD"));
1390            set
1391        });
1392
1393        subs.insert(Ustr::from(BitmexWsTopic::OrderBookL2.as_ref()), {
1394            let mut set = AHashSet::new();
1395            set.insert(Ustr::from("XBTUSD"));
1396            set
1397        });
1398
1399        // Simulate unsubscribe logic (like from unsubscribe() method lines 586-599)
1400        let topic = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
1401        if let Some((channel, symbol)) = topic.split_once(':')
1402            && let Some(mut entry) = subs.get_mut(&Ustr::from(channel))
1403        {
1404            entry.remove(&Ustr::from(symbol));
1405            if entry.is_empty() {
1406                drop(entry);
1407                subs.remove(&Ustr::from(channel));
1408            }
1409        }
1410
1411        // Build restoration topics after unsubscribe
1412        let mut topics_to_restore = Vec::new();
1413        for entry in subs.iter() {
1414            let (channel, symbols) = entry.pair();
1415            for symbol in symbols {
1416                if symbol.is_empty() {
1417                    topics_to_restore.push(channel.to_string());
1418                } else {
1419                    topics_to_restore.push(format!("{channel}:{symbol}"));
1420                }
1421            }
1422        }
1423
1424        // Should have XBTUSD trade but not ETHUSD trade
1425        let trade_xbt = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
1426        let trade_eth = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
1427        let book_xbt = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
1428
1429        assert!(topics_to_restore.contains(&trade_xbt));
1430        assert!(!topics_to_restore.contains(&trade_eth));
1431        assert!(topics_to_restore.contains(&book_xbt));
1432        assert_eq!(topics_to_restore.len(), 2);
1433    }
1434
1435    #[rstest]
1436    fn test_race_unsubscribe_failure_recovery() {
1437        // Simulates the race condition where venue rejects an unsubscribe request.
1438        // The adapter must perform the 3-step recovery:
1439        // 1. confirm_unsubscribe() - clear pending_unsubscribe
1440        // 2. mark_subscribe() - mark as subscribing again
1441        // 3. confirm_subscribe() - restore to confirmed state
1442        let client = BitmexWebSocketClient::new(
1443            Some("ws://test.com".to_string()),
1444            None,
1445            None,
1446            Some(AccountId::new("BITMEX-TEST")),
1447            None,
1448        )
1449        .unwrap();
1450
1451        let topic = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
1452
1453        // Initial subscribe flow
1454        client.subscriptions.mark_subscribe(&topic);
1455        client.subscriptions.confirm_subscribe(&topic);
1456        assert_eq!(client.subscriptions.len(), 1);
1457
1458        // User unsubscribes
1459        client.subscriptions.mark_unsubscribe(&topic);
1460        assert_eq!(client.subscriptions.len(), 0);
1461        assert_eq!(
1462            client.subscriptions.pending_unsubscribe_topics(),
1463            vec![topic.clone()]
1464        );
1465
1466        // Venue REJECTS the unsubscribe (error message)
1467        // Adapter must perform 3-step recovery (from lines 1884-1891)
1468        client.subscriptions.confirm_unsubscribe(&topic); // Step 1: clear pending_unsubscribe
1469        client.subscriptions.mark_subscribe(&topic); // Step 2: mark as subscribing
1470        client.subscriptions.confirm_subscribe(&topic); // Step 3: confirm subscription
1471
1472        // Verify recovery: topic should be back in confirmed state
1473        assert_eq!(client.subscriptions.len(), 1);
1474        assert!(client.subscriptions.pending_unsubscribe_topics().is_empty());
1475        assert!(client.subscriptions.pending_subscribe_topics().is_empty());
1476
1477        // Verify topic is in all_topics() for reconnect
1478        let all = client.subscriptions.all_topics();
1479        assert_eq!(all.len(), 1);
1480        assert!(all.contains(&topic));
1481    }
1482
1483    #[rstest]
1484    fn test_race_resubscribe_before_unsubscribe_ack() {
1485        // Simulates: User unsubscribes, then immediately resubscribes before
1486        // the unsubscribe ACK arrives from the venue.
1487        // This is the race condition fixed in the subscription tracker.
1488        let client = BitmexWebSocketClient::new(
1489            Some("ws://test.com".to_string()),
1490            None,
1491            None,
1492            Some(AccountId::new("BITMEX-TEST")),
1493            None,
1494        )
1495        .unwrap();
1496
1497        let topic = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
1498
1499        // Initial subscribe
1500        client.subscriptions.mark_subscribe(&topic);
1501        client.subscriptions.confirm_subscribe(&topic);
1502        assert_eq!(client.subscriptions.len(), 1);
1503
1504        // User unsubscribes
1505        client.subscriptions.mark_unsubscribe(&topic);
1506        assert_eq!(client.subscriptions.len(), 0);
1507        assert_eq!(
1508            client.subscriptions.pending_unsubscribe_topics(),
1509            vec![topic.clone()]
1510        );
1511
1512        // User immediately changes mind and resubscribes (before unsubscribe ACK)
1513        client.subscriptions.mark_subscribe(&topic);
1514        assert_eq!(
1515            client.subscriptions.pending_subscribe_topics(),
1516            vec![topic.clone()]
1517        );
1518
1519        // NOW the unsubscribe ACK arrives - should NOT clear pending_subscribe
1520        client.subscriptions.confirm_unsubscribe(&topic);
1521        assert!(client.subscriptions.pending_unsubscribe_topics().is_empty());
1522        assert_eq!(
1523            client.subscriptions.pending_subscribe_topics(),
1524            vec![topic.clone()]
1525        );
1526
1527        // Subscribe ACK arrives
1528        client.subscriptions.confirm_subscribe(&topic);
1529        assert_eq!(client.subscriptions.len(), 1);
1530        assert!(client.subscriptions.pending_subscribe_topics().is_empty());
1531
1532        // Verify final state is correct
1533        let all = client.subscriptions.all_topics();
1534        assert_eq!(all.len(), 1);
1535        assert!(all.contains(&topic));
1536    }
1537
1538    #[rstest]
1539    fn test_race_channel_level_reconnection_with_pending_states() {
1540        // Simulates reconnection with mixed pending states including channel-level subscriptions.
1541        let client = BitmexWebSocketClient::new(
1542            Some("ws://test.com".to_string()),
1543            Some("test_key".to_string()),
1544            Some("test_secret".to_string()),
1545            Some(AccountId::new("BITMEX-TEST")),
1546            None,
1547        )
1548        .unwrap();
1549
1550        // Set up mixed state before reconnection
1551        // Confirmed: trade:XBTUSD
1552        let trade_xbt = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
1553        client.subscriptions.mark_subscribe(&trade_xbt);
1554        client.subscriptions.confirm_subscribe(&trade_xbt);
1555
1556        // Confirmed: order (channel-level, no symbol)
1557        let order_channel = BitmexWsAuthChannel::Order.as_ref();
1558        client.subscriptions.mark_subscribe(order_channel);
1559        client.subscriptions.confirm_subscribe(order_channel);
1560
1561        // Pending subscribe: trade:ETHUSD
1562        let trade_eth = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
1563        client.subscriptions.mark_subscribe(&trade_eth);
1564
1565        // Pending unsubscribe: orderBookL2:XBTUSD (user cancelled)
1566        let book_xbt = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
1567        client.subscriptions.mark_subscribe(&book_xbt);
1568        client.subscriptions.confirm_subscribe(&book_xbt);
1569        client.subscriptions.mark_unsubscribe(&book_xbt);
1570
1571        // Get topics for reconnection
1572        let topics_to_restore = client.subscriptions.all_topics();
1573
1574        // Should include: confirmed + pending_subscribe (NOT pending_unsubscribe)
1575        assert_eq!(topics_to_restore.len(), 3);
1576        assert!(topics_to_restore.contains(&trade_xbt));
1577        assert!(topics_to_restore.contains(&order_channel.to_string()));
1578        assert!(topics_to_restore.contains(&trade_eth));
1579        assert!(!topics_to_restore.contains(&book_xbt)); // Excluded
1580
1581        // Verify channel-level marker is handled correctly
1582        // order channel should not have ':' delimiter
1583        for topic in &topics_to_restore {
1584            if topic == order_channel {
1585                assert!(
1586                    !topic.contains(':'),
1587                    "Channel-level topic should not have delimiter"
1588                );
1589            }
1590        }
1591    }
1592}