nautilus_network/websocket/
subscription.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Generic subscription state tracking for WebSocket clients.
17//!
18//! This module provides a robust subscription tracker that maintains confirmed and pending
19//! subscription states with reference counting support. It follows a proven pattern used in
20//! production.
21//!
22//! # Key Features
23//!
24//! - **Three-state tracking**: confirmed, pending_subscribe, pending_unsubscribe.
25//! - **Reference counting**: Prevents duplicate subscribe/unsubscribe messages.
26//! - **Reconnection support**: `all_topics()` returns topics to resubscribe after reconnect.
27//! - **Configurable delimiter**: Supports different topic formats (`.` or `:` etc.).
28//!
29//! # Topic Format
30//!
31//! Topics are strings in the format `channel{delimiter}symbol`:
32//! - Dot delimiter: `tickers.BTCUSDT`
33//! - Colon delimiter: `trades:BTC-USDT`
34//!
35//! Channels without symbols are also supported (e.g., `execution` for all instruments).
36
37use std::{
38    num::NonZeroUsize,
39    sync::{Arc, LazyLock},
40};
41
42use ahash::AHashSet;
43use dashmap::DashMap;
44use ustr::Ustr;
45
46/// Marker for channel-level subscriptions (no specific symbol).
47///
48/// An empty string in the symbol set indicates a channel-level subscription
49/// that applies to all symbols for that channel.
50pub(crate) static CHANNEL_LEVEL_MARKER: LazyLock<Ustr> = LazyLock::new(|| Ustr::from(""));
51
52/// Generic subscription state tracker for WebSocket connections.
53///
54/// Maintains three separate states for subscriptions:
55/// - **Confirmed**: Successfully subscribed and actively streaming data.
56/// - **Pending Subscribe**: Subscription requested but not yet confirmed by server.
57/// - **Pending Unsubscribe**: Unsubscription requested but not yet confirmed by server.
58///
59/// # Reference Counting
60///
61/// The tracker maintains reference counts for each topic. When multiple components
62/// subscribe to the same topic, only the first subscription sends a message to the
63/// server. Similarly, only the last unsubscription sends an unsubscribe message.
64///
65/// # Thread Safety
66///
67/// All operations are thread-safe and can be called concurrently from multiple tasks.
68#[derive(Clone, Debug)]
69pub struct SubscriptionState {
70    /// Confirmed active subscriptions.
71    confirmed: Arc<DashMap<Ustr, AHashSet<Ustr>>>,
72    /// Pending subscribe requests awaiting server confirmation.
73    pending_subscribe: Arc<DashMap<Ustr, AHashSet<Ustr>>>,
74    /// Pending unsubscribe requests awaiting server confirmation.
75    pending_unsubscribe: Arc<DashMap<Ustr, AHashSet<Ustr>>>,
76    /// Reference counts for topics to prevent duplicate messages.
77    reference_counts: Arc<DashMap<Ustr, NonZeroUsize>>,
78    /// Topic delimiter character (e.g., '.' or ':').
79    delimiter: char,
80}
81
82impl SubscriptionState {
83    /// Creates a new subscription state tracker with the specified topic delimiter.
84    pub fn new(delimiter: char) -> Self {
85        Self {
86            confirmed: Arc::new(DashMap::new()),
87            pending_subscribe: Arc::new(DashMap::new()),
88            pending_unsubscribe: Arc::new(DashMap::new()),
89            reference_counts: Arc::new(DashMap::new()),
90            delimiter,
91        }
92    }
93
94    /// Returns the delimiter character used for topic splitting.
95    pub fn delimiter(&self) -> char {
96        self.delimiter
97    }
98
99    /// Returns a clone of the confirmed subscriptions map.
100    pub fn confirmed(&self) -> Arc<DashMap<Ustr, AHashSet<Ustr>>> {
101        Arc::clone(&self.confirmed)
102    }
103
104    /// Returns a clone of the pending subscribe map.
105    pub fn pending_subscribe(&self) -> Arc<DashMap<Ustr, AHashSet<Ustr>>> {
106        Arc::clone(&self.pending_subscribe)
107    }
108
109    /// Returns a clone of the pending unsubscribe map.
110    pub fn pending_unsubscribe(&self) -> Arc<DashMap<Ustr, AHashSet<Ustr>>> {
111        Arc::clone(&self.pending_unsubscribe)
112    }
113
114    /// Returns the number of confirmed subscriptions.
115    ///
116    /// Counts both channel-level and symbol-level subscriptions.
117    pub fn len(&self) -> usize {
118        self.confirmed.iter().map(|entry| entry.value().len()).sum()
119    }
120
121    /// Returns true if there are no subscriptions (confirmed or pending).
122    pub fn is_empty(&self) -> bool {
123        self.confirmed.is_empty()
124            && self.pending_subscribe.is_empty()
125            && self.pending_unsubscribe.is_empty()
126    }
127
128    /// Marks a topic as pending subscription.
129    ///
130    /// This should be called after sending a subscribe request to the server.
131    /// Idempotent: if topic is already confirmed, this is a no-op.
132    /// If topic is pending unsubscription, removes it.
133    pub fn mark_subscribe(&self, topic: &str) {
134        let (channel, symbol) = split_topic(topic, self.delimiter);
135
136        // If already confirmed, don't re-add to pending (idempotent)
137        if is_tracked(&self.confirmed, channel, symbol) {
138            return;
139        }
140
141        // Remove from pending_unsubscribe if present
142        untrack_topic(&self.pending_unsubscribe, channel, symbol);
143
144        track_topic(&self.pending_subscribe, channel, symbol);
145    }
146
147    /// Marks a topic as pending unsubscription.
148    ///
149    /// This removes the topic from both confirmed and pending_subscribe,
150    /// then adds it to pending_unsubscribe. This handles the case where
151    /// a user unsubscribes before the initial subscription is confirmed.
152    pub fn mark_unsubscribe(&self, topic: &str) {
153        let (channel, symbol) = split_topic(topic, self.delimiter);
154        track_topic(&self.pending_unsubscribe, channel, symbol);
155        untrack_topic(&self.confirmed, channel, symbol);
156        untrack_topic(&self.pending_subscribe, channel, symbol);
157    }
158
159    /// Confirms a subscription by moving it from pending to confirmed.
160    ///
161    /// This should be called when the server acknowledges a subscribe request.
162    /// Ignores the confirmation if the topic is pending unsubscription (handles
163    /// late confirmations after user has already unsubscribed).
164    pub fn confirm_subscribe(&self, topic: &str) {
165        let (channel, symbol) = split_topic(topic, self.delimiter);
166
167        // Ignore late confirmations if topic is pending unsubscribe
168        if is_tracked(&self.pending_unsubscribe, channel, symbol) {
169            return;
170        }
171
172        untrack_topic(&self.pending_subscribe, channel, symbol);
173        track_topic(&self.confirmed, channel, symbol);
174    }
175
176    /// Confirms an unsubscription by removing it from pending and confirmed state.
177    ///
178    /// This should be called when the server acknowledges an unsubscribe request.
179    /// Removes the topic from pending_unsubscribe and confirmed.
180    /// Does NOT clear pending_subscribe to support immediate re-subscribe patterns
181    /// (e.g., user calls subscribe() before unsubscribe ack arrives).
182    ///
183    /// **Stale ACK handling**: Ignores unsubscribe ACKs if the topic is no longer
184    /// in pending_unsubscribe (meaning user has already re-subscribed). This prevents
185    /// stale ACKs from removing topics that were re-confirmed after the re-subscribe.
186    pub fn confirm_unsubscribe(&self, topic: &str) {
187        let (channel, symbol) = split_topic(topic, self.delimiter);
188
189        // Only process if topic is actually pending unsubscription
190        // This ignores stale unsubscribe ACKs after user has re-subscribed
191        if !is_tracked(&self.pending_unsubscribe, channel, symbol) {
192            return; // Stale ACK, ignore
193        }
194
195        untrack_topic(&self.pending_unsubscribe, channel, symbol);
196        untrack_topic(&self.confirmed, channel, symbol);
197        // Don't clear pending_subscribe - it's a valid re-subscribe request
198    }
199
200    /// Marks a subscription as failed, moving it from confirmed back to pending.
201    ///
202    /// This is useful when a subscription fails but should be retried on reconnect.
203    /// Ignores the failure if the topic is pending unsubscription (user cancelled it).
204    pub fn mark_failure(&self, topic: &str) {
205        let (channel, symbol) = split_topic(topic, self.delimiter);
206
207        // Ignore failures for topics being unsubscribed
208        if is_tracked(&self.pending_unsubscribe, channel, symbol) {
209            return;
210        }
211
212        untrack_topic(&self.confirmed, channel, symbol);
213        track_topic(&self.pending_subscribe, channel, symbol);
214    }
215
216    /// Returns all pending subscribe topics as strings.
217    pub fn pending_subscribe_topics(&self) -> Vec<String> {
218        self.topics_from_map(&self.pending_subscribe)
219    }
220
221    /// Returns all pending unsubscribe topics as strings.
222    pub fn pending_unsubscribe_topics(&self) -> Vec<String> {
223        self.topics_from_map(&self.pending_unsubscribe)
224    }
225
226    /// Returns all topics that should be active (confirmed + pending_subscribe).
227    ///
228    /// This is the key method for reconnection: it returns all topics that should
229    /// be resubscribed after a connection is re-established.
230    ///
231    /// Note: Does NOT include pending_unsubscribe topics, as those are being removed.
232    pub fn all_topics(&self) -> Vec<String> {
233        let mut topics = Vec::new();
234        topics.extend(self.topics_from_map(&self.confirmed));
235        topics.extend(self.topics_from_map(&self.pending_subscribe));
236        topics
237    }
238
239    /// Helper to convert a map to topic strings.
240    fn topics_from_map(&self, map: &DashMap<Ustr, AHashSet<Ustr>>) -> Vec<String> {
241        let mut topics = Vec::new();
242        let marker = *CHANNEL_LEVEL_MARKER;
243
244        for entry in map.iter() {
245            let channel = entry.key();
246            let symbols = entry.value();
247
248            // Check for channel-level subscription marker
249            if symbols.contains(&marker) {
250                topics.push(channel.to_string());
251            }
252
253            // Add symbol-level subscriptions (skip marker)
254            for symbol in symbols.iter() {
255                if *symbol != marker {
256                    topics.push(format!(
257                        "{}{}{}",
258                        channel.as_str(),
259                        self.delimiter,
260                        symbol.as_str()
261                    ));
262                }
263            }
264        }
265
266        topics
267    }
268
269    /// Increments the reference count for a topic.
270    ///
271    /// Returns `true` if this is the first subscription (caller should send subscribe
272    /// message to server).
273    ///
274    /// # Panics
275    ///
276    /// Panics if the reference count exceeds `usize::MAX` subscriptions for a single topic.
277    pub fn add_reference(&self, topic: &str) -> bool {
278        let mut should_subscribe = false;
279        let topic_ustr = Ustr::from(topic);
280
281        self.reference_counts
282            .entry(topic_ustr)
283            .and_modify(|count| {
284                *count = NonZeroUsize::new(count.get() + 1).expect("reference count overflow");
285            })
286            .or_insert_with(|| {
287                should_subscribe = true;
288                NonZeroUsize::new(1).expect("NonZeroUsize::new(1) should never fail")
289            });
290
291        should_subscribe
292    }
293
294    /// Decrements the reference count for a topic.
295    ///
296    /// Returns `true` if this was the last subscription (caller should send unsubscribe
297    /// message to server).
298    ///
299    /// # Panics
300    ///
301    /// Panics if the internal reference count state becomes inconsistent (should never happen
302    /// if the API is used correctly).
303    pub fn remove_reference(&self, topic: &str) -> bool {
304        let topic_ustr = Ustr::from(topic);
305
306        // Use entry API to atomically decrement and remove if zero
307        // This prevents race where another thread adds a reference between the check and remove
308        if let dashmap::mapref::entry::Entry::Occupied(mut entry) =
309            self.reference_counts.entry(topic_ustr)
310        {
311            let current = entry.get().get();
312
313            if current == 1 {
314                entry.remove();
315                return true;
316            }
317
318            *entry.get_mut() = NonZeroUsize::new(current - 1)
319                .expect("reference count should never reach zero here");
320        }
321
322        false
323    }
324
325    /// Returns the current reference count for a topic.
326    ///
327    /// Returns 0 if the topic has no references.
328    pub fn get_reference_count(&self, topic: &str) -> usize {
329        let topic_ustr = Ustr::from(topic);
330        self.reference_counts
331            .get(&topic_ustr)
332            .map_or(0, |count| count.get())
333    }
334
335    /// Clears all subscription state.
336    ///
337    /// This is useful when reconnecting or resetting the client.
338    pub fn clear(&self) {
339        self.confirmed.clear();
340        self.pending_subscribe.clear();
341        self.pending_unsubscribe.clear();
342        self.reference_counts.clear();
343    }
344}
345
/// Splits a topic into its channel and optional symbol at the first `delimiter`.
///
/// Returns `(channel, Some(symbol))` when the delimiter is present; everything after the
/// first occurrence belongs to the symbol. Returns `(topic, None)` when the topic does
/// not contain the delimiter.
pub fn split_topic(topic: &str, delimiter: char) -> (&str, Option<&str>) {
    match topic.split_once(delimiter) {
        Some((channel, symbol)) => (channel, Some(symbol)),
        None => (topic, None),
    }
}
352
353/// Tracks a topic in the given map by adding it to the channel's symbol set.
354///
355/// Channel-level subscriptions are stored using an empty string marker,
356/// allowing both channel-level and symbol-level subscriptions to coexist.
357fn track_topic(map: &DashMap<Ustr, AHashSet<Ustr>>, channel: &str, symbol: Option<&str>) {
358    let channel_ustr = Ustr::from(channel);
359    let mut entry = map.entry(channel_ustr).or_default();
360
361    if let Some(symbol) = symbol {
362        entry.insert(Ustr::from(symbol));
363    } else {
364        entry.insert(*CHANNEL_LEVEL_MARKER);
365    }
366}
367
368/// Removes a topic from the given map by removing it from the channel's symbol set.
369///
370/// Removes the entire channel entry if no subscriptions remain after removal.
371fn untrack_topic(map: &DashMap<Ustr, AHashSet<Ustr>>, channel: &str, symbol: Option<&str>) {
372    let channel_ustr = Ustr::from(channel);
373    let symbol_to_remove = if let Some(symbol) = symbol {
374        Ustr::from(symbol)
375    } else {
376        *CHANNEL_LEVEL_MARKER
377    };
378
379    // Use entry API to atomically remove symbol and check if empty
380    // This prevents race conditions where another thread adds a symbol between operations
381    if let dashmap::mapref::entry::Entry::Occupied(mut entry) = map.entry(channel_ustr) {
382        entry.get_mut().remove(&symbol_to_remove);
383        if entry.get().is_empty() {
384            entry.remove();
385        }
386    }
387}
388
389/// Checks if a topic exists in the given map.
390fn is_tracked(map: &DashMap<Ustr, AHashSet<Ustr>>, channel: &str, symbol: Option<&str>) -> bool {
391    let channel_ustr = Ustr::from(channel);
392    let symbol_to_check = if let Some(symbol) = symbol {
393        Ustr::from(symbol)
394    } else {
395        *CHANNEL_LEVEL_MARKER
396    };
397
398    if let Some(entry) = map.get(&channel_ustr) {
399        entry.contains(&symbol_to_check)
400    } else {
401        false
402    }
403}
404
405////////////////////////////////////////////////////////////////////////////////
406// Tests
407////////////////////////////////////////////////////////////////////////////////
408
409#[cfg(test)]
410mod tests {
411    use rstest::rstest;
412
413    use super::*;
414
415    #[rstest]
416    fn test_split_topic_with_symbol() {
417        let (channel, symbol) = split_topic("tickers.BTCUSDT", '.');
418        assert_eq!(channel, "tickers");
419        assert_eq!(symbol, Some("BTCUSDT"));
420
421        let (channel, symbol) = split_topic("orderBookL2:XBTUSD", ':');
422        assert_eq!(channel, "orderBookL2");
423        assert_eq!(symbol, Some("XBTUSD"));
424    }
425
426    #[rstest]
427    fn test_split_topic_without_symbol() {
428        let (channel, symbol) = split_topic("orderbook", '.');
429        assert_eq!(channel, "orderbook");
430        assert_eq!(symbol, None);
431    }
432
433    #[rstest]
434    fn test_new_state_is_empty() {
435        let state = SubscriptionState::new('.');
436        assert!(state.is_empty());
437        assert_eq!(state.len(), 0);
438    }
439
440    #[rstest]
441    fn test_mark_subscribe() {
442        let state = SubscriptionState::new('.');
443        state.mark_subscribe("tickers.BTCUSDT");
444
445        assert_eq!(state.pending_subscribe_topics(), vec!["tickers.BTCUSDT"]);
446        assert_eq!(state.len(), 0); // Not confirmed yet
447    }
448
449    #[rstest]
450    fn test_confirm_subscribe() {
451        let state = SubscriptionState::new('.');
452        state.mark_subscribe("tickers.BTCUSDT");
453        state.confirm_subscribe("tickers.BTCUSDT");
454
455        assert!(state.pending_subscribe_topics().is_empty());
456        assert_eq!(state.len(), 1);
457    }
458
459    #[rstest]
460    fn test_mark_unsubscribe() {
461        let state = SubscriptionState::new('.');
462        state.mark_subscribe("tickers.BTCUSDT");
463        state.confirm_subscribe("tickers.BTCUSDT");
464        state.mark_unsubscribe("tickers.BTCUSDT");
465
466        assert_eq!(state.len(), 0); // Removed from confirmed
467        assert_eq!(state.pending_unsubscribe_topics(), vec!["tickers.BTCUSDT"]);
468    }
469
470    #[rstest]
471    fn test_confirm_unsubscribe() {
472        let state = SubscriptionState::new('.');
473        state.mark_subscribe("tickers.BTCUSDT");
474        state.confirm_subscribe("tickers.BTCUSDT");
475        state.mark_unsubscribe("tickers.BTCUSDT");
476        state.confirm_unsubscribe("tickers.BTCUSDT");
477
478        assert!(state.is_empty());
479    }
480
481    #[rstest]
482    fn test_resubscribe_before_unsubscribe_ack() {
483        // Regression test for race condition:
484        // User unsubscribes, then immediately resubscribes before the unsubscribe ACK arrives.
485        // The unsubscribe ACK should NOT clear the pending_subscribe entry.
486        let state = SubscriptionState::new('.');
487
488        state.mark_subscribe("tickers.BTCUSDT");
489        state.confirm_subscribe("tickers.BTCUSDT");
490        assert_eq!(state.len(), 1);
491
492        state.mark_unsubscribe("tickers.BTCUSDT");
493        assert_eq!(state.len(), 0);
494        assert_eq!(state.pending_unsubscribe_topics(), vec!["tickers.BTCUSDT"]);
495
496        // User immediately resubscribes (before unsubscribe ACK)
497        state.mark_subscribe("tickers.BTCUSDT");
498        assert_eq!(state.pending_subscribe_topics(), vec!["tickers.BTCUSDT"]);
499
500        // Stale unsubscribe ACK arrives - should be ignored (pending_unsubscribe already cleared)
501        state.confirm_unsubscribe("tickers.BTCUSDT");
502        assert!(state.pending_unsubscribe_topics().is_empty());
503        assert_eq!(state.pending_subscribe_topics(), vec!["tickers.BTCUSDT"]); // Must still be pending
504
505        // Subscribe ACK confirms successfully
506        state.confirm_subscribe("tickers.BTCUSDT");
507        assert_eq!(state.len(), 1);
508        assert!(state.pending_subscribe_topics().is_empty());
509
510        // Topic available for reconnect
511        let all = state.all_topics();
512        assert_eq!(all.len(), 1);
513        assert!(all.contains(&"tickers.BTCUSDT".to_string()));
514    }
515
516    #[rstest]
517    fn test_stale_unsubscribe_ack_after_resubscribe_confirmed() {
518        // Regression test for P1 bug: Stale unsubscribe ACK removing confirmed topic.
519        // Scenario: User unsubscribes, immediately resubscribes, subscribe ACK arrives
520        // FIRST (out of order), then stale unsubscribe ACK arrives.
521        // The stale ACK must NOT remove the topic from confirmed state.
522        let state = SubscriptionState::new('.');
523
524        // Initial subscription
525        state.mark_subscribe("tickers.BTCUSDT");
526        state.confirm_subscribe("tickers.BTCUSDT");
527        assert_eq!(state.len(), 1);
528
529        // User unsubscribes
530        state.mark_unsubscribe("tickers.BTCUSDT");
531        assert_eq!(state.len(), 0);
532        assert_eq!(state.pending_unsubscribe_topics(), vec!["tickers.BTCUSDT"]);
533
534        // User immediately resubscribes (before unsubscribe ACK)
535        state.mark_subscribe("tickers.BTCUSDT");
536        assert!(state.pending_unsubscribe_topics().is_empty()); // Cleared by mark_subscribe
537        assert_eq!(state.pending_subscribe_topics(), vec!["tickers.BTCUSDT"]);
538
539        // Subscribe ACK arrives FIRST (out of order!)
540        state.confirm_subscribe("tickers.BTCUSDT");
541        assert_eq!(state.len(), 1); // Back in confirmed
542        assert!(state.pending_subscribe_topics().is_empty());
543
544        // NOW the stale unsubscribe ACK arrives
545        // This must be ignored because topic is no longer in pending_unsubscribe
546        state.confirm_unsubscribe("tickers.BTCUSDT");
547
548        // Topic should STILL be confirmed (not removed by stale ACK)
549        assert_eq!(state.len(), 1); // Must remain confirmed
550        assert!(state.pending_unsubscribe_topics().is_empty());
551        assert!(state.pending_subscribe_topics().is_empty());
552
553        // Topic should be in all_topics (for reconnect)
554        let all = state.all_topics();
555        assert_eq!(all.len(), 1);
556        assert!(all.contains(&"tickers.BTCUSDT".to_string()));
557    }
558
559    #[rstest]
560    fn test_mark_failure() {
561        let state = SubscriptionState::new('.');
562        state.mark_subscribe("tickers.BTCUSDT");
563        state.confirm_subscribe("tickers.BTCUSDT");
564        state.mark_failure("tickers.BTCUSDT");
565
566        assert_eq!(state.len(), 0);
567        assert_eq!(state.pending_subscribe_topics(), vec!["tickers.BTCUSDT"]);
568    }
569
570    #[rstest]
571    fn test_all_topics_includes_confirmed_and_pending_subscribe() {
572        let state = SubscriptionState::new('.');
573        state.mark_subscribe("tickers.BTCUSDT");
574        state.confirm_subscribe("tickers.BTCUSDT");
575        state.mark_subscribe("tickers.ETHUSDT");
576
577        let topics = state.all_topics();
578        assert_eq!(topics.len(), 2);
579        assert!(topics.contains(&"tickers.BTCUSDT".to_string()));
580        assert!(topics.contains(&"tickers.ETHUSDT".to_string()));
581    }
582
583    #[rstest]
584    fn test_all_topics_excludes_pending_unsubscribe() {
585        let state = SubscriptionState::new('.');
586        state.mark_subscribe("tickers.BTCUSDT");
587        state.confirm_subscribe("tickers.BTCUSDT");
588        state.mark_unsubscribe("tickers.BTCUSDT");
589
590        let topics = state.all_topics();
591        assert!(topics.is_empty());
592    }
593
594    #[rstest]
595    fn test_reference_counting_single_topic() {
596        let state = SubscriptionState::new('.');
597
598        assert!(state.add_reference("tickers.BTCUSDT"));
599        assert_eq!(state.get_reference_count("tickers.BTCUSDT"), 1);
600
601        assert!(!state.add_reference("tickers.BTCUSDT"));
602        assert_eq!(state.get_reference_count("tickers.BTCUSDT"), 2);
603
604        assert!(!state.remove_reference("tickers.BTCUSDT"));
605        assert_eq!(state.get_reference_count("tickers.BTCUSDT"), 1);
606
607        assert!(state.remove_reference("tickers.BTCUSDT"));
608        assert_eq!(state.get_reference_count("tickers.BTCUSDT"), 0);
609    }
610
611    #[rstest]
612    fn test_reference_counting_multiple_topics() {
613        let state = SubscriptionState::new('.');
614
615        assert!(state.add_reference("tickers.BTCUSDT"));
616        assert!(state.add_reference("tickers.ETHUSDT"));
617
618        assert!(!state.add_reference("tickers.BTCUSDT"));
619        assert_eq!(state.get_reference_count("tickers.BTCUSDT"), 2);
620        assert_eq!(state.get_reference_count("tickers.ETHUSDT"), 1);
621
622        assert!(!state.remove_reference("tickers.BTCUSDT"));
623        assert!(state.remove_reference("tickers.ETHUSDT"));
624    }
625
626    #[rstest]
627    fn test_topic_without_symbol() {
628        let state = SubscriptionState::new('.');
629        state.mark_subscribe("orderbook");
630        state.confirm_subscribe("orderbook");
631
632        assert_eq!(state.len(), 1);
633        assert_eq!(state.all_topics(), vec!["orderbook"]);
634    }
635
636    #[rstest]
637    fn test_different_delimiters() {
638        let state_dot = SubscriptionState::new('.');
639        state_dot.mark_subscribe("tickers.BTCUSDT");
640        assert_eq!(
641            state_dot.pending_subscribe_topics(),
642            vec!["tickers.BTCUSDT"]
643        );
644
645        let state_colon = SubscriptionState::new(':');
646        state_colon.mark_subscribe("orderBookL2:XBTUSD");
647        assert_eq!(
648            state_colon.pending_subscribe_topics(),
649            vec!["orderBookL2:XBTUSD"]
650        );
651    }
652
653    #[rstest]
654    fn test_clear() {
655        let state = SubscriptionState::new('.');
656        state.mark_subscribe("tickers.BTCUSDT");
657        state.confirm_subscribe("tickers.BTCUSDT");
658        state.add_reference("tickers.BTCUSDT");
659
660        state.clear();
661
662        assert!(state.is_empty());
663        assert_eq!(state.get_reference_count("tickers.BTCUSDT"), 0);
664    }
665
666    #[rstest]
667    fn test_multiple_symbols_same_channel() {
668        let state = SubscriptionState::new('.');
669        state.mark_subscribe("tickers.BTCUSDT");
670        state.mark_subscribe("tickers.ETHUSDT");
671        state.confirm_subscribe("tickers.BTCUSDT");
672        state.confirm_subscribe("tickers.ETHUSDT");
673
674        assert_eq!(state.len(), 2);
675        let topics = state.all_topics();
676        assert!(topics.contains(&"tickers.BTCUSDT".to_string()));
677        assert!(topics.contains(&"tickers.ETHUSDT".to_string()));
678    }
679
680    #[rstest]
681    fn test_mixed_channel_and_symbol_subscriptions() {
682        let state = SubscriptionState::new('.');
683
684        // Subscribe to channel-level first
685        state.mark_subscribe("tickers");
686        state.confirm_subscribe("tickers");
687        assert_eq!(state.len(), 1);
688        assert_eq!(state.all_topics(), vec!["tickers"]);
689
690        // Add symbol-level subscription to same channel
691        state.mark_subscribe("tickers.BTCUSDT");
692        state.confirm_subscribe("tickers.BTCUSDT");
693        assert_eq!(state.len(), 2);
694
695        // Both should be present
696        let topics = state.all_topics();
697        assert_eq!(topics.len(), 2);
698        assert!(topics.contains(&"tickers".to_string()));
699        assert!(topics.contains(&"tickers.BTCUSDT".to_string()));
700
701        // Add another symbol
702        state.mark_subscribe("tickers.ETHUSDT");
703        state.confirm_subscribe("tickers.ETHUSDT");
704        assert_eq!(state.len(), 3);
705
706        let topics = state.all_topics();
707        assert_eq!(topics.len(), 3);
708        assert!(topics.contains(&"tickers".to_string()));
709        assert!(topics.contains(&"tickers.BTCUSDT".to_string()));
710        assert!(topics.contains(&"tickers.ETHUSDT".to_string()));
711
712        // Unsubscribe from channel-level only
713        state.mark_unsubscribe("tickers");
714        state.confirm_unsubscribe("tickers");
715        assert_eq!(state.len(), 2);
716
717        let topics = state.all_topics();
718        assert_eq!(topics.len(), 2);
719        assert!(!topics.contains(&"tickers".to_string()));
720        assert!(topics.contains(&"tickers.BTCUSDT".to_string()));
721        assert!(topics.contains(&"tickers.ETHUSDT".to_string()));
722    }
723
724    #[rstest]
725    fn test_symbol_subscription_before_channel() {
726        let state = SubscriptionState::new('.');
727
728        // Subscribe to symbol first
729        state.mark_subscribe("tickers.BTCUSDT");
730        state.confirm_subscribe("tickers.BTCUSDT");
731        assert_eq!(state.len(), 1);
732
733        // Then add channel-level
734        state.mark_subscribe("tickers");
735        state.confirm_subscribe("tickers");
736        assert_eq!(state.len(), 2);
737
738        // Both should be present after reconnect
739        let topics = state.all_topics();
740        assert_eq!(topics.len(), 2);
741        assert!(topics.contains(&"tickers".to_string()));
742        assert!(topics.contains(&"tickers.BTCUSDT".to_string()));
743    }
744
745    #[rstest]
746    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
747    async fn test_concurrent_subscribe_same_topic() {
748        let state = Arc::new(SubscriptionState::new('.'));
749        let mut handles = vec![];
750
751        // Spawn 10 tasks all subscribing to the same topic
752        for _ in 0..10 {
753            let state_clone = Arc::clone(&state);
754            let handle = tokio::spawn(async move {
755                state_clone.add_reference("tickers.BTCUSDT");
756                state_clone.mark_subscribe("tickers.BTCUSDT");
757                state_clone.confirm_subscribe("tickers.BTCUSDT");
758            });
759            handles.push(handle);
760        }
761
762        for handle in handles {
763            handle.await.unwrap();
764        }
765
766        // Reference count should be exactly 10
767        assert_eq!(state.get_reference_count("tickers.BTCUSDT"), 10);
768        assert_eq!(state.len(), 1);
769    }
770
771    #[rstest]
772    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
773    async fn test_concurrent_subscribe_unsubscribe() {
774        let state = Arc::new(SubscriptionState::new('.'));
775        let mut handles = vec![];
776
777        // Spawn 20 tasks, each adding 2 references to their own unique topic
778        // This ensures deterministic behavior - we know exactly what the final state should be
779        for i in 0..20 {
780            let state_clone = Arc::clone(&state);
781            let handle = tokio::spawn(async move {
782                let topic = format!("tickers.SYMBOL{i}");
783                // Add 2 references
784                state_clone.add_reference(&topic);
785                state_clone.add_reference(&topic);
786                state_clone.mark_subscribe(&topic);
787                state_clone.confirm_subscribe(&topic);
788
789                // Remove 1 reference (should still have 1 remaining)
790                state_clone.remove_reference(&topic);
791            });
792            handles.push(handle);
793        }
794
795        for handle in handles {
796            handle.await.unwrap();
797        }
798
799        // Each of the 20 topics should still have 1 reference
800        for i in 0..20 {
801            let topic = format!("tickers.SYMBOL{i}");
802            assert_eq!(state.get_reference_count(&topic), 1);
803        }
804
805        // Should have exactly 20 confirmed subscriptions
806        assert_eq!(state.len(), 20);
807        assert!(!state.is_empty());
808    }
809
810    #[rstest]
811    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
812    async fn test_concurrent_reference_counting_same_topic() {
813        let state = Arc::new(SubscriptionState::new('.'));
814        let topic = "tickers.BTCUSDT";
815        let mut handles = vec![];
816
817        // Spawn 10 tasks all adding 10 references to the same topic
818        for _ in 0..10 {
819            let state_clone = Arc::clone(&state);
820            let handle = tokio::spawn(async move {
821                for _ in 0..10 {
822                    state_clone.add_reference(topic);
823                }
824            });
825            handles.push(handle);
826        }
827
828        for handle in handles {
829            handle.await.unwrap();
830        }
831
832        // Should have exactly 100 references (10 tasks * 10 refs each)
833        assert_eq!(state.get_reference_count(topic), 100);
834
835        // Now remove 50 references sequentially
836        for _ in 0..50 {
837            state.remove_reference(topic);
838        }
839
840        // Should have exactly 50 references remaining
841        assert_eq!(state.get_reference_count(topic), 50);
842    }
843
844    #[rstest]
845    fn test_reconnection_scenario() {
846        let state = SubscriptionState::new('.');
847
848        // Initial subscriptions
849        state.add_reference("tickers.BTCUSDT");
850        state.mark_subscribe("tickers.BTCUSDT");
851        state.confirm_subscribe("tickers.BTCUSDT");
852
853        state.add_reference("tickers.ETHUSDT");
854        state.mark_subscribe("tickers.ETHUSDT");
855        state.confirm_subscribe("tickers.ETHUSDT");
856
857        state.add_reference("orderbook");
858        state.mark_subscribe("orderbook");
859        state.confirm_subscribe("orderbook");
860
861        assert_eq!(state.len(), 3);
862
863        // Simulate disconnect - topics should be available for resubscription
864        let topics_to_resubscribe = state.all_topics();
865        assert_eq!(topics_to_resubscribe.len(), 3);
866        assert!(topics_to_resubscribe.contains(&"tickers.BTCUSDT".to_string()));
867        assert!(topics_to_resubscribe.contains(&"tickers.ETHUSDT".to_string()));
868        assert!(topics_to_resubscribe.contains(&"orderbook".to_string()));
869
870        // On reconnect, mark all as pending again
871        for topic in &topics_to_resubscribe {
872            state.mark_subscribe(topic);
873        }
874
875        // Simulate server confirmations
876        for topic in &topics_to_resubscribe {
877            state.confirm_subscribe(topic);
878        }
879
880        // Should still have all 3 subscriptions
881        assert_eq!(state.len(), 3);
882        assert_eq!(state.all_topics().len(), 3);
883    }
884
885    #[rstest]
886    fn test_state_machine_invalid_transitions() {
887        let state = SubscriptionState::new('.');
888
889        // Confirm subscribe without marking first - should not crash
890        state.confirm_subscribe("tickers.BTCUSDT");
891        assert_eq!(state.len(), 1); // Gets added to confirmed
892
893        // Confirm unsubscribe without marking first - should not crash
894        state.confirm_unsubscribe("tickers.ETHUSDT");
895        assert_eq!(state.len(), 1); // Nothing changes
896
897        // Double confirm subscribe
898        state.mark_subscribe("orderbook");
899        state.confirm_subscribe("orderbook");
900        state.confirm_subscribe("orderbook"); // Second confirm is idempotent
901        assert_eq!(state.len(), 2);
902
903        // Unsubscribe something that was never subscribed
904        state.mark_unsubscribe("nonexistent");
905        state.confirm_unsubscribe("nonexistent");
906        assert_eq!(state.len(), 2); // Still 2
907    }
908
909    #[rstest]
910    fn test_mark_failure_moves_to_pending() {
911        let state = SubscriptionState::new('.');
912
913        // Subscribe and confirm
914        state.mark_subscribe("tickers.BTCUSDT");
915        state.confirm_subscribe("tickers.BTCUSDT");
916        assert_eq!(state.len(), 1);
917        assert!(state.pending_subscribe_topics().is_empty());
918
919        // Mark as failed
920        state.mark_failure("tickers.BTCUSDT");
921
922        // Should be removed from confirmed and back in pending
923        assert_eq!(state.len(), 0);
924        assert_eq!(state.pending_subscribe_topics(), vec!["tickers.BTCUSDT"]);
925
926        // all_topics should still include it for reconnection
927        assert_eq!(state.all_topics(), vec!["tickers.BTCUSDT"]);
928    }
929
930    #[rstest]
931    fn test_pending_subscribe_excludes_pending_unsubscribe() {
932        let state = SubscriptionState::new('.');
933
934        // Subscribe and confirm
935        state.mark_subscribe("tickers.BTCUSDT");
936        state.confirm_subscribe("tickers.BTCUSDT");
937
938        // Mark for unsubscribe
939        state.mark_unsubscribe("tickers.BTCUSDT");
940
941        // Should be in pending_unsubscribe but NOT in all_topics
942        assert_eq!(state.pending_unsubscribe_topics(), vec!["tickers.BTCUSDT"]);
943        assert!(state.all_topics().is_empty());
944        assert_eq!(state.len(), 0);
945    }
946
947    #[rstest]
948    fn test_remove_reference_nonexistent_topic() {
949        let state = SubscriptionState::new('.');
950
951        // Removing reference to topic that was never added
952        let should_unsubscribe = state.remove_reference("nonexistent");
953
954        // Should return false and not crash
955        assert!(!should_unsubscribe);
956        assert_eq!(state.get_reference_count("nonexistent"), 0);
957    }
958
959    #[rstest]
960    fn test_edge_case_empty_channel_name() {
961        let state = SubscriptionState::new('.');
962
963        // Edge case: empty string as topic
964        state.mark_subscribe("");
965        state.confirm_subscribe("");
966
967        assert_eq!(state.len(), 1);
968        assert_eq!(state.all_topics(), vec![""]);
969    }
970
971    #[rstest]
972    fn test_special_characters_in_topics() {
973        let state = SubscriptionState::new('.');
974
975        // Topics with special characters
976        let special_topics = vec![
977            "channel.symbol-with-dash",
978            "channel.SYMBOL_WITH_UNDERSCORE",
979            "channel.symbol123",
980            "channel.symbol@special",
981        ];
982
983        for topic in &special_topics {
984            state.mark_subscribe(topic);
985            state.confirm_subscribe(topic);
986        }
987
988        assert_eq!(state.len(), special_topics.len());
989
990        let all_topics = state.all_topics();
991        for topic in &special_topics {
992            assert!(
993                all_topics.contains(&(*topic).to_string()),
994                "Missing topic: {topic}"
995            );
996        }
997    }
998
999    #[rstest]
1000    fn test_clear_resets_all_state() {
1001        let state = SubscriptionState::new('.');
1002
1003        // Add multiple subscriptions and references
1004        for i in 0..10 {
1005            let topic = format!("channel{i}.SYMBOL");
1006            state.add_reference(&topic);
1007            state.add_reference(&topic); // Add twice
1008            state.mark_subscribe(&topic);
1009            state.confirm_subscribe(&topic);
1010        }
1011
1012        assert_eq!(state.len(), 10);
1013        assert!(!state.is_empty());
1014
1015        // Clear everything
1016        state.clear();
1017
1018        // Verify complete reset
1019        assert_eq!(state.len(), 0);
1020        assert!(state.is_empty());
1021        assert!(state.all_topics().is_empty());
1022        assert!(state.pending_subscribe_topics().is_empty());
1023        assert!(state.pending_unsubscribe_topics().is_empty());
1024
1025        // Verify reference counts are cleared
1026        for i in 0..10 {
1027            let topic = format!("channel{i}.SYMBOL");
1028            assert_eq!(state.get_reference_count(&topic), 0);
1029        }
1030    }
1031
1032    #[rstest]
1033    fn test_different_delimiter_does_not_affect_storage() {
1034        // Verify delimiter is only used for parsing, not storage
1035        let state_dot = SubscriptionState::new('.');
1036        let state_colon = SubscriptionState::new(':');
1037
1038        // Add same logical subscription with different delimiters
1039        state_dot.mark_subscribe("channel.SYMBOL");
1040        state_colon.mark_subscribe("channel:SYMBOL");
1041
1042        // Both should work correctly
1043        assert_eq!(state_dot.pending_subscribe_topics(), vec!["channel.SYMBOL"]);
1044        assert_eq!(
1045            state_colon.pending_subscribe_topics(),
1046            vec!["channel:SYMBOL"]
1047        );
1048    }
1049
1050    #[rstest]
1051    fn test_unsubscribe_before_subscribe_confirmed() {
1052        let state = SubscriptionState::new('.');
1053
1054        // User subscribes
1055        state.mark_subscribe("tickers.BTCUSDT");
1056        assert_eq!(state.pending_subscribe_topics(), vec!["tickers.BTCUSDT"]);
1057
1058        // User immediately changes mind before server confirms
1059        state.mark_unsubscribe("tickers.BTCUSDT");
1060
1061        // Should be removed from pending_subscribe and added to pending_unsubscribe
1062        assert!(state.pending_subscribe_topics().is_empty());
1063        assert_eq!(state.pending_unsubscribe_topics(), vec!["tickers.BTCUSDT"]);
1064
1065        // Confirm the unsubscribe
1066        state.confirm_unsubscribe("tickers.BTCUSDT");
1067
1068        // Should be completely gone
1069        assert!(state.is_empty());
1070        assert!(state.all_topics().is_empty());
1071        assert_eq!(state.len(), 0);
1072    }
1073
1074    #[rstest]
1075    fn test_late_subscribe_confirmation_after_unsubscribe() {
1076        let state = SubscriptionState::new('.');
1077
1078        // User subscribes
1079        state.mark_subscribe("tickers.BTCUSDT");
1080
1081        // User immediately unsubscribes
1082        state.mark_unsubscribe("tickers.BTCUSDT");
1083
1084        // Late subscribe confirmation arrives from server
1085        state.confirm_subscribe("tickers.BTCUSDT");
1086
1087        // Should NOT be added to confirmed (unsubscribe takes precedence)
1088        assert_eq!(state.len(), 0);
1089        assert!(state.pending_subscribe_topics().is_empty());
1090
1091        // Confirm the unsubscribe
1092        state.confirm_unsubscribe("tickers.BTCUSDT");
1093
1094        // Should still be empty
1095        assert!(state.is_empty());
1096        assert!(state.all_topics().is_empty());
1097    }
1098
1099    #[rstest]
1100    fn test_unsubscribe_clears_all_states() {
1101        let state = SubscriptionState::new('.');
1102
1103        // Subscribe and confirm
1104        state.mark_subscribe("tickers.BTCUSDT");
1105        state.confirm_subscribe("tickers.BTCUSDT");
1106        assert_eq!(state.len(), 1);
1107
1108        // Unsubscribe
1109        state.mark_unsubscribe("tickers.BTCUSDT");
1110
1111        // Should be removed from confirmed
1112        assert_eq!(state.len(), 0);
1113        assert_eq!(state.pending_unsubscribe_topics(), vec!["tickers.BTCUSDT"]);
1114
1115        // Late subscribe confirmation somehow arrives (race condition)
1116        state.confirm_subscribe("tickers.BTCUSDT");
1117
1118        // confirm_unsubscribe should clean everything
1119        state.confirm_unsubscribe("tickers.BTCUSDT");
1120
1121        // Completely empty
1122        assert!(state.is_empty());
1123        assert_eq!(state.len(), 0);
1124        assert!(state.pending_subscribe_topics().is_empty());
1125        assert!(state.pending_unsubscribe_topics().is_empty());
1126        assert!(state.all_topics().is_empty());
1127    }
1128
1129    #[rstest]
1130    fn test_mark_failure_respects_pending_unsubscribe() {
1131        let state = SubscriptionState::new('.');
1132
1133        // Subscribe and confirm
1134        state.mark_subscribe("tickers.BTCUSDT");
1135        state.confirm_subscribe("tickers.BTCUSDT");
1136        assert_eq!(state.len(), 1);
1137
1138        // User unsubscribes
1139        state.mark_unsubscribe("tickers.BTCUSDT");
1140        assert_eq!(state.len(), 0);
1141        assert_eq!(state.pending_unsubscribe_topics(), vec!["tickers.BTCUSDT"]);
1142
1143        // Meanwhile, a network error triggers mark_failure
1144        state.mark_failure("tickers.BTCUSDT");
1145
1146        // Should NOT be added to pending_subscribe (user wanted to unsubscribe)
1147        assert!(state.pending_subscribe_topics().is_empty());
1148        assert_eq!(state.pending_unsubscribe_topics(), vec!["tickers.BTCUSDT"]);
1149
1150        // all_topics should NOT include it
1151        assert!(state.all_topics().is_empty());
1152
1153        // Confirm unsubscribe
1154        state.confirm_unsubscribe("tickers.BTCUSDT");
1155        assert!(state.is_empty());
1156    }
1157
1158    #[rstest]
1159    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
1160    async fn test_concurrent_stress_mixed_operations() {
1161        let state = Arc::new(SubscriptionState::new('.'));
1162        let mut handles = vec![];
1163
1164        // Spawn 50 tasks doing random interleaved operations
1165        for i in 0..50 {
1166            let state_clone = Arc::clone(&state);
1167            let handle = tokio::spawn(async move {
1168                let topic1 = format!("channel.SYMBOL{i}");
1169                let topic2 = format!("channel.SYMBOL{}", i + 100);
1170
1171                // Add references
1172                state_clone.add_reference(&topic1);
1173                state_clone.add_reference(&topic2);
1174
1175                // Mark and confirm subscriptions
1176                state_clone.mark_subscribe(&topic1);
1177                state_clone.confirm_subscribe(&topic1);
1178                state_clone.mark_subscribe(&topic2);
1179
1180                // Interleave some unsubscribes
1181                if i % 3 == 0 {
1182                    state_clone.mark_unsubscribe(&topic1);
1183                    state_clone.confirm_unsubscribe(&topic1);
1184                }
1185
1186                // More reference operations
1187                state_clone.add_reference(&topic2);
1188                state_clone.remove_reference(&topic2);
1189
1190                // Confirm topic2
1191                state_clone.confirm_subscribe(&topic2);
1192            });
1193            handles.push(handle);
1194        }
1195
1196        for handle in handles {
1197            handle.await.unwrap();
1198        }
1199
1200        // Verify state is consistent (no panics, all maps accessible)
1201        let all = state.all_topics();
1202        let confirmed_count = state.len();
1203
1204        // We have 50 topic2s (always confirmed) + topic1s (50 - number unsubscribed)
1205        // About 17 topic1s get unsubscribed (i % 3 == 0), leaving ~33 topic1s + 50 topic2s = ~83
1206        assert!(confirmed_count > 50); // At least all topic2s
1207        assert!(confirmed_count <= 100); // At most all topic1s + topic2s
1208        assert_eq!(
1209            all.len(),
1210            confirmed_count + state.pending_subscribe_topics().len()
1211        );
1212    }
1213
1214    #[rstest]
1215    fn test_edge_case_malformed_topics() {
1216        let state = SubscriptionState::new('.');
1217
1218        // Topics with multiple delimiters (splits on first delimiter)
1219        state.mark_subscribe("channel.symbol.extra");
1220        state.confirm_subscribe("channel.symbol.extra");
1221        let topics = state.all_topics();
1222        assert!(topics.contains(&"channel.symbol.extra".to_string()));
1223
1224        // Topic with leading delimiter (empty channel, symbol is "channel")
1225        state.mark_subscribe(".channel");
1226        state.confirm_subscribe(".channel");
1227        assert_eq!(state.len(), 2);
1228
1229        // Topic with trailing delimiter - treated as channel-level (empty symbol = marker)
1230        // "channel." splits to ("channel", Some("")), and empty string is the channel marker
1231        state.mark_subscribe("channel.");
1232        state.confirm_subscribe("channel.");
1233        assert_eq!(state.len(), 3);
1234
1235        // Topic without delimiter - explicitly channel-level
1236        state.mark_subscribe("tickers");
1237        state.confirm_subscribe("tickers");
1238        assert_eq!(state.len(), 4);
1239
1240        // Verify all are retrievable (note: "channel." becomes "channel")
1241        let all = state.all_topics();
1242        assert_eq!(all.len(), 4);
1243        assert!(all.contains(&"channel.symbol.extra".to_string()));
1244        assert!(all.contains(&".channel".to_string()));
1245        assert!(all.contains(&"channel".to_string())); // "channel." treated as channel-level
1246        assert!(all.contains(&"tickers".to_string()));
1247    }
1248
1249    #[rstest]
1250    fn test_reference_count_underflow_safety() {
1251        let state = SubscriptionState::new('.');
1252
1253        // Remove without ever adding
1254        assert!(!state.remove_reference("never.added"));
1255        assert_eq!(state.get_reference_count("never.added"), 0);
1256
1257        // Add one, remove multiple times
1258        state.add_reference("once.added");
1259        assert_eq!(state.get_reference_count("once.added"), 1);
1260
1261        assert!(state.remove_reference("once.added")); // Should return true (last ref)
1262        assert_eq!(state.get_reference_count("once.added"), 0);
1263
1264        assert!(!state.remove_reference("once.added")); // Should not crash, returns false
1265        assert!(!state.remove_reference("once.added")); // Multiple times
1266        assert_eq!(state.get_reference_count("once.added"), 0);
1267
1268        // Verify we can add again after underflow attempts
1269        assert!(state.add_reference("once.added"));
1270        assert_eq!(state.get_reference_count("once.added"), 1);
1271    }
1272
1273    #[rstest]
1274    fn test_reconnection_with_partial_state() {
1275        let state = SubscriptionState::new('.');
1276
1277        // Setup: Some confirmed, some pending subscribe, some pending unsubscribe
1278        // Confirmed
1279        state.mark_subscribe("confirmed.BTCUSDT");
1280        state.confirm_subscribe("confirmed.BTCUSDT");
1281
1282        // Pending subscribe (not yet confirmed)
1283        state.mark_subscribe("pending.ETHUSDT");
1284
1285        // Pending unsubscribe (user cancelled)
1286        state.mark_subscribe("cancelled.XRPUSDT");
1287        state.confirm_subscribe("cancelled.XRPUSDT");
1288        state.mark_unsubscribe("cancelled.XRPUSDT");
1289
1290        // Verify state before reconnect
1291        assert_eq!(state.len(), 1); // Only confirmed.BTCUSDT
1292        let all = state.all_topics();
1293        assert_eq!(all.len(), 2); // confirmed + pending_subscribe (not pending_unsubscribe)
1294        assert!(all.contains(&"confirmed.BTCUSDT".to_string()));
1295        assert!(all.contains(&"pending.ETHUSDT".to_string()));
1296        assert!(!all.contains(&"cancelled.XRPUSDT".to_string())); // Should NOT be included
1297
1298        // Simulate disconnect and reconnect
1299        let topics_to_resubscribe = state.all_topics();
1300
1301        // Clear confirmed on disconnect (simulate connection drop)
1302        state.confirmed().clear();
1303
1304        // Mark all for resubscription
1305        for topic in &topics_to_resubscribe {
1306            state.mark_subscribe(topic);
1307        }
1308
1309        // Server confirms both
1310        for topic in &topics_to_resubscribe {
1311            state.confirm_subscribe(topic);
1312        }
1313
1314        // Verify final state
1315        assert_eq!(state.len(), 2); // Both confirmed
1316        let final_topics = state.all_topics();
1317        assert_eq!(final_topics.len(), 2);
1318        assert!(final_topics.contains(&"confirmed.BTCUSDT".to_string()));
1319        assert!(final_topics.contains(&"pending.ETHUSDT".to_string()));
1320        assert!(!final_topics.contains(&"cancelled.XRPUSDT".to_string()));
1321    }
1322
1323    /// Verifies all invariants of the subscription state.
1324    ///
1325    /// # Invariants
1326    ///
1327    /// 1. **Mutual exclusivity**: A topic cannot exist in multiple states simultaneously
1328    ///    (one of: confirmed, pending_subscribe, pending_unsubscribe, or none).
1329    /// 2. **all_topics consistency**: `all_topics()` must equal `confirmed ∪ pending_subscribe`
1330    /// 3. **len consistency**: `len()` must equal total count of symbols in confirmed map
1331    /// 4. **is_empty consistency**: `is_empty()` true iff all maps are empty
1332    /// 5. **Reference count non-negative**: All reference counts >= 0
1333    fn check_invariants(state: &SubscriptionState, label: &str) {
1334        // Collect all topics from each state
1335        let confirmed_topics: AHashSet<String> = state
1336            .topics_from_map(&state.confirmed)
1337            .into_iter()
1338            .collect();
1339        let pending_sub_topics: AHashSet<String> =
1340            state.pending_subscribe_topics().into_iter().collect();
1341        let pending_unsub_topics: AHashSet<String> =
1342            state.pending_unsubscribe_topics().into_iter().collect();
1343
1344        // INVARIANT 1: Mutual exclusivity - no topic in multiple states
1345        let confirmed_and_pending_sub: Vec<_> =
1346            confirmed_topics.intersection(&pending_sub_topics).collect();
1347        assert!(
1348            confirmed_and_pending_sub.is_empty(),
1349            "{label}: Topic in both confirmed and pending_subscribe: {confirmed_and_pending_sub:?}"
1350        );
1351
1352        let confirmed_and_pending_unsub: Vec<_> = confirmed_topics
1353            .intersection(&pending_unsub_topics)
1354            .collect();
1355        assert!(
1356            confirmed_and_pending_unsub.is_empty(),
1357            "{label}: Topic in both confirmed and pending_unsubscribe: {confirmed_and_pending_unsub:?}"
1358        );
1359
1360        let pending_sub_and_unsub: Vec<_> = pending_sub_topics
1361            .intersection(&pending_unsub_topics)
1362            .collect();
1363        assert!(
1364            pending_sub_and_unsub.is_empty(),
1365            "{label}: Topic in both pending_subscribe and pending_unsubscribe: {pending_sub_and_unsub:?}"
1366        );
1367
1368        // INVARIANT 2: all_topics() == confirmed ∪ pending_subscribe
1369        let all_topics: AHashSet<String> = state.all_topics().into_iter().collect();
1370        let expected_all: AHashSet<String> = confirmed_topics
1371            .union(&pending_sub_topics)
1372            .cloned()
1373            .collect();
1374        assert_eq!(
1375            all_topics, expected_all,
1376            "{label}: all_topics() doesn't match confirmed ∪ pending_subscribe"
1377        );
1378
1379        // Ensure pending_unsubscribe is NOT in all_topics
1380        for topic in &pending_unsub_topics {
1381            assert!(
1382                !all_topics.contains(topic),
1383                "{label}: pending_unsubscribe topic {topic} incorrectly in all_topics()"
1384            );
1385        }
1386
1387        // INVARIANT 3: len() == sum of confirmed symbol counts
1388        let expected_len: usize = state
1389            .confirmed
1390            .iter()
1391            .map(|entry| entry.value().len())
1392            .sum();
1393        assert_eq!(
1394            state.len(),
1395            expected_len,
1396            "{label}: len() mismatch. Expected {expected_len}, was {}",
1397            state.len()
1398        );
1399
1400        // INVARIANT 4: is_empty() consistency
1401        let should_be_empty = state.confirmed.is_empty()
1402            && pending_sub_topics.is_empty()
1403            && pending_unsub_topics.is_empty();
1404        assert_eq!(
1405            state.is_empty(),
1406            should_be_empty,
1407            "{label}: is_empty() inconsistent. Maps empty: {should_be_empty}, is_empty(): {}",
1408            state.is_empty()
1409        );
1410
1411        // INVARIANT 5: Reference counts non-negative (NonZeroUsize enforces > 0, absence = 0)
1412        for entry in state.reference_counts.iter() {
1413            let count = entry.value().get();
1414            assert!(
1415                count > 0,
1416                "{label}: Reference count should be NonZeroUsize (> 0), was {count} for {:?}",
1417                entry.key()
1418            );
1419        }
1420    }
1421
1422    /// Checks that a topic exists in exactly one of the three states or none.
1423    fn check_topic_exclusivity(state: &SubscriptionState, topic: &str, label: &str) {
1424        let (channel, symbol) = split_topic(topic, state.delimiter);
1425
1426        let in_confirmed = is_tracked(&state.confirmed, channel, symbol);
1427        let in_pending_sub = is_tracked(&state.pending_subscribe, channel, symbol);
1428        let in_pending_unsub = is_tracked(&state.pending_unsubscribe, channel, symbol);
1429
1430        let count = [in_confirmed, in_pending_sub, in_pending_unsub]
1431            .iter()
1432            .filter(|&&x| x)
1433            .count();
1434
1435        assert!(
1436            count <= 1,
1437            "{label}: Topic {topic} in {count} states (should be 0 or 1). \
1438             confirmed: {in_confirmed}, pending_sub: {in_pending_sub}, pending_unsub: {in_pending_unsub}"
1439        );
1440    }
1441
1442    #[cfg(test)]
1443    mod property_tests {
1444        use proptest::prelude::*;
1445
1446        use super::*;
1447
/// One step of a randomly generated operation sequence.
///
/// Each variant names the `SubscriptionState` method that `apply_operation` invokes for
/// it; the `String` payload, where present, is the topic passed to that method.
#[derive(Debug, Clone)]
enum Operation {
    /// Applied as `mark_subscribe(topic)`.
    MarkSubscribe(String),
    /// Applied as `confirm_subscribe(topic)`.
    ConfirmSubscribe(String),
    /// Applied as `mark_unsubscribe(topic)`.
    MarkUnsubscribe(String),
    /// Applied as `confirm_unsubscribe(topic)`.
    ConfirmUnsubscribe(String),
    /// Applied as `mark_failure(topic)`.
    MarkFailure(String),
    /// Applied as `add_reference(topic)`; the return value is discarded.
    AddReference(String),
    /// Applied as `remove_reference(topic)`; the return value is discarded.
    RemoveReference(String),
    /// Applied as `clear()`; carries no topic.
    Clear,
}
1459
1460        // Strategy for generating valid topics
1461        fn topic_strategy() -> impl Strategy<Value = String> {
1462            prop_oneof![
1463                // Symbol-level topics
1464                (any::<u8>(), any::<u8>())
1465                    .prop_map(|(ch, sym)| { format!("channel{}.SYMBOL{}", ch % 5, sym % 10) }),
1466                // Channel-level topics (no symbol)
1467                any::<u8>().prop_map(|ch| format!("channel{}", ch % 5)),
1468            ]
1469        }
1470
1471        // Strategy for generating random operations
1472        fn operation_strategy() -> impl Strategy<Value = Operation> {
1473            topic_strategy().prop_flat_map(|topic| {
1474                prop_oneof![
1475                    Just(Operation::MarkSubscribe(topic.clone())),
1476                    Just(Operation::ConfirmSubscribe(topic.clone())),
1477                    Just(Operation::MarkUnsubscribe(topic.clone())),
1478                    Just(Operation::ConfirmUnsubscribe(topic.clone())),
1479                    Just(Operation::MarkFailure(topic.clone())),
1480                    Just(Operation::AddReference(topic.clone())),
1481                    Just(Operation::RemoveReference(topic)),
1482                    Just(Operation::Clear),
1483                ]
1484            })
1485        }
1486
1487        // Apply an operation to the state
1488        fn apply_operation(state: &SubscriptionState, op: &Operation) {
1489            match op {
1490                Operation::MarkSubscribe(topic) => state.mark_subscribe(topic),
1491                Operation::ConfirmSubscribe(topic) => state.confirm_subscribe(topic),
1492                Operation::MarkUnsubscribe(topic) => state.mark_unsubscribe(topic),
1493                Operation::ConfirmUnsubscribe(topic) => state.confirm_unsubscribe(topic),
1494                Operation::MarkFailure(topic) => state.mark_failure(topic),
1495                Operation::AddReference(topic) => {
1496                    state.add_reference(topic);
1497                }
1498                Operation::RemoveReference(topic) => {
1499                    state.remove_reference(topic);
1500                }
1501                Operation::Clear => state.clear(),
1502            }
1503        }
1504
1505        proptest! {
1506            #![proptest_config(ProptestConfig::with_cases(500))]
1507
1508            /// Property: Invariants hold after any sequence of operations.
1509            #[rstest]
1510            fn prop_invariants_hold_after_operations(
1511                operations in prop::collection::vec(operation_strategy(), 1..50)
1512            ) {
1513                let state = SubscriptionState::new('.');
1514
1515                // Apply all operations
1516                for (i, op) in operations.iter().enumerate() {
1517                    apply_operation(&state, op);
1518
1519                    // Check invariants after each operation
1520                    check_invariants(&state, &format!("After op {i}: {op:?}"));
1521                }
1522
1523                // Final invariant check
1524                check_invariants(&state, "Final state");
1525            }
1526
1527            /// Property: Reference counting is always consistent.
1528            #[rstest]
1529            fn prop_reference_counting_consistency(
1530                ops in prop::collection::vec(
1531                    topic_strategy().prop_flat_map(|t| {
1532                        prop_oneof![
1533                            Just(Operation::AddReference(t.clone())),
1534                            Just(Operation::RemoveReference(t)),
1535                        ]
1536                    }),
1537                    1..100
1538                )
1539            ) {
1540                let state = SubscriptionState::new('.');
1541
1542                for op in &ops {
1543                    apply_operation(&state, op);
1544
1545                    // All reference counts must be >= 0 (NonZeroUsize or absent)
1546                    for entry in state.reference_counts.iter() {
1547                        assert!(entry.value().get() > 0);
1548                    }
1549                }
1550            }
1551
1552            /// Property: all_topics() always equals confirmed ∪ pending_subscribe.
1553            #[rstest]
1554            fn prop_all_topics_is_union(
1555                operations in prop::collection::vec(operation_strategy(), 1..50)
1556            ) {
1557                let state = SubscriptionState::new('.');
1558
1559                for op in &operations {
1560                    apply_operation(&state, op);
1561
1562                    // Verify all_topics() == confirmed ∪ pending_subscribe
1563                    let all_topics: AHashSet<String> = state.all_topics().into_iter().collect();
1564                    let confirmed: AHashSet<String> = state.topics_from_map(&state.confirmed).into_iter().collect();
1565                    let pending_sub: AHashSet<String> = state.pending_subscribe_topics().into_iter().collect();
1566                    let expected: AHashSet<String> = confirmed.union(&pending_sub).cloned().collect();
1567
1568                    assert_eq!(all_topics, expected);
1569
1570                    // Ensure pending_unsubscribe topics are NOT in all_topics
1571                    let pending_unsub: AHashSet<String> = state.pending_unsubscribe_topics().into_iter().collect();
1572                    for topic in pending_unsub {
1573                        assert!(!all_topics.contains(&topic));
1574                    }
1575                }
1576            }
1577
1578            /// Property: clear() resets to empty state.
1579            #[rstest]
1580            fn prop_clear_resets_completely(
1581                operations in prop::collection::vec(operation_strategy(), 1..30)
1582            ) {
1583                let state = SubscriptionState::new('.');
1584
1585                // Apply random operations
1586                for op in &operations {
1587                    apply_operation(&state, op);
1588                }
1589
1590                // Clear and verify complete reset
1591                state.clear();
1592
1593                assert!(state.is_empty());
1594                assert_eq!(state.len(), 0);
1595                assert!(state.all_topics().is_empty());
1596                assert!(state.pending_subscribe_topics().is_empty());
1597                assert!(state.pending_unsubscribe_topics().is_empty());
1598                assert!(state.confirmed.is_empty());
1599                assert!(state.pending_subscribe.is_empty());
1600                assert!(state.pending_unsubscribe.is_empty());
1601                assert!(state.reference_counts.is_empty());
1602            }
1603
1604            /// Property: Topics are mutually exclusive across states.
1605            #[rstest]
1606            fn prop_topic_mutual_exclusivity(
1607                operations in prop::collection::vec(operation_strategy(), 1..50),
1608                topic in topic_strategy()
1609            ) {
1610                let state = SubscriptionState::new('.');
1611
1612                for (i, op) in operations.iter().enumerate() {
1613                    apply_operation(&state, op);
1614                    check_topic_exclusivity(&state, &topic, &format!("After op {i}: {op:?}"));
1615                }
1616            }
1617        }
1618    }
1619
1620    #[rstest]
1621    fn test_exhaustive_two_step_transitions() {
1622        let operations = [
1623            "mark_subscribe",
1624            "confirm_subscribe",
1625            "mark_unsubscribe",
1626            "confirm_unsubscribe",
1627            "mark_failure",
1628        ];
1629
1630        for &op1 in &operations {
1631            for &op2 in &operations {
1632                let state = SubscriptionState::new('.');
1633                let topic = "test.TOPIC";
1634
1635                // Apply two operations
1636                apply_op(&state, op1, topic);
1637                apply_op(&state, op2, topic);
1638
1639                // Verify invariants hold
1640                check_invariants(&state, &format!("{op1} → {op2}"));
1641                check_topic_exclusivity(&state, topic, &format!("{op1} → {op2}"));
1642            }
1643        }
1644    }
1645
1646    fn apply_op(state: &SubscriptionState, op: &str, topic: &str) {
1647        match op {
1648            "mark_subscribe" => state.mark_subscribe(topic),
1649            "confirm_subscribe" => state.confirm_subscribe(topic),
1650            "mark_unsubscribe" => state.mark_unsubscribe(topic),
1651            "confirm_unsubscribe" => state.confirm_unsubscribe(topic),
1652            "mark_failure" => state.mark_failure(topic),
1653            _ => panic!("Unknown operation: {op}"),
1654        }
1655    }
1656
1657    #[rstest]
1658    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
1659    async fn test_stress_rapid_resubscribe_pattern() {
1660        // Stress test the race condition we fixed: rapid unsubscribe → resubscribe
1661        let state = Arc::new(SubscriptionState::new('.'));
1662        let mut handles = vec![];
1663
1664        for i in 0..100 {
1665            let state_clone = Arc::clone(&state);
1666            let handle = tokio::spawn(async move {
1667                let topic = format!("rapid.SYMBOL{}", i % 10); // 10 unique topics, lots of contention
1668
1669                // Initial subscribe
1670                state_clone.mark_subscribe(&topic);
1671                state_clone.confirm_subscribe(&topic);
1672
1673                // Rapid unsubscribe → resubscribe (race condition scenario)
1674                state_clone.mark_unsubscribe(&topic);
1675                // Immediately resubscribe before unsubscribe ACK
1676                state_clone.mark_subscribe(&topic);
1677                // Now unsubscribe ACK arrives
1678                state_clone.confirm_unsubscribe(&topic);
1679                // Subscribe ACK arrives
1680                state_clone.confirm_subscribe(&topic);
1681            });
1682            handles.push(handle);
1683        }
1684
1685        for handle in handles {
1686            handle.await.unwrap();
1687        }
1688
1689        check_invariants(&state, "After rapid resubscribe stress test");
1690    }
1691
1692    #[rstest]
1693    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
1694    async fn test_stress_failure_recovery_loop() {
1695        // Stress test failure → recovery loops
1696        // Each task gets its own unique topic to avoid race conditions in the test itself
1697        let state = Arc::new(SubscriptionState::new('.'));
1698        let mut handles = vec![];
1699
1700        for i in 0..30 {
1701            let state_clone = Arc::clone(&state);
1702            let handle = tokio::spawn(async move {
1703                let topic = format!("failure.SYMBOL{i}"); // Unique topic per task
1704
1705                // Subscribe and confirm
1706                state_clone.mark_subscribe(&topic);
1707                state_clone.confirm_subscribe(&topic);
1708
1709                // Simulate multiple failures and recoveries
1710                for _ in 0..5 {
1711                    state_clone.mark_failure(&topic);
1712                    state_clone.confirm_subscribe(&topic); // Re-confirm after retry
1713                }
1714            });
1715            handles.push(handle);
1716        }
1717
1718        for handle in handles {
1719            handle.await.unwrap();
1720        }
1721
1722        check_invariants(&state, "After failure recovery loops");
1723
1724        // All should eventually be confirmed (30 unique topics)
1725        assert_eq!(state.len(), 30);
1726    }
1727}