nautilus_network/websocket/
subscription.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Generic subscription state tracking for WebSocket clients.
17//!
18//! This module provides a robust subscription tracker that maintains confirmed and pending
19//! subscription states with reference counting support. It follows a proven pattern used in
20//! production.
21//!
22//! # Key Features
23//!
24//! - **Three-state tracking**: confirmed, pending_subscribe, pending_unsubscribe.
25//! - **Reference counting**: Prevents duplicate subscribe/unsubscribe messages.
26//! - **Reconnection support**: `all_topics()` returns topics to resubscribe after reconnect.
27//! - **Configurable delimiter**: Supports different topic formats (`.` or `:` etc.).
28//!
29//! # Topic Format
30//!
31//! Topics are strings in the format `channel{delimiter}symbol`:
32//! - Dot delimiter: `tickers.BTCUSDT`
33//! - Colon delimiter: `trades:BTC-USDT`
34//!
35//! Channels without symbols are also supported (e.g., `execution` for all instruments).
36
37use std::{
38    num::NonZeroUsize,
39    sync::{Arc, LazyLock},
40};
41
42use ahash::AHashSet;
43use dashmap::DashMap;
44use ustr::Ustr;
45
46/// Marker for channel-level subscriptions (no specific symbol).
47///
48/// An empty string in the symbol set indicates a channel-level subscription
49/// that applies to all symbols for that channel.
50pub(crate) static CHANNEL_LEVEL_MARKER: LazyLock<Ustr> = LazyLock::new(|| Ustr::from(""));
51
52/// Generic subscription state tracker for WebSocket connections.
53///
54/// Maintains three separate states for subscriptions:
55/// - **Confirmed**: Successfully subscribed and actively streaming data.
56/// - **Pending Subscribe**: Subscription requested but not yet confirmed by server.
57/// - **Pending Unsubscribe**: Unsubscription requested but not yet confirmed by server.
58///
59/// # Reference Counting
60///
61/// The tracker maintains reference counts for each topic. When multiple components
62/// subscribe to the same topic, only the first subscription sends a message to the
63/// server. Similarly, only the last unsubscription sends an unsubscribe message.
64///
65/// # Thread Safety
66///
67/// All operations are thread-safe and can be called concurrently from multiple tasks.
68#[derive(Clone, Debug)]
69pub struct SubscriptionState {
70    /// Confirmed active subscriptions.
71    confirmed: Arc<DashMap<Ustr, AHashSet<Ustr>>>,
72    /// Pending subscribe requests awaiting server confirmation.
73    pending_subscribe: Arc<DashMap<Ustr, AHashSet<Ustr>>>,
74    /// Pending unsubscribe requests awaiting server confirmation.
75    pending_unsubscribe: Arc<DashMap<Ustr, AHashSet<Ustr>>>,
76    /// Reference counts for topics to prevent duplicate messages.
77    reference_counts: Arc<DashMap<Ustr, NonZeroUsize>>,
78    /// Topic delimiter character (e.g., '.' or ':').
79    delimiter: char,
80}
81
82impl SubscriptionState {
83    /// Creates a new subscription state tracker with the specified topic delimiter.
84    pub fn new(delimiter: char) -> Self {
85        Self {
86            confirmed: Arc::new(DashMap::new()),
87            pending_subscribe: Arc::new(DashMap::new()),
88            pending_unsubscribe: Arc::new(DashMap::new()),
89            reference_counts: Arc::new(DashMap::new()),
90            delimiter,
91        }
92    }
93
94    /// Returns the delimiter character used for topic splitting.
95    pub fn delimiter(&self) -> char {
96        self.delimiter
97    }
98
99    /// Returns a clone of the confirmed subscriptions map.
100    pub fn confirmed(&self) -> Arc<DashMap<Ustr, AHashSet<Ustr>>> {
101        Arc::clone(&self.confirmed)
102    }
103
104    /// Returns a clone of the pending subscribe map.
105    pub fn pending_subscribe(&self) -> Arc<DashMap<Ustr, AHashSet<Ustr>>> {
106        Arc::clone(&self.pending_subscribe)
107    }
108
109    /// Returns a clone of the pending unsubscribe map.
110    pub fn pending_unsubscribe(&self) -> Arc<DashMap<Ustr, AHashSet<Ustr>>> {
111        Arc::clone(&self.pending_unsubscribe)
112    }
113
114    /// Returns the number of confirmed subscriptions.
115    ///
116    /// Counts both channel-level and symbol-level subscriptions.
117    pub fn len(&self) -> usize {
118        self.confirmed.iter().map(|entry| entry.value().len()).sum()
119    }
120
121    /// Returns true if there are no subscriptions (confirmed or pending).
122    pub fn is_empty(&self) -> bool {
123        self.confirmed.is_empty()
124            && self.pending_subscribe.is_empty()
125            && self.pending_unsubscribe.is_empty()
126    }
127
128    /// Returns true if a channel:symbol pair is subscribed (confirmed or pending subscribe).
129    pub fn is_subscribed(&self, channel: &Ustr, symbol: &Ustr) -> bool {
130        if let Some(symbols) = self.confirmed.get(channel)
131            && symbols.contains(symbol)
132        {
133            return true;
134        }
135        if let Some(symbols) = self.pending_subscribe.get(channel)
136            && symbols.contains(symbol)
137        {
138            return true;
139        }
140        false
141    }
142
143    /// Marks a topic as pending subscription.
144    ///
145    /// This should be called after sending a subscribe request to the server.
146    /// Idempotent: if topic is already confirmed, this is a no-op.
147    /// If topic is pending unsubscription, removes it.
148    pub fn mark_subscribe(&self, topic: &str) {
149        let (channel, symbol) = split_topic(topic, self.delimiter);
150
151        // If already confirmed, don't re-add to pending (idempotent)
152        if is_tracked(&self.confirmed, channel, symbol) {
153            return;
154        }
155
156        // Remove from pending_unsubscribe if present
157        untrack_topic(&self.pending_unsubscribe, channel, symbol);
158
159        track_topic(&self.pending_subscribe, channel, symbol);
160    }
161
162    /// Marks a topic as pending unsubscription.
163    ///
164    /// This removes the topic from both confirmed and pending_subscribe,
165    /// then adds it to pending_unsubscribe. This handles the case where
166    /// a user unsubscribes before the initial subscription is confirmed.
167    pub fn mark_unsubscribe(&self, topic: &str) {
168        let (channel, symbol) = split_topic(topic, self.delimiter);
169        track_topic(&self.pending_unsubscribe, channel, symbol);
170        untrack_topic(&self.confirmed, channel, symbol);
171        untrack_topic(&self.pending_subscribe, channel, symbol);
172    }
173
174    /// Confirms a subscription by moving it from pending to confirmed.
175    ///
176    /// This should be called when the server acknowledges a subscribe request.
177    /// Ignores the confirmation if the topic is pending unsubscription (handles
178    /// late confirmations after user has already unsubscribed).
179    pub fn confirm_subscribe(&self, topic: &str) {
180        let (channel, symbol) = split_topic(topic, self.delimiter);
181
182        // Ignore late confirmations if topic is pending unsubscribe
183        if is_tracked(&self.pending_unsubscribe, channel, symbol) {
184            return;
185        }
186
187        untrack_topic(&self.pending_subscribe, channel, symbol);
188        track_topic(&self.confirmed, channel, symbol);
189    }
190
191    /// Confirms an unsubscription by removing it from pending and confirmed state.
192    ///
193    /// This should be called when the server acknowledges an unsubscribe request.
194    /// Removes the topic from pending_unsubscribe and confirmed.
195    /// Does NOT clear pending_subscribe to support immediate re-subscribe patterns
196    /// (e.g., user calls subscribe() before unsubscribe ack arrives).
197    ///
198    /// **Stale ACK handling**: Ignores unsubscribe ACKs if the topic is no longer
199    /// in pending_unsubscribe (meaning user has already re-subscribed). This prevents
200    /// stale ACKs from removing topics that were re-confirmed after the re-subscribe.
201    pub fn confirm_unsubscribe(&self, topic: &str) {
202        let (channel, symbol) = split_topic(topic, self.delimiter);
203
204        // Only process if topic is actually pending unsubscription
205        // This ignores stale unsubscribe ACKs after user has re-subscribed
206        if !is_tracked(&self.pending_unsubscribe, channel, symbol) {
207            return; // Stale ACK, ignore
208        }
209
210        untrack_topic(&self.pending_unsubscribe, channel, symbol);
211        untrack_topic(&self.confirmed, channel, symbol);
212        // Don't clear pending_subscribe - it's a valid re-subscribe request
213    }
214
215    /// Marks a subscription as failed, moving it from confirmed back to pending.
216    ///
217    /// This is useful when a subscription fails but should be retried on reconnect.
218    /// Ignores the failure if the topic is pending unsubscription (user cancelled it).
219    pub fn mark_failure(&self, topic: &str) {
220        let (channel, symbol) = split_topic(topic, self.delimiter);
221
222        // Ignore failures for topics being unsubscribed
223        if is_tracked(&self.pending_unsubscribe, channel, symbol) {
224            return;
225        }
226
227        untrack_topic(&self.confirmed, channel, symbol);
228        track_topic(&self.pending_subscribe, channel, symbol);
229    }
230
231    /// Returns all pending subscribe topics as strings.
232    pub fn pending_subscribe_topics(&self) -> Vec<String> {
233        self.topics_from_map(&self.pending_subscribe)
234    }
235
236    /// Returns all pending unsubscribe topics as strings.
237    pub fn pending_unsubscribe_topics(&self) -> Vec<String> {
238        self.topics_from_map(&self.pending_unsubscribe)
239    }
240
241    /// Returns all topics that should be active (confirmed + pending_subscribe).
242    ///
243    /// This is the key method for reconnection: it returns all topics that should
244    /// be resubscribed after a connection is re-established.
245    ///
246    /// Note: Does NOT include pending_unsubscribe topics, as those are being removed.
247    pub fn all_topics(&self) -> Vec<String> {
248        let mut topics = Vec::new();
249        topics.extend(self.topics_from_map(&self.confirmed));
250        topics.extend(self.topics_from_map(&self.pending_subscribe));
251        topics
252    }
253
254    /// Helper to convert a map to topic strings.
255    fn topics_from_map(&self, map: &DashMap<Ustr, AHashSet<Ustr>>) -> Vec<String> {
256        let mut topics = Vec::new();
257        let marker = *CHANNEL_LEVEL_MARKER;
258
259        for entry in map {
260            let channel = entry.key();
261            let symbols = entry.value();
262
263            // Check for channel-level subscription marker
264            if symbols.contains(&marker) {
265                topics.push(channel.to_string());
266            }
267
268            // Add symbol-level subscriptions (skip marker)
269            for symbol in symbols {
270                if *symbol != marker {
271                    topics.push(format!(
272                        "{}{}{}",
273                        channel.as_str(),
274                        self.delimiter,
275                        symbol.as_str()
276                    ));
277                }
278            }
279        }
280
281        topics
282    }
283
284    /// Increments the reference count for a topic.
285    ///
286    /// Returns `true` if this is the first subscription (caller should send subscribe
287    /// message to server).
288    ///
289    /// # Panics
290    ///
291    /// Panics if the reference count exceeds `usize::MAX` subscriptions for a single topic.
292    pub fn add_reference(&self, topic: &str) -> bool {
293        let mut should_subscribe = false;
294        let topic_ustr = Ustr::from(topic);
295
296        self.reference_counts
297            .entry(topic_ustr)
298            .and_modify(|count| {
299                *count = NonZeroUsize::new(count.get() + 1).expect("reference count overflow");
300            })
301            .or_insert_with(|| {
302                should_subscribe = true;
303                NonZeroUsize::new(1).expect("NonZeroUsize::new(1) should never fail")
304            });
305
306        should_subscribe
307    }
308
309    /// Decrements the reference count for a topic.
310    ///
311    /// Returns `true` if this was the last subscription (caller should send unsubscribe
312    /// message to server).
313    ///
314    /// # Panics
315    ///
316    /// Panics if the internal reference count state becomes inconsistent (should never happen
317    /// if the API is used correctly).
318    pub fn remove_reference(&self, topic: &str) -> bool {
319        let topic_ustr = Ustr::from(topic);
320
321        // Use entry API to atomically decrement and remove if zero
322        // This prevents race where another thread adds a reference between the check and remove
323        if let dashmap::mapref::entry::Entry::Occupied(mut entry) =
324            self.reference_counts.entry(topic_ustr)
325        {
326            let current = entry.get().get();
327
328            if current == 1 {
329                entry.remove();
330                return true;
331            }
332
333            *entry.get_mut() = NonZeroUsize::new(current - 1)
334                .expect("reference count should never reach zero here");
335        }
336
337        false
338    }
339
340    /// Returns the current reference count for a topic.
341    ///
342    /// Returns 0 if the topic has no references.
343    pub fn get_reference_count(&self, topic: &str) -> usize {
344        let topic_ustr = Ustr::from(topic);
345        self.reference_counts
346            .get(&topic_ustr)
347            .map_or(0, |count| count.get())
348    }
349
350    /// Clears all subscription state.
351    ///
352    /// This is useful when reconnecting or resetting the client.
353    pub fn clear(&self) {
354        self.confirmed.clear();
355        self.pending_subscribe.clear();
356        self.pending_unsubscribe.clear();
357        self.reference_counts.clear();
358    }
359}
360
361/// Splits a topic into channel and optional symbol using the specified delimiter.
362pub fn split_topic(topic: &str, delimiter: char) -> (&str, Option<&str>) {
363    topic
364        .split_once(delimiter)
365        .map_or((topic, None), |(channel, symbol)| (channel, Some(symbol)))
366}
367
368/// Tracks a topic in the given map by adding it to the channel's symbol set.
369///
370/// Channel-level subscriptions are stored using an empty string marker,
371/// allowing both channel-level and symbol-level subscriptions to coexist.
372fn track_topic(map: &DashMap<Ustr, AHashSet<Ustr>>, channel: &str, symbol: Option<&str>) {
373    let channel_ustr = Ustr::from(channel);
374    let mut entry = map.entry(channel_ustr).or_default();
375
376    if let Some(symbol) = symbol {
377        entry.insert(Ustr::from(symbol));
378    } else {
379        entry.insert(*CHANNEL_LEVEL_MARKER);
380    }
381}
382
383/// Removes a topic from the given map by removing it from the channel's symbol set.
384///
385/// Removes the entire channel entry if no subscriptions remain after removal.
386fn untrack_topic(map: &DashMap<Ustr, AHashSet<Ustr>>, channel: &str, symbol: Option<&str>) {
387    let channel_ustr = Ustr::from(channel);
388    let symbol_to_remove = if let Some(symbol) = symbol {
389        Ustr::from(symbol)
390    } else {
391        *CHANNEL_LEVEL_MARKER
392    };
393
394    // Use entry API to atomically remove symbol and check if empty
395    // This prevents race conditions where another thread adds a symbol between operations
396    if let dashmap::mapref::entry::Entry::Occupied(mut entry) = map.entry(channel_ustr) {
397        entry.get_mut().remove(&symbol_to_remove);
398        if entry.get().is_empty() {
399            entry.remove();
400        }
401    }
402}
403
404/// Checks if a topic exists in the given map.
405fn is_tracked(map: &DashMap<Ustr, AHashSet<Ustr>>, channel: &str, symbol: Option<&str>) -> bool {
406    let channel_ustr = Ustr::from(channel);
407    let symbol_to_check = if let Some(symbol) = symbol {
408        Ustr::from(symbol)
409    } else {
410        *CHANNEL_LEVEL_MARKER
411    };
412
413    if let Some(entry) = map.get(&channel_ustr) {
414        entry.contains(&symbol_to_check)
415    } else {
416        false
417    }
418}
419
420#[cfg(test)]
421mod tests {
422    use rstest::rstest;
423
424    use super::*;
425
426    #[rstest]
427    fn test_split_topic_with_symbol() {
428        let (channel, symbol) = split_topic("tickers.BTCUSDT", '.');
429        assert_eq!(channel, "tickers");
430        assert_eq!(symbol, Some("BTCUSDT"));
431
432        let (channel, symbol) = split_topic("orderBookL2:XBTUSD", ':');
433        assert_eq!(channel, "orderBookL2");
434        assert_eq!(symbol, Some("XBTUSD"));
435    }
436
437    #[rstest]
438    fn test_split_topic_without_symbol() {
439        let (channel, symbol) = split_topic("orderbook", '.');
440        assert_eq!(channel, "orderbook");
441        assert_eq!(symbol, None);
442    }
443
444    #[rstest]
445    fn test_new_state_is_empty() {
446        let state = SubscriptionState::new('.');
447        assert!(state.is_empty());
448        assert_eq!(state.len(), 0);
449    }
450
451    #[rstest]
452    fn test_mark_subscribe() {
453        let state = SubscriptionState::new('.');
454        state.mark_subscribe("tickers.BTCUSDT");
455
456        assert_eq!(state.pending_subscribe_topics(), vec!["tickers.BTCUSDT"]);
457        assert_eq!(state.len(), 0); // Not confirmed yet
458    }
459
460    #[rstest]
461    fn test_confirm_subscribe() {
462        let state = SubscriptionState::new('.');
463        state.mark_subscribe("tickers.BTCUSDT");
464        state.confirm_subscribe("tickers.BTCUSDT");
465
466        assert!(state.pending_subscribe_topics().is_empty());
467        assert_eq!(state.len(), 1);
468    }
469
470    #[rstest]
471    fn test_is_subscribed_empty_state() {
472        let state = SubscriptionState::new('.');
473        let channel = Ustr::from("tickers");
474        let symbol = Ustr::from("BTCUSDT");
475
476        assert!(!state.is_subscribed(&channel, &symbol));
477    }
478
479    #[rstest]
480    fn test_is_subscribed_pending() {
481        let state = SubscriptionState::new('.');
482        let channel = Ustr::from("tickers");
483        let symbol = Ustr::from("BTCUSDT");
484
485        state.mark_subscribe("tickers.BTCUSDT");
486
487        assert!(state.is_subscribed(&channel, &symbol));
488    }
489
490    #[rstest]
491    fn test_is_subscribed_confirmed() {
492        let state = SubscriptionState::new('.');
493        let channel = Ustr::from("tickers");
494        let symbol = Ustr::from("BTCUSDT");
495
496        state.mark_subscribe("tickers.BTCUSDT");
497        state.confirm_subscribe("tickers.BTCUSDT");
498
499        assert!(state.is_subscribed(&channel, &symbol));
500    }
501
502    #[rstest]
503    fn test_is_subscribed_after_unsubscribe() {
504        let state = SubscriptionState::new('.');
505        let channel = Ustr::from("tickers");
506        let symbol = Ustr::from("BTCUSDT");
507
508        state.mark_subscribe("tickers.BTCUSDT");
509        state.confirm_subscribe("tickers.BTCUSDT");
510        state.mark_unsubscribe("tickers.BTCUSDT");
511
512        // Pending unsubscribe should not count as subscribed
513        assert!(!state.is_subscribed(&channel, &symbol));
514    }
515
516    #[rstest]
517    fn test_is_subscribed_after_confirm_unsubscribe() {
518        let state = SubscriptionState::new('.');
519        let channel = Ustr::from("tickers");
520        let symbol = Ustr::from("BTCUSDT");
521
522        state.mark_subscribe("tickers.BTCUSDT");
523        state.confirm_subscribe("tickers.BTCUSDT");
524        state.mark_unsubscribe("tickers.BTCUSDT");
525        state.confirm_unsubscribe("tickers.BTCUSDT");
526
527        assert!(!state.is_subscribed(&channel, &symbol));
528    }
529
530    #[rstest]
531    fn test_mark_unsubscribe() {
532        let state = SubscriptionState::new('.');
533        state.mark_subscribe("tickers.BTCUSDT");
534        state.confirm_subscribe("tickers.BTCUSDT");
535        state.mark_unsubscribe("tickers.BTCUSDT");
536
537        assert_eq!(state.len(), 0); // Removed from confirmed
538        assert_eq!(state.pending_unsubscribe_topics(), vec!["tickers.BTCUSDT"]);
539    }
540
541    #[rstest]
542    fn test_confirm_unsubscribe() {
543        let state = SubscriptionState::new('.');
544        state.mark_subscribe("tickers.BTCUSDT");
545        state.confirm_subscribe("tickers.BTCUSDT");
546        state.mark_unsubscribe("tickers.BTCUSDT");
547        state.confirm_unsubscribe("tickers.BTCUSDT");
548
549        assert!(state.is_empty());
550    }
551
552    #[rstest]
553    fn test_resubscribe_before_unsubscribe_ack() {
554        // Regression test for race condition:
555        // User unsubscribes, then immediately resubscribes before the unsubscribe ACK arrives.
556        // The unsubscribe ACK should NOT clear the pending_subscribe entry.
557        let state = SubscriptionState::new('.');
558
559        state.mark_subscribe("tickers.BTCUSDT");
560        state.confirm_subscribe("tickers.BTCUSDT");
561        assert_eq!(state.len(), 1);
562
563        state.mark_unsubscribe("tickers.BTCUSDT");
564        assert_eq!(state.len(), 0);
565        assert_eq!(state.pending_unsubscribe_topics(), vec!["tickers.BTCUSDT"]);
566
567        // User immediately resubscribes (before unsubscribe ACK)
568        state.mark_subscribe("tickers.BTCUSDT");
569        assert_eq!(state.pending_subscribe_topics(), vec!["tickers.BTCUSDT"]);
570
571        // Stale unsubscribe ACK arrives - should be ignored (pending_unsubscribe already cleared)
572        state.confirm_unsubscribe("tickers.BTCUSDT");
573        assert!(state.pending_unsubscribe_topics().is_empty());
574        assert_eq!(state.pending_subscribe_topics(), vec!["tickers.BTCUSDT"]); // Must still be pending
575
576        // Subscribe ACK confirms successfully
577        state.confirm_subscribe("tickers.BTCUSDT");
578        assert_eq!(state.len(), 1);
579        assert!(state.pending_subscribe_topics().is_empty());
580
581        // Topic available for reconnect
582        let all = state.all_topics();
583        assert_eq!(all.len(), 1);
584        assert!(all.contains(&"tickers.BTCUSDT".to_string()));
585    }
586
587    #[rstest]
588    fn test_stale_unsubscribe_ack_after_resubscribe_confirmed() {
589        // Regression test for P1 bug: Stale unsubscribe ACK removing confirmed topic.
590        // Scenario: User unsubscribes, immediately resubscribes, subscribe ACK arrives
591        // FIRST (out of order), then stale unsubscribe ACK arrives.
592        // The stale ACK must NOT remove the topic from confirmed state.
593        let state = SubscriptionState::new('.');
594
595        // Initial subscription
596        state.mark_subscribe("tickers.BTCUSDT");
597        state.confirm_subscribe("tickers.BTCUSDT");
598        assert_eq!(state.len(), 1);
599
600        // User unsubscribes
601        state.mark_unsubscribe("tickers.BTCUSDT");
602        assert_eq!(state.len(), 0);
603        assert_eq!(state.pending_unsubscribe_topics(), vec!["tickers.BTCUSDT"]);
604
605        // User immediately resubscribes (before unsubscribe ACK)
606        state.mark_subscribe("tickers.BTCUSDT");
607        assert!(state.pending_unsubscribe_topics().is_empty()); // Cleared by mark_subscribe
608        assert_eq!(state.pending_subscribe_topics(), vec!["tickers.BTCUSDT"]);
609
610        // Subscribe ACK arrives FIRST (out of order!)
611        state.confirm_subscribe("tickers.BTCUSDT");
612        assert_eq!(state.len(), 1); // Back in confirmed
613        assert!(state.pending_subscribe_topics().is_empty());
614
615        // NOW the stale unsubscribe ACK arrives
616        // This must be ignored because topic is no longer in pending_unsubscribe
617        state.confirm_unsubscribe("tickers.BTCUSDT");
618
619        // Topic should STILL be confirmed (not removed by stale ACK)
620        assert_eq!(state.len(), 1); // Must remain confirmed
621        assert!(state.pending_unsubscribe_topics().is_empty());
622        assert!(state.pending_subscribe_topics().is_empty());
623
624        // Topic should be in all_topics (for reconnect)
625        let all = state.all_topics();
626        assert_eq!(all.len(), 1);
627        assert!(all.contains(&"tickers.BTCUSDT".to_string()));
628    }
629
630    #[rstest]
631    fn test_mark_failure() {
632        let state = SubscriptionState::new('.');
633        state.mark_subscribe("tickers.BTCUSDT");
634        state.confirm_subscribe("tickers.BTCUSDT");
635        state.mark_failure("tickers.BTCUSDT");
636
637        assert_eq!(state.len(), 0);
638        assert_eq!(state.pending_subscribe_topics(), vec!["tickers.BTCUSDT"]);
639    }
640
641    #[rstest]
642    fn test_all_topics_includes_confirmed_and_pending_subscribe() {
643        let state = SubscriptionState::new('.');
644        state.mark_subscribe("tickers.BTCUSDT");
645        state.confirm_subscribe("tickers.BTCUSDT");
646        state.mark_subscribe("tickers.ETHUSDT");
647
648        let topics = state.all_topics();
649        assert_eq!(topics.len(), 2);
650        assert!(topics.contains(&"tickers.BTCUSDT".to_string()));
651        assert!(topics.contains(&"tickers.ETHUSDT".to_string()));
652    }
653
654    #[rstest]
655    fn test_all_topics_excludes_pending_unsubscribe() {
656        let state = SubscriptionState::new('.');
657        state.mark_subscribe("tickers.BTCUSDT");
658        state.confirm_subscribe("tickers.BTCUSDT");
659        state.mark_unsubscribe("tickers.BTCUSDT");
660
661        let topics = state.all_topics();
662        assert!(topics.is_empty());
663    }
664
665    #[rstest]
666    fn test_reference_counting_single_topic() {
667        let state = SubscriptionState::new('.');
668
669        assert!(state.add_reference("tickers.BTCUSDT"));
670        assert_eq!(state.get_reference_count("tickers.BTCUSDT"), 1);
671
672        assert!(!state.add_reference("tickers.BTCUSDT"));
673        assert_eq!(state.get_reference_count("tickers.BTCUSDT"), 2);
674
675        assert!(!state.remove_reference("tickers.BTCUSDT"));
676        assert_eq!(state.get_reference_count("tickers.BTCUSDT"), 1);
677
678        assert!(state.remove_reference("tickers.BTCUSDT"));
679        assert_eq!(state.get_reference_count("tickers.BTCUSDT"), 0);
680    }
681
682    #[rstest]
683    fn test_reference_counting_multiple_topics() {
684        let state = SubscriptionState::new('.');
685
686        assert!(state.add_reference("tickers.BTCUSDT"));
687        assert!(state.add_reference("tickers.ETHUSDT"));
688
689        assert!(!state.add_reference("tickers.BTCUSDT"));
690        assert_eq!(state.get_reference_count("tickers.BTCUSDT"), 2);
691        assert_eq!(state.get_reference_count("tickers.ETHUSDT"), 1);
692
693        assert!(!state.remove_reference("tickers.BTCUSDT"));
694        assert!(state.remove_reference("tickers.ETHUSDT"));
695    }
696
697    #[rstest]
698    fn test_topic_without_symbol() {
699        let state = SubscriptionState::new('.');
700        state.mark_subscribe("orderbook");
701        state.confirm_subscribe("orderbook");
702
703        assert_eq!(state.len(), 1);
704        assert_eq!(state.all_topics(), vec!["orderbook"]);
705    }
706
707    #[rstest]
708    fn test_different_delimiters() {
709        let state_dot = SubscriptionState::new('.');
710        state_dot.mark_subscribe("tickers.BTCUSDT");
711        assert_eq!(
712            state_dot.pending_subscribe_topics(),
713            vec!["tickers.BTCUSDT"]
714        );
715
716        let state_colon = SubscriptionState::new(':');
717        state_colon.mark_subscribe("orderBookL2:XBTUSD");
718        assert_eq!(
719            state_colon.pending_subscribe_topics(),
720            vec!["orderBookL2:XBTUSD"]
721        );
722    }
723
724    #[rstest]
725    fn test_clear() {
726        let state = SubscriptionState::new('.');
727        state.mark_subscribe("tickers.BTCUSDT");
728        state.confirm_subscribe("tickers.BTCUSDT");
729        state.add_reference("tickers.BTCUSDT");
730
731        state.clear();
732
733        assert!(state.is_empty());
734        assert_eq!(state.get_reference_count("tickers.BTCUSDT"), 0);
735    }
736
737    #[rstest]
738    fn test_multiple_symbols_same_channel() {
739        let state = SubscriptionState::new('.');
740        state.mark_subscribe("tickers.BTCUSDT");
741        state.mark_subscribe("tickers.ETHUSDT");
742        state.confirm_subscribe("tickers.BTCUSDT");
743        state.confirm_subscribe("tickers.ETHUSDT");
744
745        assert_eq!(state.len(), 2);
746        let topics = state.all_topics();
747        assert!(topics.contains(&"tickers.BTCUSDT".to_string()));
748        assert!(topics.contains(&"tickers.ETHUSDT".to_string()));
749    }
750
751    #[rstest]
752    fn test_mixed_channel_and_symbol_subscriptions() {
753        let state = SubscriptionState::new('.');
754
755        // Subscribe to channel-level first
756        state.mark_subscribe("tickers");
757        state.confirm_subscribe("tickers");
758        assert_eq!(state.len(), 1);
759        assert_eq!(state.all_topics(), vec!["tickers"]);
760
761        // Add symbol-level subscription to same channel
762        state.mark_subscribe("tickers.BTCUSDT");
763        state.confirm_subscribe("tickers.BTCUSDT");
764        assert_eq!(state.len(), 2);
765
766        // Both should be present
767        let topics = state.all_topics();
768        assert_eq!(topics.len(), 2);
769        assert!(topics.contains(&"tickers".to_string()));
770        assert!(topics.contains(&"tickers.BTCUSDT".to_string()));
771
772        // Add another symbol
773        state.mark_subscribe("tickers.ETHUSDT");
774        state.confirm_subscribe("tickers.ETHUSDT");
775        assert_eq!(state.len(), 3);
776
777        let topics = state.all_topics();
778        assert_eq!(topics.len(), 3);
779        assert!(topics.contains(&"tickers".to_string()));
780        assert!(topics.contains(&"tickers.BTCUSDT".to_string()));
781        assert!(topics.contains(&"tickers.ETHUSDT".to_string()));
782
783        // Unsubscribe from channel-level only
784        state.mark_unsubscribe("tickers");
785        state.confirm_unsubscribe("tickers");
786        assert_eq!(state.len(), 2);
787
788        let topics = state.all_topics();
789        assert_eq!(topics.len(), 2);
790        assert!(!topics.contains(&"tickers".to_string()));
791        assert!(topics.contains(&"tickers.BTCUSDT".to_string()));
792        assert!(topics.contains(&"tickers.ETHUSDT".to_string()));
793    }
794
795    #[rstest]
796    fn test_symbol_subscription_before_channel() {
797        let state = SubscriptionState::new('.');
798
799        // Subscribe to symbol first
800        state.mark_subscribe("tickers.BTCUSDT");
801        state.confirm_subscribe("tickers.BTCUSDT");
802        assert_eq!(state.len(), 1);
803
804        // Then add channel-level
805        state.mark_subscribe("tickers");
806        state.confirm_subscribe("tickers");
807        assert_eq!(state.len(), 2);
808
809        // Both should be present after reconnect
810        let topics = state.all_topics();
811        assert_eq!(topics.len(), 2);
812        assert!(topics.contains(&"tickers".to_string()));
813        assert!(topics.contains(&"tickers.BTCUSDT".to_string()));
814    }
815
816    #[rstest]
817    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
818    async fn test_concurrent_subscribe_same_topic() {
819        let state = Arc::new(SubscriptionState::new('.'));
820        let mut handles = vec![];
821
822        // Spawn 10 tasks all subscribing to the same topic
823        for _ in 0..10 {
824            let state_clone = Arc::clone(&state);
825            let handle = tokio::spawn(async move {
826                state_clone.add_reference("tickers.BTCUSDT");
827                state_clone.mark_subscribe("tickers.BTCUSDT");
828                state_clone.confirm_subscribe("tickers.BTCUSDT");
829            });
830            handles.push(handle);
831        }
832
833        for handle in handles {
834            handle.await.unwrap();
835        }
836
837        // Reference count should be exactly 10
838        assert_eq!(state.get_reference_count("tickers.BTCUSDT"), 10);
839        assert_eq!(state.len(), 1);
840    }
841
842    #[rstest]
843    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
844    async fn test_concurrent_subscribe_unsubscribe() {
845        let state = Arc::new(SubscriptionState::new('.'));
846        let mut handles = vec![];
847
848        // Spawn 20 tasks, each adding 2 references to their own unique topic
849        // This ensures deterministic behavior - we know exactly what the final state should be
850        for i in 0..20 {
851            let state_clone = Arc::clone(&state);
852            let handle = tokio::spawn(async move {
853                let topic = format!("tickers.SYMBOL{i}");
854                // Add 2 references
855                state_clone.add_reference(&topic);
856                state_clone.add_reference(&topic);
857                state_clone.mark_subscribe(&topic);
858                state_clone.confirm_subscribe(&topic);
859
860                // Remove 1 reference (should still have 1 remaining)
861                state_clone.remove_reference(&topic);
862            });
863            handles.push(handle);
864        }
865
866        for handle in handles {
867            handle.await.unwrap();
868        }
869
870        // Each of the 20 topics should still have 1 reference
871        for i in 0..20 {
872            let topic = format!("tickers.SYMBOL{i}");
873            assert_eq!(state.get_reference_count(&topic), 1);
874        }
875
876        // Should have exactly 20 confirmed subscriptions
877        assert_eq!(state.len(), 20);
878        assert!(!state.is_empty());
879    }
880
881    #[rstest]
882    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
883    async fn test_concurrent_reference_counting_same_topic() {
884        let state = Arc::new(SubscriptionState::new('.'));
885        let topic = "tickers.BTCUSDT";
886        let mut handles = vec![];
887
888        // Spawn 10 tasks all adding 10 references to the same topic
889        for _ in 0..10 {
890            let state_clone = Arc::clone(&state);
891            let handle = tokio::spawn(async move {
892                for _ in 0..10 {
893                    state_clone.add_reference(topic);
894                }
895            });
896            handles.push(handle);
897        }
898
899        for handle in handles {
900            handle.await.unwrap();
901        }
902
903        // Should have exactly 100 references (10 tasks * 10 refs each)
904        assert_eq!(state.get_reference_count(topic), 100);
905
906        // Now remove 50 references sequentially
907        for _ in 0..50 {
908            state.remove_reference(topic);
909        }
910
911        // Should have exactly 50 references remaining
912        assert_eq!(state.get_reference_count(topic), 50);
913    }
914
915    #[rstest]
916    fn test_reconnection_scenario() {
917        let state = SubscriptionState::new('.');
918
919        // Initial subscriptions
920        state.add_reference("tickers.BTCUSDT");
921        state.mark_subscribe("tickers.BTCUSDT");
922        state.confirm_subscribe("tickers.BTCUSDT");
923
924        state.add_reference("tickers.ETHUSDT");
925        state.mark_subscribe("tickers.ETHUSDT");
926        state.confirm_subscribe("tickers.ETHUSDT");
927
928        state.add_reference("orderbook");
929        state.mark_subscribe("orderbook");
930        state.confirm_subscribe("orderbook");
931
932        assert_eq!(state.len(), 3);
933
934        // Simulate disconnect - topics should be available for resubscription
935        let topics_to_resubscribe = state.all_topics();
936        assert_eq!(topics_to_resubscribe.len(), 3);
937        assert!(topics_to_resubscribe.contains(&"tickers.BTCUSDT".to_string()));
938        assert!(topics_to_resubscribe.contains(&"tickers.ETHUSDT".to_string()));
939        assert!(topics_to_resubscribe.contains(&"orderbook".to_string()));
940
941        // On reconnect, mark all as pending again
942        for topic in &topics_to_resubscribe {
943            state.mark_subscribe(topic);
944        }
945
946        // Simulate server confirmations
947        for topic in &topics_to_resubscribe {
948            state.confirm_subscribe(topic);
949        }
950
951        // Should still have all 3 subscriptions
952        assert_eq!(state.len(), 3);
953        assert_eq!(state.all_topics().len(), 3);
954    }
955
956    #[rstest]
957    fn test_state_machine_invalid_transitions() {
958        let state = SubscriptionState::new('.');
959
960        // Confirm subscribe without marking first - should not crash
961        state.confirm_subscribe("tickers.BTCUSDT");
962        assert_eq!(state.len(), 1); // Gets added to confirmed
963
964        // Confirm unsubscribe without marking first - should not crash
965        state.confirm_unsubscribe("tickers.ETHUSDT");
966        assert_eq!(state.len(), 1); // Nothing changes
967
968        // Double confirm subscribe
969        state.mark_subscribe("orderbook");
970        state.confirm_subscribe("orderbook");
971        state.confirm_subscribe("orderbook"); // Second confirm is idempotent
972        assert_eq!(state.len(), 2);
973
974        // Unsubscribe something that was never subscribed
975        state.mark_unsubscribe("nonexistent");
976        state.confirm_unsubscribe("nonexistent");
977        assert_eq!(state.len(), 2); // Still 2
978    }
979
980    #[rstest]
981    fn test_mark_failure_moves_to_pending() {
982        let state = SubscriptionState::new('.');
983
984        // Subscribe and confirm
985        state.mark_subscribe("tickers.BTCUSDT");
986        state.confirm_subscribe("tickers.BTCUSDT");
987        assert_eq!(state.len(), 1);
988        assert!(state.pending_subscribe_topics().is_empty());
989
990        // Mark as failed
991        state.mark_failure("tickers.BTCUSDT");
992
993        // Should be removed from confirmed and back in pending
994        assert_eq!(state.len(), 0);
995        assert_eq!(state.pending_subscribe_topics(), vec!["tickers.BTCUSDT"]);
996
997        // all_topics should still include it for reconnection
998        assert_eq!(state.all_topics(), vec!["tickers.BTCUSDT"]);
999    }
1000
1001    #[rstest]
1002    fn test_pending_subscribe_excludes_pending_unsubscribe() {
1003        let state = SubscriptionState::new('.');
1004
1005        // Subscribe and confirm
1006        state.mark_subscribe("tickers.BTCUSDT");
1007        state.confirm_subscribe("tickers.BTCUSDT");
1008
1009        // Mark for unsubscribe
1010        state.mark_unsubscribe("tickers.BTCUSDT");
1011
1012        // Should be in pending_unsubscribe but NOT in all_topics
1013        assert_eq!(state.pending_unsubscribe_topics(), vec!["tickers.BTCUSDT"]);
1014        assert!(state.all_topics().is_empty());
1015        assert_eq!(state.len(), 0);
1016    }
1017
1018    #[rstest]
1019    fn test_remove_reference_nonexistent_topic() {
1020        let state = SubscriptionState::new('.');
1021
1022        // Removing reference to topic that was never added
1023        let should_unsubscribe = state.remove_reference("nonexistent");
1024
1025        // Should return false and not crash
1026        assert!(!should_unsubscribe);
1027        assert_eq!(state.get_reference_count("nonexistent"), 0);
1028    }
1029
1030    #[rstest]
1031    fn test_edge_case_empty_channel_name() {
1032        let state = SubscriptionState::new('.');
1033
1034        // Edge case: empty string as topic
1035        state.mark_subscribe("");
1036        state.confirm_subscribe("");
1037
1038        assert_eq!(state.len(), 1);
1039        assert_eq!(state.all_topics(), vec![""]);
1040    }
1041
1042    #[rstest]
1043    fn test_special_characters_in_topics() {
1044        let state = SubscriptionState::new('.');
1045
1046        // Topics with special characters
1047        let special_topics = vec![
1048            "channel.symbol-with-dash",
1049            "channel.SYMBOL_WITH_UNDERSCORE",
1050            "channel.symbol123",
1051            "channel.symbol@special",
1052        ];
1053
1054        for topic in &special_topics {
1055            state.mark_subscribe(topic);
1056            state.confirm_subscribe(topic);
1057        }
1058
1059        assert_eq!(state.len(), special_topics.len());
1060
1061        let all_topics = state.all_topics();
1062        for topic in &special_topics {
1063            assert!(
1064                all_topics.contains(&(*topic).to_string()),
1065                "Missing topic: {topic}"
1066            );
1067        }
1068    }
1069
1070    #[rstest]
1071    fn test_clear_resets_all_state() {
1072        let state = SubscriptionState::new('.');
1073
1074        // Add multiple subscriptions and references
1075        for i in 0..10 {
1076            let topic = format!("channel{i}.SYMBOL");
1077            state.add_reference(&topic);
1078            state.add_reference(&topic); // Add twice
1079            state.mark_subscribe(&topic);
1080            state.confirm_subscribe(&topic);
1081        }
1082
1083        assert_eq!(state.len(), 10);
1084        assert!(!state.is_empty());
1085
1086        // Clear everything
1087        state.clear();
1088
1089        // Verify complete reset
1090        assert_eq!(state.len(), 0);
1091        assert!(state.is_empty());
1092        assert!(state.all_topics().is_empty());
1093        assert!(state.pending_subscribe_topics().is_empty());
1094        assert!(state.pending_unsubscribe_topics().is_empty());
1095
1096        // Verify reference counts are cleared
1097        for i in 0..10 {
1098            let topic = format!("channel{i}.SYMBOL");
1099            assert_eq!(state.get_reference_count(&topic), 0);
1100        }
1101    }
1102
1103    #[rstest]
1104    fn test_different_delimiter_does_not_affect_storage() {
1105        // Verify delimiter is only used for parsing, not storage
1106        let state_dot = SubscriptionState::new('.');
1107        let state_colon = SubscriptionState::new(':');
1108
1109        // Add same logical subscription with different delimiters
1110        state_dot.mark_subscribe("channel.SYMBOL");
1111        state_colon.mark_subscribe("channel:SYMBOL");
1112
1113        // Both should work correctly
1114        assert_eq!(state_dot.pending_subscribe_topics(), vec!["channel.SYMBOL"]);
1115        assert_eq!(
1116            state_colon.pending_subscribe_topics(),
1117            vec!["channel:SYMBOL"]
1118        );
1119    }
1120
1121    #[rstest]
1122    fn test_unsubscribe_before_subscribe_confirmed() {
1123        let state = SubscriptionState::new('.');
1124
1125        // User subscribes
1126        state.mark_subscribe("tickers.BTCUSDT");
1127        assert_eq!(state.pending_subscribe_topics(), vec!["tickers.BTCUSDT"]);
1128
1129        // User immediately changes mind before server confirms
1130        state.mark_unsubscribe("tickers.BTCUSDT");
1131
1132        // Should be removed from pending_subscribe and added to pending_unsubscribe
1133        assert!(state.pending_subscribe_topics().is_empty());
1134        assert_eq!(state.pending_unsubscribe_topics(), vec!["tickers.BTCUSDT"]);
1135
1136        // Confirm the unsubscribe
1137        state.confirm_unsubscribe("tickers.BTCUSDT");
1138
1139        // Should be completely gone
1140        assert!(state.is_empty());
1141        assert!(state.all_topics().is_empty());
1142        assert_eq!(state.len(), 0);
1143    }
1144
1145    #[rstest]
1146    fn test_late_subscribe_confirmation_after_unsubscribe() {
1147        let state = SubscriptionState::new('.');
1148
1149        // User subscribes
1150        state.mark_subscribe("tickers.BTCUSDT");
1151
1152        // User immediately unsubscribes
1153        state.mark_unsubscribe("tickers.BTCUSDT");
1154
1155        // Late subscribe confirmation arrives from server
1156        state.confirm_subscribe("tickers.BTCUSDT");
1157
1158        // Should NOT be added to confirmed (unsubscribe takes precedence)
1159        assert_eq!(state.len(), 0);
1160        assert!(state.pending_subscribe_topics().is_empty());
1161
1162        // Confirm the unsubscribe
1163        state.confirm_unsubscribe("tickers.BTCUSDT");
1164
1165        // Should still be empty
1166        assert!(state.is_empty());
1167        assert!(state.all_topics().is_empty());
1168    }
1169
1170    #[rstest]
1171    fn test_unsubscribe_clears_all_states() {
1172        let state = SubscriptionState::new('.');
1173
1174        // Subscribe and confirm
1175        state.mark_subscribe("tickers.BTCUSDT");
1176        state.confirm_subscribe("tickers.BTCUSDT");
1177        assert_eq!(state.len(), 1);
1178
1179        // Unsubscribe
1180        state.mark_unsubscribe("tickers.BTCUSDT");
1181
1182        // Should be removed from confirmed
1183        assert_eq!(state.len(), 0);
1184        assert_eq!(state.pending_unsubscribe_topics(), vec!["tickers.BTCUSDT"]);
1185
1186        // Late subscribe confirmation somehow arrives (race condition)
1187        state.confirm_subscribe("tickers.BTCUSDT");
1188
1189        // confirm_unsubscribe should clean everything
1190        state.confirm_unsubscribe("tickers.BTCUSDT");
1191
1192        // Completely empty
1193        assert!(state.is_empty());
1194        assert_eq!(state.len(), 0);
1195        assert!(state.pending_subscribe_topics().is_empty());
1196        assert!(state.pending_unsubscribe_topics().is_empty());
1197        assert!(state.all_topics().is_empty());
1198    }
1199
1200    #[rstest]
1201    fn test_mark_failure_respects_pending_unsubscribe() {
1202        let state = SubscriptionState::new('.');
1203
1204        // Subscribe and confirm
1205        state.mark_subscribe("tickers.BTCUSDT");
1206        state.confirm_subscribe("tickers.BTCUSDT");
1207        assert_eq!(state.len(), 1);
1208
1209        // User unsubscribes
1210        state.mark_unsubscribe("tickers.BTCUSDT");
1211        assert_eq!(state.len(), 0);
1212        assert_eq!(state.pending_unsubscribe_topics(), vec!["tickers.BTCUSDT"]);
1213
1214        // Meanwhile, a network error triggers mark_failure
1215        state.mark_failure("tickers.BTCUSDT");
1216
1217        // Should NOT be added to pending_subscribe (user wanted to unsubscribe)
1218        assert!(state.pending_subscribe_topics().is_empty());
1219        assert_eq!(state.pending_unsubscribe_topics(), vec!["tickers.BTCUSDT"]);
1220
1221        // all_topics should NOT include it
1222        assert!(state.all_topics().is_empty());
1223
1224        // Confirm unsubscribe
1225        state.confirm_unsubscribe("tickers.BTCUSDT");
1226        assert!(state.is_empty());
1227    }
1228
1229    #[rstest]
1230    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
1231    async fn test_concurrent_stress_mixed_operations() {
1232        let state = Arc::new(SubscriptionState::new('.'));
1233        let mut handles = vec![];
1234
1235        // Spawn 50 tasks doing random interleaved operations
1236        for i in 0..50 {
1237            let state_clone = Arc::clone(&state);
1238            let handle = tokio::spawn(async move {
1239                let topic1 = format!("channel.SYMBOL{i}");
1240                let topic2 = format!("channel.SYMBOL{}", i + 100);
1241
1242                // Add references
1243                state_clone.add_reference(&topic1);
1244                state_clone.add_reference(&topic2);
1245
1246                // Mark and confirm subscriptions
1247                state_clone.mark_subscribe(&topic1);
1248                state_clone.confirm_subscribe(&topic1);
1249                state_clone.mark_subscribe(&topic2);
1250
1251                // Interleave some unsubscribes
1252                if i % 3 == 0 {
1253                    state_clone.mark_unsubscribe(&topic1);
1254                    state_clone.confirm_unsubscribe(&topic1);
1255                }
1256
1257                // More reference operations
1258                state_clone.add_reference(&topic2);
1259                state_clone.remove_reference(&topic2);
1260
1261                // Confirm topic2
1262                state_clone.confirm_subscribe(&topic2);
1263            });
1264            handles.push(handle);
1265        }
1266
1267        for handle in handles {
1268            handle.await.unwrap();
1269        }
1270
1271        // Verify state is consistent (no panics, all maps accessible)
1272        let all = state.all_topics();
1273        let confirmed_count = state.len();
1274
1275        // We have 50 topic2s (always confirmed) + topic1s (50 - number unsubscribed)
1276        // About 17 topic1s get unsubscribed (i % 3 == 0), leaving ~33 topic1s + 50 topic2s = ~83
1277        assert!(confirmed_count > 50); // At least all topic2s
1278        assert!(confirmed_count <= 100); // At most all topic1s + topic2s
1279        assert_eq!(
1280            all.len(),
1281            confirmed_count + state.pending_subscribe_topics().len()
1282        );
1283    }
1284
1285    #[rstest]
1286    fn test_edge_case_malformed_topics() {
1287        let state = SubscriptionState::new('.');
1288
1289        // Topics with multiple delimiters (splits on first delimiter)
1290        state.mark_subscribe("channel.symbol.extra");
1291        state.confirm_subscribe("channel.symbol.extra");
1292        let topics = state.all_topics();
1293        assert!(topics.contains(&"channel.symbol.extra".to_string()));
1294
1295        // Topic with leading delimiter (empty channel, symbol is "channel")
1296        state.mark_subscribe(".channel");
1297        state.confirm_subscribe(".channel");
1298        assert_eq!(state.len(), 2);
1299
1300        // Topic with trailing delimiter - treated as channel-level (empty symbol = marker)
1301        // "channel." splits to ("channel", Some("")), and empty string is the channel marker
1302        state.mark_subscribe("channel.");
1303        state.confirm_subscribe("channel.");
1304        assert_eq!(state.len(), 3);
1305
1306        // Topic without delimiter - explicitly channel-level
1307        state.mark_subscribe("tickers");
1308        state.confirm_subscribe("tickers");
1309        assert_eq!(state.len(), 4);
1310
1311        // Verify all are retrievable (note: "channel." becomes "channel")
1312        let all = state.all_topics();
1313        assert_eq!(all.len(), 4);
1314        assert!(all.contains(&"channel.symbol.extra".to_string()));
1315        assert!(all.contains(&".channel".to_string()));
1316        assert!(all.contains(&"channel".to_string())); // "channel." treated as channel-level
1317        assert!(all.contains(&"tickers".to_string()));
1318    }
1319
1320    #[rstest]
1321    fn test_reference_count_underflow_safety() {
1322        let state = SubscriptionState::new('.');
1323
1324        // Remove without ever adding
1325        assert!(!state.remove_reference("never.added"));
1326        assert_eq!(state.get_reference_count("never.added"), 0);
1327
1328        // Add one, remove multiple times
1329        state.add_reference("once.added");
1330        assert_eq!(state.get_reference_count("once.added"), 1);
1331
1332        assert!(state.remove_reference("once.added")); // Should return true (last ref)
1333        assert_eq!(state.get_reference_count("once.added"), 0);
1334
1335        assert!(!state.remove_reference("once.added")); // Should not crash, returns false
1336        assert!(!state.remove_reference("once.added")); // Multiple times
1337        assert_eq!(state.get_reference_count("once.added"), 0);
1338
1339        // Verify we can add again after underflow attempts
1340        assert!(state.add_reference("once.added"));
1341        assert_eq!(state.get_reference_count("once.added"), 1);
1342    }
1343
1344    #[rstest]
1345    fn test_reconnection_with_partial_state() {
1346        let state = SubscriptionState::new('.');
1347
1348        // Setup: Some confirmed, some pending subscribe, some pending unsubscribe
1349        // Confirmed
1350        state.mark_subscribe("confirmed.BTCUSDT");
1351        state.confirm_subscribe("confirmed.BTCUSDT");
1352
1353        // Pending subscribe (not yet confirmed)
1354        state.mark_subscribe("pending.ETHUSDT");
1355
1356        // Pending unsubscribe (user cancelled)
1357        state.mark_subscribe("cancelled.XRPUSDT");
1358        state.confirm_subscribe("cancelled.XRPUSDT");
1359        state.mark_unsubscribe("cancelled.XRPUSDT");
1360
1361        // Verify state before reconnect
1362        assert_eq!(state.len(), 1); // Only confirmed.BTCUSDT
1363        let all = state.all_topics();
1364        assert_eq!(all.len(), 2); // confirmed + pending_subscribe (not pending_unsubscribe)
1365        assert!(all.contains(&"confirmed.BTCUSDT".to_string()));
1366        assert!(all.contains(&"pending.ETHUSDT".to_string()));
1367        assert!(!all.contains(&"cancelled.XRPUSDT".to_string())); // Should NOT be included
1368
1369        // Simulate disconnect and reconnect
1370        let topics_to_resubscribe = state.all_topics();
1371
1372        // Clear confirmed on disconnect (simulate connection drop)
1373        state.confirmed().clear();
1374
1375        // Mark all for resubscription
1376        for topic in &topics_to_resubscribe {
1377            state.mark_subscribe(topic);
1378        }
1379
1380        // Server confirms both
1381        for topic in &topics_to_resubscribe {
1382            state.confirm_subscribe(topic);
1383        }
1384
1385        // Verify final state
1386        assert_eq!(state.len(), 2); // Both confirmed
1387        let final_topics = state.all_topics();
1388        assert_eq!(final_topics.len(), 2);
1389        assert!(final_topics.contains(&"confirmed.BTCUSDT".to_string()));
1390        assert!(final_topics.contains(&"pending.ETHUSDT".to_string()));
1391        assert!(!final_topics.contains(&"cancelled.XRPUSDT".to_string()));
1392    }
1393
1394    /// Verifies all invariants of the subscription state.
1395    ///
1396    /// # Invariants
1397    ///
1398    /// 1. **Mutual exclusivity**: A topic cannot exist in multiple states simultaneously
1399    ///    (one of: confirmed, pending_subscribe, pending_unsubscribe, or none).
1400    /// 2. **all_topics consistency**: `all_topics()` must equal `confirmed ∪ pending_subscribe`
1401    /// 3. **len consistency**: `len()` must equal total count of symbols in confirmed map
1402    /// 4. **is_empty consistency**: `is_empty()` true iff all maps are empty
1403    /// 5. **Reference count non-negative**: All reference counts >= 0
1404    fn check_invariants(state: &SubscriptionState, label: &str) {
1405        // Collect all topics from each state
1406        let confirmed_topics: AHashSet<String> = state
1407            .topics_from_map(&state.confirmed)
1408            .into_iter()
1409            .collect();
1410        let pending_sub_topics: AHashSet<String> =
1411            state.pending_subscribe_topics().into_iter().collect();
1412        let pending_unsub_topics: AHashSet<String> =
1413            state.pending_unsubscribe_topics().into_iter().collect();
1414
1415        // INVARIANT 1: Mutual exclusivity - no topic in multiple states
1416        let confirmed_and_pending_sub: Vec<_> =
1417            confirmed_topics.intersection(&pending_sub_topics).collect();
1418        assert!(
1419            confirmed_and_pending_sub.is_empty(),
1420            "{label}: Topic in both confirmed and pending_subscribe: {confirmed_and_pending_sub:?}"
1421        );
1422
1423        let confirmed_and_pending_unsub: Vec<_> = confirmed_topics
1424            .intersection(&pending_unsub_topics)
1425            .collect();
1426        assert!(
1427            confirmed_and_pending_unsub.is_empty(),
1428            "{label}: Topic in both confirmed and pending_unsubscribe: {confirmed_and_pending_unsub:?}"
1429        );
1430
1431        let pending_sub_and_unsub: Vec<_> = pending_sub_topics
1432            .intersection(&pending_unsub_topics)
1433            .collect();
1434        assert!(
1435            pending_sub_and_unsub.is_empty(),
1436            "{label}: Topic in both pending_subscribe and pending_unsubscribe: {pending_sub_and_unsub:?}"
1437        );
1438
1439        // INVARIANT 2: all_topics() == confirmed ∪ pending_subscribe
1440        let all_topics: AHashSet<String> = state.all_topics().into_iter().collect();
1441        let expected_all: AHashSet<String> = confirmed_topics
1442            .union(&pending_sub_topics)
1443            .cloned()
1444            .collect();
1445        assert_eq!(
1446            all_topics, expected_all,
1447            "{label}: all_topics() doesn't match confirmed ∪ pending_subscribe"
1448        );
1449
1450        // Ensure pending_unsubscribe is NOT in all_topics
1451        for topic in &pending_unsub_topics {
1452            assert!(
1453                !all_topics.contains(topic),
1454                "{label}: pending_unsubscribe topic {topic} incorrectly in all_topics()"
1455            );
1456        }
1457
1458        // INVARIANT 3: len() == sum of confirmed symbol counts
1459        let expected_len: usize = state
1460            .confirmed
1461            .iter()
1462            .map(|entry| entry.value().len())
1463            .sum();
1464        assert_eq!(
1465            state.len(),
1466            expected_len,
1467            "{label}: len() mismatch. Expected {expected_len}, was {}",
1468            state.len()
1469        );
1470
1471        // INVARIANT 4: is_empty() consistency
1472        let should_be_empty = state.confirmed.is_empty()
1473            && pending_sub_topics.is_empty()
1474            && pending_unsub_topics.is_empty();
1475        assert_eq!(
1476            state.is_empty(),
1477            should_be_empty,
1478            "{label}: is_empty() inconsistent. Maps empty: {should_be_empty}, is_empty(): {}",
1479            state.is_empty()
1480        );
1481
1482        // INVARIANT 5: Reference counts non-negative (NonZeroUsize enforces > 0, absence = 0)
1483        for entry in state.reference_counts.iter() {
1484            let count = entry.value().get();
1485            assert!(
1486                count > 0,
1487                "{label}: Reference count should be NonZeroUsize (> 0), was {count} for {:?}",
1488                entry.key()
1489            );
1490        }
1491    }
1492
1493    /// Checks that a topic exists in exactly one of the three states or none.
1494    fn check_topic_exclusivity(state: &SubscriptionState, topic: &str, label: &str) {
1495        let (channel, symbol) = split_topic(topic, state.delimiter);
1496
1497        let in_confirmed = is_tracked(&state.confirmed, channel, symbol);
1498        let in_pending_sub = is_tracked(&state.pending_subscribe, channel, symbol);
1499        let in_pending_unsub = is_tracked(&state.pending_unsubscribe, channel, symbol);
1500
1501        let count = [in_confirmed, in_pending_sub, in_pending_unsub]
1502            .iter()
1503            .filter(|&&x| x)
1504            .count();
1505
1506        assert!(
1507            count <= 1,
1508            "{label}: Topic {topic} in {count} states (should be 0 or 1). \
1509             confirmed: {in_confirmed}, pending_sub: {in_pending_sub}, pending_unsub: {in_pending_unsub}"
1510        );
1511    }
1512
1513    #[cfg(test)]
1514    mod property_tests {
1515        use proptest::prelude::*;
1516
1517        use super::*;
1518
1519        #[derive(Debug, Clone)]
1520        enum Operation {
1521            MarkSubscribe(String),
1522            ConfirmSubscribe(String),
1523            MarkUnsubscribe(String),
1524            ConfirmUnsubscribe(String),
1525            MarkFailure(String),
1526            AddReference(String),
1527            RemoveReference(String),
1528            Clear,
1529        }
1530
1531        // Strategy for generating valid topics
1532        fn topic_strategy() -> impl Strategy<Value = String> {
1533            prop_oneof![
1534                // Symbol-level topics
1535                (any::<u8>(), any::<u8>())
1536                    .prop_map(|(ch, sym)| { format!("channel{}.SYMBOL{}", ch % 5, sym % 10) }),
1537                // Channel-level topics (no symbol)
1538                any::<u8>().prop_map(|ch| format!("channel{}", ch % 5)),
1539            ]
1540        }
1541
1542        // Strategy for generating random operations
1543        fn operation_strategy() -> impl Strategy<Value = Operation> {
1544            topic_strategy().prop_flat_map(|topic| {
1545                prop_oneof![
1546                    Just(Operation::MarkSubscribe(topic.clone())),
1547                    Just(Operation::ConfirmSubscribe(topic.clone())),
1548                    Just(Operation::MarkUnsubscribe(topic.clone())),
1549                    Just(Operation::ConfirmUnsubscribe(topic.clone())),
1550                    Just(Operation::MarkFailure(topic.clone())),
1551                    Just(Operation::AddReference(topic.clone())),
1552                    Just(Operation::RemoveReference(topic)),
1553                    Just(Operation::Clear),
1554                ]
1555            })
1556        }
1557
1558        // Apply an operation to the state
1559        fn apply_operation(state: &SubscriptionState, op: &Operation) {
1560            match op {
1561                Operation::MarkSubscribe(topic) => state.mark_subscribe(topic),
1562                Operation::ConfirmSubscribe(topic) => state.confirm_subscribe(topic),
1563                Operation::MarkUnsubscribe(topic) => state.mark_unsubscribe(topic),
1564                Operation::ConfirmUnsubscribe(topic) => state.confirm_unsubscribe(topic),
1565                Operation::MarkFailure(topic) => state.mark_failure(topic),
1566                Operation::AddReference(topic) => {
1567                    state.add_reference(topic);
1568                }
1569                Operation::RemoveReference(topic) => {
1570                    state.remove_reference(topic);
1571                }
1572                Operation::Clear => state.clear(),
1573            }
1574        }
1575
1576        proptest! {
1577            #![proptest_config(ProptestConfig::with_cases(500))]
1578
1579            /// Property: Invariants hold after any sequence of operations.
1580            #[rstest]
1581            fn prop_invariants_hold_after_operations(
1582                operations in prop::collection::vec(operation_strategy(), 1..50)
1583            ) {
1584                let state = SubscriptionState::new('.');
1585
1586                // Apply all operations
1587                for (i, op) in operations.iter().enumerate() {
1588                    apply_operation(&state, op);
1589
1590                    // Check invariants after each operation
1591                    check_invariants(&state, &format!("After op {i}: {op:?}"));
1592                }
1593
1594                // Final invariant check
1595                check_invariants(&state, "Final state");
1596            }
1597
1598            /// Property: Reference counting is always consistent.
1599            #[rstest]
1600            fn prop_reference_counting_consistency(
1601                ops in prop::collection::vec(
1602                    topic_strategy().prop_flat_map(|t| {
1603                        prop_oneof![
1604                            Just(Operation::AddReference(t.clone())),
1605                            Just(Operation::RemoveReference(t)),
1606                        ]
1607                    }),
1608                    1..100
1609                )
1610            ) {
1611                let state = SubscriptionState::new('.');
1612
1613                for op in &ops {
1614                    apply_operation(&state, op);
1615
1616                    // All reference counts must be >= 0 (NonZeroUsize or absent)
1617                    for entry in state.reference_counts.iter() {
1618                        assert!(entry.value().get() > 0);
1619                    }
1620                }
1621            }
1622
1623            /// Property: all_topics() always equals confirmed ∪ pending_subscribe.
1624            #[rstest]
1625            fn prop_all_topics_is_union(
1626                operations in prop::collection::vec(operation_strategy(), 1..50)
1627            ) {
1628                let state = SubscriptionState::new('.');
1629
1630                for op in &operations {
1631                    apply_operation(&state, op);
1632
1633                    // Verify all_topics() == confirmed ∪ pending_subscribe
1634                    let all_topics: AHashSet<String> = state.all_topics().into_iter().collect();
1635                    let confirmed: AHashSet<String> = state.topics_from_map(&state.confirmed).into_iter().collect();
1636                    let pending_sub: AHashSet<String> = state.pending_subscribe_topics().into_iter().collect();
1637                    let expected: AHashSet<String> = confirmed.union(&pending_sub).cloned().collect();
1638
1639                    assert_eq!(all_topics, expected);
1640
1641                    // Ensure pending_unsubscribe topics are NOT in all_topics
1642                    let pending_unsub: AHashSet<String> = state.pending_unsubscribe_topics().into_iter().collect();
1643                    for topic in pending_unsub {
1644                        assert!(!all_topics.contains(&topic));
1645                    }
1646                }
1647            }
1648
1649            /// Property: clear() resets to empty state.
1650            #[rstest]
1651            fn prop_clear_resets_completely(
1652                operations in prop::collection::vec(operation_strategy(), 1..30)
1653            ) {
1654                let state = SubscriptionState::new('.');
1655
1656                // Apply random operations
1657                for op in &operations {
1658                    apply_operation(&state, op);
1659                }
1660
1661                // Clear and verify complete reset
1662                state.clear();
1663
1664                assert!(state.is_empty());
1665                assert_eq!(state.len(), 0);
1666                assert!(state.all_topics().is_empty());
1667                assert!(state.pending_subscribe_topics().is_empty());
1668                assert!(state.pending_unsubscribe_topics().is_empty());
1669                assert!(state.confirmed.is_empty());
1670                assert!(state.pending_subscribe.is_empty());
1671                assert!(state.pending_unsubscribe.is_empty());
1672                assert!(state.reference_counts.is_empty());
1673            }
1674
1675            /// Property: Topics are mutually exclusive across states.
1676            #[rstest]
1677            fn prop_topic_mutual_exclusivity(
1678                operations in prop::collection::vec(operation_strategy(), 1..50),
1679                topic in topic_strategy()
1680            ) {
1681                let state = SubscriptionState::new('.');
1682
1683                for (i, op) in operations.iter().enumerate() {
1684                    apply_operation(&state, op);
1685                    check_topic_exclusivity(&state, &topic, &format!("After op {i}: {op:?}"));
1686                }
1687            }
1688        }
1689    }
1690
1691    #[rstest]
1692    fn test_exhaustive_two_step_transitions() {
1693        let operations = [
1694            "mark_subscribe",
1695            "confirm_subscribe",
1696            "mark_unsubscribe",
1697            "confirm_unsubscribe",
1698            "mark_failure",
1699        ];
1700
1701        for &op1 in &operations {
1702            for &op2 in &operations {
1703                let state = SubscriptionState::new('.');
1704                let topic = "test.TOPIC";
1705
1706                // Apply two operations
1707                apply_op(&state, op1, topic);
1708                apply_op(&state, op2, topic);
1709
1710                // Verify invariants hold
1711                check_invariants(&state, &format!("{op1} → {op2}"));
1712                check_topic_exclusivity(&state, topic, &format!("{op1} → {op2}"));
1713            }
1714        }
1715    }
1716
1717    fn apply_op(state: &SubscriptionState, op: &str, topic: &str) {
1718        match op {
1719            "mark_subscribe" => state.mark_subscribe(topic),
1720            "confirm_subscribe" => state.confirm_subscribe(topic),
1721            "mark_unsubscribe" => state.mark_unsubscribe(topic),
1722            "confirm_unsubscribe" => state.confirm_unsubscribe(topic),
1723            "mark_failure" => state.mark_failure(topic),
1724            _ => panic!("Unknown operation: {op}"),
1725        }
1726    }
1727
1728    #[rstest]
1729    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
1730    async fn test_stress_rapid_resubscribe_pattern() {
1731        // Stress test the race condition we fixed: rapid unsubscribe → resubscribe
1732        let state = Arc::new(SubscriptionState::new('.'));
1733        let mut handles = vec![];
1734
1735        for i in 0..100 {
1736            let state_clone = Arc::clone(&state);
1737            let handle = tokio::spawn(async move {
1738                let topic = format!("rapid.SYMBOL{}", i % 10); // 10 unique topics, lots of contention
1739
1740                // Initial subscribe
1741                state_clone.mark_subscribe(&topic);
1742                state_clone.confirm_subscribe(&topic);
1743
1744                // Rapid unsubscribe → resubscribe (race condition scenario)
1745                state_clone.mark_unsubscribe(&topic);
1746                // Immediately resubscribe before unsubscribe ACK
1747                state_clone.mark_subscribe(&topic);
1748                // Now unsubscribe ACK arrives
1749                state_clone.confirm_unsubscribe(&topic);
1750                // Subscribe ACK arrives
1751                state_clone.confirm_subscribe(&topic);
1752            });
1753            handles.push(handle);
1754        }
1755
1756        for handle in handles {
1757            handle.await.unwrap();
1758        }
1759
1760        check_invariants(&state, "After rapid resubscribe stress test");
1761    }
1762
1763    #[rstest]
1764    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
1765    async fn test_stress_failure_recovery_loop() {
1766        // Stress test failure → recovery loops
1767        // Each task gets its own unique topic to avoid race conditions in the test itself
1768        let state = Arc::new(SubscriptionState::new('.'));
1769        let mut handles = vec![];
1770
1771        for i in 0..30 {
1772            let state_clone = Arc::clone(&state);
1773            let handle = tokio::spawn(async move {
1774                let topic = format!("failure.SYMBOL{i}"); // Unique topic per task
1775
1776                // Subscribe and confirm
1777                state_clone.mark_subscribe(&topic);
1778                state_clone.confirm_subscribe(&topic);
1779
1780                // Simulate multiple failures and recoveries
1781                for _ in 0..5 {
1782                    state_clone.mark_failure(&topic);
1783                    state_clone.confirm_subscribe(&topic); // Re-confirm after retry
1784                }
1785            });
1786            handles.push(handle);
1787        }
1788
1789        for handle in handles {
1790            handle.await.unwrap();
1791        }
1792
1793        check_invariants(&state, "After failure recovery loops");
1794
1795        // All should eventually be confirmed (30 unique topics)
1796        assert_eq!(state.len(), 30);
1797    }
1798}