1use std::{
38 num::NonZeroUsize,
39 sync::{Arc, LazyLock},
40};
41
42use ahash::AHashSet;
43use dashmap::DashMap;
44use ustr::Ustr;
45
/// Marker stored in a channel's symbol set to represent a channel-level
/// subscription, i.e. a topic that has no symbol part.
///
/// It is the interned empty string, so a topic whose symbol part is empty
/// (for example `"channel."` with a `.` delimiter) maps to the same entry as
/// the bare channel.
pub(crate) static CHANNEL_LEVEL_MARKER: LazyLock<Ustr> = LazyLock::new(|| Ustr::from(""));
51
/// Tracks which topics are confirmed, awaiting a subscribe acknowledgement,
/// or awaiting an unsubscribe acknowledgement, together with a per-topic
/// reference count.
///
/// Every map is held behind an `Arc`, so a clone of this struct shares the
/// same underlying maps as the value it was cloned from.
#[derive(Clone, Debug)]
pub struct SubscriptionState {
    /// Confirmed subscriptions: channel -> set of symbols (or the channel-level marker).
    confirmed: Arc<DashMap<Ustr, AHashSet<Ustr>>>,
    /// Subscriptions that were requested but not yet confirmed.
    pending_subscribe: Arc<DashMap<Ustr, AHashSet<Ustr>>>,
    /// Unsubscriptions that were requested but not yet confirmed.
    pending_unsubscribe: Arc<DashMap<Ustr, AHashSet<Ustr>>>,
    /// Number of outstanding references per full topic string.
    reference_counts: Arc<DashMap<Ustr, NonZeroUsize>>,
    /// Character separating the channel from the symbol inside a topic.
    delimiter: char,
}
81
82impl SubscriptionState {
83 pub fn new(delimiter: char) -> Self {
85 Self {
86 confirmed: Arc::new(DashMap::new()),
87 pending_subscribe: Arc::new(DashMap::new()),
88 pending_unsubscribe: Arc::new(DashMap::new()),
89 reference_counts: Arc::new(DashMap::new()),
90 delimiter,
91 }
92 }
93
94 pub fn delimiter(&self) -> char {
96 self.delimiter
97 }
98
99 pub fn confirmed(&self) -> Arc<DashMap<Ustr, AHashSet<Ustr>>> {
101 Arc::clone(&self.confirmed)
102 }
103
104 pub fn pending_subscribe(&self) -> Arc<DashMap<Ustr, AHashSet<Ustr>>> {
106 Arc::clone(&self.pending_subscribe)
107 }
108
109 pub fn pending_unsubscribe(&self) -> Arc<DashMap<Ustr, AHashSet<Ustr>>> {
111 Arc::clone(&self.pending_unsubscribe)
112 }
113
114 pub fn len(&self) -> usize {
118 self.confirmed.iter().map(|entry| entry.value().len()).sum()
119 }
120
121 pub fn is_empty(&self) -> bool {
123 self.confirmed.is_empty()
124 && self.pending_subscribe.is_empty()
125 && self.pending_unsubscribe.is_empty()
126 }
127
128 pub fn mark_subscribe(&self, topic: &str) {
134 let (channel, symbol) = split_topic(topic, self.delimiter);
135
136 if is_tracked(&self.confirmed, channel, symbol) {
138 return;
139 }
140
141 untrack_topic(&self.pending_unsubscribe, channel, symbol);
143
144 track_topic(&self.pending_subscribe, channel, symbol);
145 }
146
147 pub fn mark_unsubscribe(&self, topic: &str) {
153 let (channel, symbol) = split_topic(topic, self.delimiter);
154 track_topic(&self.pending_unsubscribe, channel, symbol);
155 untrack_topic(&self.confirmed, channel, symbol);
156 untrack_topic(&self.pending_subscribe, channel, symbol);
157 }
158
159 pub fn confirm_subscribe(&self, topic: &str) {
165 let (channel, symbol) = split_topic(topic, self.delimiter);
166
167 if is_tracked(&self.pending_unsubscribe, channel, symbol) {
169 return;
170 }
171
172 untrack_topic(&self.pending_subscribe, channel, symbol);
173 track_topic(&self.confirmed, channel, symbol);
174 }
175
176 pub fn confirm_unsubscribe(&self, topic: &str) {
187 let (channel, symbol) = split_topic(topic, self.delimiter);
188
189 if !is_tracked(&self.pending_unsubscribe, channel, symbol) {
192 return; }
194
195 untrack_topic(&self.pending_unsubscribe, channel, symbol);
196 untrack_topic(&self.confirmed, channel, symbol);
197 }
199
200 pub fn mark_failure(&self, topic: &str) {
205 let (channel, symbol) = split_topic(topic, self.delimiter);
206
207 if is_tracked(&self.pending_unsubscribe, channel, symbol) {
209 return;
210 }
211
212 untrack_topic(&self.confirmed, channel, symbol);
213 track_topic(&self.pending_subscribe, channel, symbol);
214 }
215
216 pub fn pending_subscribe_topics(&self) -> Vec<String> {
218 self.topics_from_map(&self.pending_subscribe)
219 }
220
221 pub fn pending_unsubscribe_topics(&self) -> Vec<String> {
223 self.topics_from_map(&self.pending_unsubscribe)
224 }
225
226 pub fn all_topics(&self) -> Vec<String> {
233 let mut topics = Vec::new();
234 topics.extend(self.topics_from_map(&self.confirmed));
235 topics.extend(self.topics_from_map(&self.pending_subscribe));
236 topics
237 }
238
239 fn topics_from_map(&self, map: &DashMap<Ustr, AHashSet<Ustr>>) -> Vec<String> {
241 let mut topics = Vec::new();
242 let marker = *CHANNEL_LEVEL_MARKER;
243
244 for entry in map.iter() {
245 let channel = entry.key();
246 let symbols = entry.value();
247
248 if symbols.contains(&marker) {
250 topics.push(channel.to_string());
251 }
252
253 for symbol in symbols.iter() {
255 if *symbol != marker {
256 topics.push(format!(
257 "{}{}{}",
258 channel.as_str(),
259 self.delimiter,
260 symbol.as_str()
261 ));
262 }
263 }
264 }
265
266 topics
267 }
268
269 pub fn add_reference(&self, topic: &str) -> bool {
278 let mut should_subscribe = false;
279 let topic_ustr = Ustr::from(topic);
280
281 self.reference_counts
282 .entry(topic_ustr)
283 .and_modify(|count| {
284 *count = NonZeroUsize::new(count.get() + 1).expect("reference count overflow");
285 })
286 .or_insert_with(|| {
287 should_subscribe = true;
288 NonZeroUsize::new(1).expect("NonZeroUsize::new(1) should never fail")
289 });
290
291 should_subscribe
292 }
293
294 pub fn remove_reference(&self, topic: &str) -> bool {
304 let topic_ustr = Ustr::from(topic);
305
306 if let dashmap::mapref::entry::Entry::Occupied(mut entry) =
309 self.reference_counts.entry(topic_ustr)
310 {
311 let current = entry.get().get();
312
313 if current == 1 {
314 entry.remove();
315 return true;
316 }
317
318 *entry.get_mut() = NonZeroUsize::new(current - 1)
319 .expect("reference count should never reach zero here");
320 }
321
322 false
323 }
324
325 pub fn get_reference_count(&self, topic: &str) -> usize {
329 let topic_ustr = Ustr::from(topic);
330 self.reference_counts
331 .get(&topic_ustr)
332 .map_or(0, |count| count.get())
333 }
334
335 pub fn clear(&self) {
339 self.confirmed.clear();
340 self.pending_subscribe.clear();
341 self.pending_unsubscribe.clear();
342 self.reference_counts.clear();
343 }
344}
345
/// Splits `topic` at the first occurrence of `delimiter`.
///
/// Returns `(channel, Some(symbol))` when the delimiter is present, where
/// `symbol` is everything after the first delimiter, and `(topic, None)`
/// when it is absent.
pub fn split_topic(topic: &str, delimiter: char) -> (&str, Option<&str>) {
    match topic.split_once(delimiter) {
        Some((channel, symbol)) => (channel, Some(symbol)),
        None => (topic, None),
    }
}
352
353fn track_topic(map: &DashMap<Ustr, AHashSet<Ustr>>, channel: &str, symbol: Option<&str>) {
358 let channel_ustr = Ustr::from(channel);
359 let mut entry = map.entry(channel_ustr).or_default();
360
361 if let Some(symbol) = symbol {
362 entry.insert(Ustr::from(symbol));
363 } else {
364 entry.insert(*CHANNEL_LEVEL_MARKER);
365 }
366}
367
368fn untrack_topic(map: &DashMap<Ustr, AHashSet<Ustr>>, channel: &str, symbol: Option<&str>) {
372 let channel_ustr = Ustr::from(channel);
373 let symbol_to_remove = if let Some(symbol) = symbol {
374 Ustr::from(symbol)
375 } else {
376 *CHANNEL_LEVEL_MARKER
377 };
378
379 if let dashmap::mapref::entry::Entry::Occupied(mut entry) = map.entry(channel_ustr) {
382 entry.get_mut().remove(&symbol_to_remove);
383 if entry.get().is_empty() {
384 entry.remove();
385 }
386 }
387}
388
389fn is_tracked(map: &DashMap<Ustr, AHashSet<Ustr>>, channel: &str, symbol: Option<&str>) -> bool {
391 let channel_ustr = Ustr::from(channel);
392 let symbol_to_check = if let Some(symbol) = symbol {
393 Ustr::from(symbol)
394 } else {
395 *CHANNEL_LEVEL_MARKER
396 };
397
398 if let Some(entry) = map.get(&channel_ustr) {
399 entry.contains(&symbol_to_check)
400 } else {
401 false
402 }
403}
404
405#[cfg(test)]
410mod tests {
411 use rstest::rstest;
412
413 use super::*;
414
415 #[rstest]
416 fn test_split_topic_with_symbol() {
417 let (channel, symbol) = split_topic("tickers.BTCUSDT", '.');
418 assert_eq!(channel, "tickers");
419 assert_eq!(symbol, Some("BTCUSDT"));
420
421 let (channel, symbol) = split_topic("orderBookL2:XBTUSD", ':');
422 assert_eq!(channel, "orderBookL2");
423 assert_eq!(symbol, Some("XBTUSD"));
424 }
425
426 #[rstest]
427 fn test_split_topic_without_symbol() {
428 let (channel, symbol) = split_topic("orderbook", '.');
429 assert_eq!(channel, "orderbook");
430 assert_eq!(symbol, None);
431 }
432
433 #[rstest]
434 fn test_new_state_is_empty() {
435 let state = SubscriptionState::new('.');
436 assert!(state.is_empty());
437 assert_eq!(state.len(), 0);
438 }
439
440 #[rstest]
441 fn test_mark_subscribe() {
442 let state = SubscriptionState::new('.');
443 state.mark_subscribe("tickers.BTCUSDT");
444
445 assert_eq!(state.pending_subscribe_topics(), vec!["tickers.BTCUSDT"]);
446 assert_eq!(state.len(), 0); }
448
449 #[rstest]
450 fn test_confirm_subscribe() {
451 let state = SubscriptionState::new('.');
452 state.mark_subscribe("tickers.BTCUSDT");
453 state.confirm_subscribe("tickers.BTCUSDT");
454
455 assert!(state.pending_subscribe_topics().is_empty());
456 assert_eq!(state.len(), 1);
457 }
458
459 #[rstest]
460 fn test_mark_unsubscribe() {
461 let state = SubscriptionState::new('.');
462 state.mark_subscribe("tickers.BTCUSDT");
463 state.confirm_subscribe("tickers.BTCUSDT");
464 state.mark_unsubscribe("tickers.BTCUSDT");
465
466 assert_eq!(state.len(), 0); assert_eq!(state.pending_unsubscribe_topics(), vec!["tickers.BTCUSDT"]);
468 }
469
470 #[rstest]
471 fn test_confirm_unsubscribe() {
472 let state = SubscriptionState::new('.');
473 state.mark_subscribe("tickers.BTCUSDT");
474 state.confirm_subscribe("tickers.BTCUSDT");
475 state.mark_unsubscribe("tickers.BTCUSDT");
476 state.confirm_unsubscribe("tickers.BTCUSDT");
477
478 assert!(state.is_empty());
479 }
480
481 #[rstest]
482 fn test_resubscribe_before_unsubscribe_ack() {
483 let state = SubscriptionState::new('.');
487
488 state.mark_subscribe("tickers.BTCUSDT");
489 state.confirm_subscribe("tickers.BTCUSDT");
490 assert_eq!(state.len(), 1);
491
492 state.mark_unsubscribe("tickers.BTCUSDT");
493 assert_eq!(state.len(), 0);
494 assert_eq!(state.pending_unsubscribe_topics(), vec!["tickers.BTCUSDT"]);
495
496 state.mark_subscribe("tickers.BTCUSDT");
498 assert_eq!(state.pending_subscribe_topics(), vec!["tickers.BTCUSDT"]);
499
500 state.confirm_unsubscribe("tickers.BTCUSDT");
502 assert!(state.pending_unsubscribe_topics().is_empty());
503 assert_eq!(state.pending_subscribe_topics(), vec!["tickers.BTCUSDT"]); state.confirm_subscribe("tickers.BTCUSDT");
507 assert_eq!(state.len(), 1);
508 assert!(state.pending_subscribe_topics().is_empty());
509
510 let all = state.all_topics();
512 assert_eq!(all.len(), 1);
513 assert!(all.contains(&"tickers.BTCUSDT".to_string()));
514 }
515
516 #[rstest]
517 fn test_stale_unsubscribe_ack_after_resubscribe_confirmed() {
518 let state = SubscriptionState::new('.');
523
524 state.mark_subscribe("tickers.BTCUSDT");
526 state.confirm_subscribe("tickers.BTCUSDT");
527 assert_eq!(state.len(), 1);
528
529 state.mark_unsubscribe("tickers.BTCUSDT");
531 assert_eq!(state.len(), 0);
532 assert_eq!(state.pending_unsubscribe_topics(), vec!["tickers.BTCUSDT"]);
533
534 state.mark_subscribe("tickers.BTCUSDT");
536 assert!(state.pending_unsubscribe_topics().is_empty()); assert_eq!(state.pending_subscribe_topics(), vec!["tickers.BTCUSDT"]);
538
539 state.confirm_subscribe("tickers.BTCUSDT");
541 assert_eq!(state.len(), 1); assert!(state.pending_subscribe_topics().is_empty());
543
544 state.confirm_unsubscribe("tickers.BTCUSDT");
547
548 assert_eq!(state.len(), 1); assert!(state.pending_unsubscribe_topics().is_empty());
551 assert!(state.pending_subscribe_topics().is_empty());
552
553 let all = state.all_topics();
555 assert_eq!(all.len(), 1);
556 assert!(all.contains(&"tickers.BTCUSDT".to_string()));
557 }
558
559 #[rstest]
560 fn test_mark_failure() {
561 let state = SubscriptionState::new('.');
562 state.mark_subscribe("tickers.BTCUSDT");
563 state.confirm_subscribe("tickers.BTCUSDT");
564 state.mark_failure("tickers.BTCUSDT");
565
566 assert_eq!(state.len(), 0);
567 assert_eq!(state.pending_subscribe_topics(), vec!["tickers.BTCUSDT"]);
568 }
569
570 #[rstest]
571 fn test_all_topics_includes_confirmed_and_pending_subscribe() {
572 let state = SubscriptionState::new('.');
573 state.mark_subscribe("tickers.BTCUSDT");
574 state.confirm_subscribe("tickers.BTCUSDT");
575 state.mark_subscribe("tickers.ETHUSDT");
576
577 let topics = state.all_topics();
578 assert_eq!(topics.len(), 2);
579 assert!(topics.contains(&"tickers.BTCUSDT".to_string()));
580 assert!(topics.contains(&"tickers.ETHUSDT".to_string()));
581 }
582
583 #[rstest]
584 fn test_all_topics_excludes_pending_unsubscribe() {
585 let state = SubscriptionState::new('.');
586 state.mark_subscribe("tickers.BTCUSDT");
587 state.confirm_subscribe("tickers.BTCUSDT");
588 state.mark_unsubscribe("tickers.BTCUSDT");
589
590 let topics = state.all_topics();
591 assert!(topics.is_empty());
592 }
593
594 #[rstest]
595 fn test_reference_counting_single_topic() {
596 let state = SubscriptionState::new('.');
597
598 assert!(state.add_reference("tickers.BTCUSDT"));
599 assert_eq!(state.get_reference_count("tickers.BTCUSDT"), 1);
600
601 assert!(!state.add_reference("tickers.BTCUSDT"));
602 assert_eq!(state.get_reference_count("tickers.BTCUSDT"), 2);
603
604 assert!(!state.remove_reference("tickers.BTCUSDT"));
605 assert_eq!(state.get_reference_count("tickers.BTCUSDT"), 1);
606
607 assert!(state.remove_reference("tickers.BTCUSDT"));
608 assert_eq!(state.get_reference_count("tickers.BTCUSDT"), 0);
609 }
610
611 #[rstest]
612 fn test_reference_counting_multiple_topics() {
613 let state = SubscriptionState::new('.');
614
615 assert!(state.add_reference("tickers.BTCUSDT"));
616 assert!(state.add_reference("tickers.ETHUSDT"));
617
618 assert!(!state.add_reference("tickers.BTCUSDT"));
619 assert_eq!(state.get_reference_count("tickers.BTCUSDT"), 2);
620 assert_eq!(state.get_reference_count("tickers.ETHUSDT"), 1);
621
622 assert!(!state.remove_reference("tickers.BTCUSDT"));
623 assert!(state.remove_reference("tickers.ETHUSDT"));
624 }
625
626 #[rstest]
627 fn test_topic_without_symbol() {
628 let state = SubscriptionState::new('.');
629 state.mark_subscribe("orderbook");
630 state.confirm_subscribe("orderbook");
631
632 assert_eq!(state.len(), 1);
633 assert_eq!(state.all_topics(), vec!["orderbook"]);
634 }
635
636 #[rstest]
637 fn test_different_delimiters() {
638 let state_dot = SubscriptionState::new('.');
639 state_dot.mark_subscribe("tickers.BTCUSDT");
640 assert_eq!(
641 state_dot.pending_subscribe_topics(),
642 vec!["tickers.BTCUSDT"]
643 );
644
645 let state_colon = SubscriptionState::new(':');
646 state_colon.mark_subscribe("orderBookL2:XBTUSD");
647 assert_eq!(
648 state_colon.pending_subscribe_topics(),
649 vec!["orderBookL2:XBTUSD"]
650 );
651 }
652
653 #[rstest]
654 fn test_clear() {
655 let state = SubscriptionState::new('.');
656 state.mark_subscribe("tickers.BTCUSDT");
657 state.confirm_subscribe("tickers.BTCUSDT");
658 state.add_reference("tickers.BTCUSDT");
659
660 state.clear();
661
662 assert!(state.is_empty());
663 assert_eq!(state.get_reference_count("tickers.BTCUSDT"), 0);
664 }
665
666 #[rstest]
667 fn test_multiple_symbols_same_channel() {
668 let state = SubscriptionState::new('.');
669 state.mark_subscribe("tickers.BTCUSDT");
670 state.mark_subscribe("tickers.ETHUSDT");
671 state.confirm_subscribe("tickers.BTCUSDT");
672 state.confirm_subscribe("tickers.ETHUSDT");
673
674 assert_eq!(state.len(), 2);
675 let topics = state.all_topics();
676 assert!(topics.contains(&"tickers.BTCUSDT".to_string()));
677 assert!(topics.contains(&"tickers.ETHUSDT".to_string()));
678 }
679
680 #[rstest]
681 fn test_mixed_channel_and_symbol_subscriptions() {
682 let state = SubscriptionState::new('.');
683
684 state.mark_subscribe("tickers");
686 state.confirm_subscribe("tickers");
687 assert_eq!(state.len(), 1);
688 assert_eq!(state.all_topics(), vec!["tickers"]);
689
690 state.mark_subscribe("tickers.BTCUSDT");
692 state.confirm_subscribe("tickers.BTCUSDT");
693 assert_eq!(state.len(), 2);
694
695 let topics = state.all_topics();
697 assert_eq!(topics.len(), 2);
698 assert!(topics.contains(&"tickers".to_string()));
699 assert!(topics.contains(&"tickers.BTCUSDT".to_string()));
700
701 state.mark_subscribe("tickers.ETHUSDT");
703 state.confirm_subscribe("tickers.ETHUSDT");
704 assert_eq!(state.len(), 3);
705
706 let topics = state.all_topics();
707 assert_eq!(topics.len(), 3);
708 assert!(topics.contains(&"tickers".to_string()));
709 assert!(topics.contains(&"tickers.BTCUSDT".to_string()));
710 assert!(topics.contains(&"tickers.ETHUSDT".to_string()));
711
712 state.mark_unsubscribe("tickers");
714 state.confirm_unsubscribe("tickers");
715 assert_eq!(state.len(), 2);
716
717 let topics = state.all_topics();
718 assert_eq!(topics.len(), 2);
719 assert!(!topics.contains(&"tickers".to_string()));
720 assert!(topics.contains(&"tickers.BTCUSDT".to_string()));
721 assert!(topics.contains(&"tickers.ETHUSDT".to_string()));
722 }
723
724 #[rstest]
725 fn test_symbol_subscription_before_channel() {
726 let state = SubscriptionState::new('.');
727
728 state.mark_subscribe("tickers.BTCUSDT");
730 state.confirm_subscribe("tickers.BTCUSDT");
731 assert_eq!(state.len(), 1);
732
733 state.mark_subscribe("tickers");
735 state.confirm_subscribe("tickers");
736 assert_eq!(state.len(), 2);
737
738 let topics = state.all_topics();
740 assert_eq!(topics.len(), 2);
741 assert!(topics.contains(&"tickers".to_string()));
742 assert!(topics.contains(&"tickers.BTCUSDT".to_string()));
743 }
744
745 #[rstest]
746 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
747 async fn test_concurrent_subscribe_same_topic() {
748 let state = Arc::new(SubscriptionState::new('.'));
749 let mut handles = vec![];
750
751 for _ in 0..10 {
753 let state_clone = Arc::clone(&state);
754 let handle = tokio::spawn(async move {
755 state_clone.add_reference("tickers.BTCUSDT");
756 state_clone.mark_subscribe("tickers.BTCUSDT");
757 state_clone.confirm_subscribe("tickers.BTCUSDT");
758 });
759 handles.push(handle);
760 }
761
762 for handle in handles {
763 handle.await.unwrap();
764 }
765
766 assert_eq!(state.get_reference_count("tickers.BTCUSDT"), 10);
768 assert_eq!(state.len(), 1);
769 }
770
771 #[rstest]
772 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
773 async fn test_concurrent_subscribe_unsubscribe() {
774 let state = Arc::new(SubscriptionState::new('.'));
775 let mut handles = vec![];
776
777 for i in 0..20 {
780 let state_clone = Arc::clone(&state);
781 let handle = tokio::spawn(async move {
782 let topic = format!("tickers.SYMBOL{i}");
783 state_clone.add_reference(&topic);
785 state_clone.add_reference(&topic);
786 state_clone.mark_subscribe(&topic);
787 state_clone.confirm_subscribe(&topic);
788
789 state_clone.remove_reference(&topic);
791 });
792 handles.push(handle);
793 }
794
795 for handle in handles {
796 handle.await.unwrap();
797 }
798
799 for i in 0..20 {
801 let topic = format!("tickers.SYMBOL{i}");
802 assert_eq!(state.get_reference_count(&topic), 1);
803 }
804
805 assert_eq!(state.len(), 20);
807 assert!(!state.is_empty());
808 }
809
810 #[rstest]
811 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
812 async fn test_concurrent_reference_counting_same_topic() {
813 let state = Arc::new(SubscriptionState::new('.'));
814 let topic = "tickers.BTCUSDT";
815 let mut handles = vec![];
816
817 for _ in 0..10 {
819 let state_clone = Arc::clone(&state);
820 let handle = tokio::spawn(async move {
821 for _ in 0..10 {
822 state_clone.add_reference(topic);
823 }
824 });
825 handles.push(handle);
826 }
827
828 for handle in handles {
829 handle.await.unwrap();
830 }
831
832 assert_eq!(state.get_reference_count(topic), 100);
834
835 for _ in 0..50 {
837 state.remove_reference(topic);
838 }
839
840 assert_eq!(state.get_reference_count(topic), 50);
842 }
843
844 #[rstest]
845 fn test_reconnection_scenario() {
846 let state = SubscriptionState::new('.');
847
848 state.add_reference("tickers.BTCUSDT");
850 state.mark_subscribe("tickers.BTCUSDT");
851 state.confirm_subscribe("tickers.BTCUSDT");
852
853 state.add_reference("tickers.ETHUSDT");
854 state.mark_subscribe("tickers.ETHUSDT");
855 state.confirm_subscribe("tickers.ETHUSDT");
856
857 state.add_reference("orderbook");
858 state.mark_subscribe("orderbook");
859 state.confirm_subscribe("orderbook");
860
861 assert_eq!(state.len(), 3);
862
863 let topics_to_resubscribe = state.all_topics();
865 assert_eq!(topics_to_resubscribe.len(), 3);
866 assert!(topics_to_resubscribe.contains(&"tickers.BTCUSDT".to_string()));
867 assert!(topics_to_resubscribe.contains(&"tickers.ETHUSDT".to_string()));
868 assert!(topics_to_resubscribe.contains(&"orderbook".to_string()));
869
870 for topic in &topics_to_resubscribe {
872 state.mark_subscribe(topic);
873 }
874
875 for topic in &topics_to_resubscribe {
877 state.confirm_subscribe(topic);
878 }
879
880 assert_eq!(state.len(), 3);
882 assert_eq!(state.all_topics().len(), 3);
883 }
884
885 #[rstest]
886 fn test_state_machine_invalid_transitions() {
887 let state = SubscriptionState::new('.');
888
889 state.confirm_subscribe("tickers.BTCUSDT");
891 assert_eq!(state.len(), 1); state.confirm_unsubscribe("tickers.ETHUSDT");
895 assert_eq!(state.len(), 1); state.mark_subscribe("orderbook");
899 state.confirm_subscribe("orderbook");
900 state.confirm_subscribe("orderbook"); assert_eq!(state.len(), 2);
902
903 state.mark_unsubscribe("nonexistent");
905 state.confirm_unsubscribe("nonexistent");
906 assert_eq!(state.len(), 2); }
908
909 #[rstest]
910 fn test_mark_failure_moves_to_pending() {
911 let state = SubscriptionState::new('.');
912
913 state.mark_subscribe("tickers.BTCUSDT");
915 state.confirm_subscribe("tickers.BTCUSDT");
916 assert_eq!(state.len(), 1);
917 assert!(state.pending_subscribe_topics().is_empty());
918
919 state.mark_failure("tickers.BTCUSDT");
921
922 assert_eq!(state.len(), 0);
924 assert_eq!(state.pending_subscribe_topics(), vec!["tickers.BTCUSDT"]);
925
926 assert_eq!(state.all_topics(), vec!["tickers.BTCUSDT"]);
928 }
929
930 #[rstest]
931 fn test_pending_subscribe_excludes_pending_unsubscribe() {
932 let state = SubscriptionState::new('.');
933
934 state.mark_subscribe("tickers.BTCUSDT");
936 state.confirm_subscribe("tickers.BTCUSDT");
937
938 state.mark_unsubscribe("tickers.BTCUSDT");
940
941 assert_eq!(state.pending_unsubscribe_topics(), vec!["tickers.BTCUSDT"]);
943 assert!(state.all_topics().is_empty());
944 assert_eq!(state.len(), 0);
945 }
946
947 #[rstest]
948 fn test_remove_reference_nonexistent_topic() {
949 let state = SubscriptionState::new('.');
950
951 let should_unsubscribe = state.remove_reference("nonexistent");
953
954 assert!(!should_unsubscribe);
956 assert_eq!(state.get_reference_count("nonexistent"), 0);
957 }
958
959 #[rstest]
960 fn test_edge_case_empty_channel_name() {
961 let state = SubscriptionState::new('.');
962
963 state.mark_subscribe("");
965 state.confirm_subscribe("");
966
967 assert_eq!(state.len(), 1);
968 assert_eq!(state.all_topics(), vec![""]);
969 }
970
971 #[rstest]
972 fn test_special_characters_in_topics() {
973 let state = SubscriptionState::new('.');
974
975 let special_topics = vec![
977 "channel.symbol-with-dash",
978 "channel.SYMBOL_WITH_UNDERSCORE",
979 "channel.symbol123",
980 "channel.symbol@special",
981 ];
982
983 for topic in &special_topics {
984 state.mark_subscribe(topic);
985 state.confirm_subscribe(topic);
986 }
987
988 assert_eq!(state.len(), special_topics.len());
989
990 let all_topics = state.all_topics();
991 for topic in &special_topics {
992 assert!(
993 all_topics.contains(&(*topic).to_string()),
994 "Missing topic: {topic}"
995 );
996 }
997 }
998
999 #[rstest]
1000 fn test_clear_resets_all_state() {
1001 let state = SubscriptionState::new('.');
1002
1003 for i in 0..10 {
1005 let topic = format!("channel{i}.SYMBOL");
1006 state.add_reference(&topic);
1007 state.add_reference(&topic); state.mark_subscribe(&topic);
1009 state.confirm_subscribe(&topic);
1010 }
1011
1012 assert_eq!(state.len(), 10);
1013 assert!(!state.is_empty());
1014
1015 state.clear();
1017
1018 assert_eq!(state.len(), 0);
1020 assert!(state.is_empty());
1021 assert!(state.all_topics().is_empty());
1022 assert!(state.pending_subscribe_topics().is_empty());
1023 assert!(state.pending_unsubscribe_topics().is_empty());
1024
1025 for i in 0..10 {
1027 let topic = format!("channel{i}.SYMBOL");
1028 assert_eq!(state.get_reference_count(&topic), 0);
1029 }
1030 }
1031
1032 #[rstest]
1033 fn test_different_delimiter_does_not_affect_storage() {
1034 let state_dot = SubscriptionState::new('.');
1036 let state_colon = SubscriptionState::new(':');
1037
1038 state_dot.mark_subscribe("channel.SYMBOL");
1040 state_colon.mark_subscribe("channel:SYMBOL");
1041
1042 assert_eq!(state_dot.pending_subscribe_topics(), vec!["channel.SYMBOL"]);
1044 assert_eq!(
1045 state_colon.pending_subscribe_topics(),
1046 vec!["channel:SYMBOL"]
1047 );
1048 }
1049
1050 #[rstest]
1051 fn test_unsubscribe_before_subscribe_confirmed() {
1052 let state = SubscriptionState::new('.');
1053
1054 state.mark_subscribe("tickers.BTCUSDT");
1056 assert_eq!(state.pending_subscribe_topics(), vec!["tickers.BTCUSDT"]);
1057
1058 state.mark_unsubscribe("tickers.BTCUSDT");
1060
1061 assert!(state.pending_subscribe_topics().is_empty());
1063 assert_eq!(state.pending_unsubscribe_topics(), vec!["tickers.BTCUSDT"]);
1064
1065 state.confirm_unsubscribe("tickers.BTCUSDT");
1067
1068 assert!(state.is_empty());
1070 assert!(state.all_topics().is_empty());
1071 assert_eq!(state.len(), 0);
1072 }
1073
1074 #[rstest]
1075 fn test_late_subscribe_confirmation_after_unsubscribe() {
1076 let state = SubscriptionState::new('.');
1077
1078 state.mark_subscribe("tickers.BTCUSDT");
1080
1081 state.mark_unsubscribe("tickers.BTCUSDT");
1083
1084 state.confirm_subscribe("tickers.BTCUSDT");
1086
1087 assert_eq!(state.len(), 0);
1089 assert!(state.pending_subscribe_topics().is_empty());
1090
1091 state.confirm_unsubscribe("tickers.BTCUSDT");
1093
1094 assert!(state.is_empty());
1096 assert!(state.all_topics().is_empty());
1097 }
1098
1099 #[rstest]
1100 fn test_unsubscribe_clears_all_states() {
1101 let state = SubscriptionState::new('.');
1102
1103 state.mark_subscribe("tickers.BTCUSDT");
1105 state.confirm_subscribe("tickers.BTCUSDT");
1106 assert_eq!(state.len(), 1);
1107
1108 state.mark_unsubscribe("tickers.BTCUSDT");
1110
1111 assert_eq!(state.len(), 0);
1113 assert_eq!(state.pending_unsubscribe_topics(), vec!["tickers.BTCUSDT"]);
1114
1115 state.confirm_subscribe("tickers.BTCUSDT");
1117
1118 state.confirm_unsubscribe("tickers.BTCUSDT");
1120
1121 assert!(state.is_empty());
1123 assert_eq!(state.len(), 0);
1124 assert!(state.pending_subscribe_topics().is_empty());
1125 assert!(state.pending_unsubscribe_topics().is_empty());
1126 assert!(state.all_topics().is_empty());
1127 }
1128
1129 #[rstest]
1130 fn test_mark_failure_respects_pending_unsubscribe() {
1131 let state = SubscriptionState::new('.');
1132
1133 state.mark_subscribe("tickers.BTCUSDT");
1135 state.confirm_subscribe("tickers.BTCUSDT");
1136 assert_eq!(state.len(), 1);
1137
1138 state.mark_unsubscribe("tickers.BTCUSDT");
1140 assert_eq!(state.len(), 0);
1141 assert_eq!(state.pending_unsubscribe_topics(), vec!["tickers.BTCUSDT"]);
1142
1143 state.mark_failure("tickers.BTCUSDT");
1145
1146 assert!(state.pending_subscribe_topics().is_empty());
1148 assert_eq!(state.pending_unsubscribe_topics(), vec!["tickers.BTCUSDT"]);
1149
1150 assert!(state.all_topics().is_empty());
1152
1153 state.confirm_unsubscribe("tickers.BTCUSDT");
1155 assert!(state.is_empty());
1156 }
1157
1158 #[rstest]
1159 #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
1160 async fn test_concurrent_stress_mixed_operations() {
1161 let state = Arc::new(SubscriptionState::new('.'));
1162 let mut handles = vec![];
1163
1164 for i in 0..50 {
1166 let state_clone = Arc::clone(&state);
1167 let handle = tokio::spawn(async move {
1168 let topic1 = format!("channel.SYMBOL{i}");
1169 let topic2 = format!("channel.SYMBOL{}", i + 100);
1170
1171 state_clone.add_reference(&topic1);
1173 state_clone.add_reference(&topic2);
1174
1175 state_clone.mark_subscribe(&topic1);
1177 state_clone.confirm_subscribe(&topic1);
1178 state_clone.mark_subscribe(&topic2);
1179
1180 if i % 3 == 0 {
1182 state_clone.mark_unsubscribe(&topic1);
1183 state_clone.confirm_unsubscribe(&topic1);
1184 }
1185
1186 state_clone.add_reference(&topic2);
1188 state_clone.remove_reference(&topic2);
1189
1190 state_clone.confirm_subscribe(&topic2);
1192 });
1193 handles.push(handle);
1194 }
1195
1196 for handle in handles {
1197 handle.await.unwrap();
1198 }
1199
1200 let all = state.all_topics();
1202 let confirmed_count = state.len();
1203
1204 assert!(confirmed_count > 50); assert!(confirmed_count <= 100); assert_eq!(
1209 all.len(),
1210 confirmed_count + state.pending_subscribe_topics().len()
1211 );
1212 }
1213
1214 #[rstest]
1215 fn test_edge_case_malformed_topics() {
1216 let state = SubscriptionState::new('.');
1217
1218 state.mark_subscribe("channel.symbol.extra");
1220 state.confirm_subscribe("channel.symbol.extra");
1221 let topics = state.all_topics();
1222 assert!(topics.contains(&"channel.symbol.extra".to_string()));
1223
1224 state.mark_subscribe(".channel");
1226 state.confirm_subscribe(".channel");
1227 assert_eq!(state.len(), 2);
1228
1229 state.mark_subscribe("channel.");
1232 state.confirm_subscribe("channel.");
1233 assert_eq!(state.len(), 3);
1234
1235 state.mark_subscribe("tickers");
1237 state.confirm_subscribe("tickers");
1238 assert_eq!(state.len(), 4);
1239
1240 let all = state.all_topics();
1242 assert_eq!(all.len(), 4);
1243 assert!(all.contains(&"channel.symbol.extra".to_string()));
1244 assert!(all.contains(&".channel".to_string()));
1245 assert!(all.contains(&"channel".to_string())); assert!(all.contains(&"tickers".to_string()));
1247 }
1248
1249 #[rstest]
1250 fn test_reference_count_underflow_safety() {
1251 let state = SubscriptionState::new('.');
1252
1253 assert!(!state.remove_reference("never.added"));
1255 assert_eq!(state.get_reference_count("never.added"), 0);
1256
1257 state.add_reference("once.added");
1259 assert_eq!(state.get_reference_count("once.added"), 1);
1260
1261 assert!(state.remove_reference("once.added")); assert_eq!(state.get_reference_count("once.added"), 0);
1263
1264 assert!(!state.remove_reference("once.added")); assert!(!state.remove_reference("once.added")); assert_eq!(state.get_reference_count("once.added"), 0);
1267
1268 assert!(state.add_reference("once.added"));
1270 assert_eq!(state.get_reference_count("once.added"), 1);
1271 }
1272
1273 #[rstest]
1274 fn test_reconnection_with_partial_state() {
1275 let state = SubscriptionState::new('.');
1276
1277 state.mark_subscribe("confirmed.BTCUSDT");
1280 state.confirm_subscribe("confirmed.BTCUSDT");
1281
1282 state.mark_subscribe("pending.ETHUSDT");
1284
1285 state.mark_subscribe("cancelled.XRPUSDT");
1287 state.confirm_subscribe("cancelled.XRPUSDT");
1288 state.mark_unsubscribe("cancelled.XRPUSDT");
1289
1290 assert_eq!(state.len(), 1); let all = state.all_topics();
1293 assert_eq!(all.len(), 2); assert!(all.contains(&"confirmed.BTCUSDT".to_string()));
1295 assert!(all.contains(&"pending.ETHUSDT".to_string()));
1296 assert!(!all.contains(&"cancelled.XRPUSDT".to_string())); let topics_to_resubscribe = state.all_topics();
1300
1301 state.confirmed().clear();
1303
1304 for topic in &topics_to_resubscribe {
1306 state.mark_subscribe(topic);
1307 }
1308
1309 for topic in &topics_to_resubscribe {
1311 state.confirm_subscribe(topic);
1312 }
1313
1314 assert_eq!(state.len(), 2); let final_topics = state.all_topics();
1317 assert_eq!(final_topics.len(), 2);
1318 assert!(final_topics.contains(&"confirmed.BTCUSDT".to_string()));
1319 assert!(final_topics.contains(&"pending.ETHUSDT".to_string()));
1320 assert!(!final_topics.contains(&"cancelled.XRPUSDT".to_string()));
1321 }
1322
1323 fn check_invariants(state: &SubscriptionState, label: &str) {
1334 let confirmed_topics: AHashSet<String> = state
1336 .topics_from_map(&state.confirmed)
1337 .into_iter()
1338 .collect();
1339 let pending_sub_topics: AHashSet<String> =
1340 state.pending_subscribe_topics().into_iter().collect();
1341 let pending_unsub_topics: AHashSet<String> =
1342 state.pending_unsubscribe_topics().into_iter().collect();
1343
1344 let confirmed_and_pending_sub: Vec<_> =
1346 confirmed_topics.intersection(&pending_sub_topics).collect();
1347 assert!(
1348 confirmed_and_pending_sub.is_empty(),
1349 "{label}: Topic in both confirmed and pending_subscribe: {confirmed_and_pending_sub:?}"
1350 );
1351
1352 let confirmed_and_pending_unsub: Vec<_> = confirmed_topics
1353 .intersection(&pending_unsub_topics)
1354 .collect();
1355 assert!(
1356 confirmed_and_pending_unsub.is_empty(),
1357 "{label}: Topic in both confirmed and pending_unsubscribe: {confirmed_and_pending_unsub:?}"
1358 );
1359
1360 let pending_sub_and_unsub: Vec<_> = pending_sub_topics
1361 .intersection(&pending_unsub_topics)
1362 .collect();
1363 assert!(
1364 pending_sub_and_unsub.is_empty(),
1365 "{label}: Topic in both pending_subscribe and pending_unsubscribe: {pending_sub_and_unsub:?}"
1366 );
1367
1368 let all_topics: AHashSet<String> = state.all_topics().into_iter().collect();
1370 let expected_all: AHashSet<String> = confirmed_topics
1371 .union(&pending_sub_topics)
1372 .cloned()
1373 .collect();
1374 assert_eq!(
1375 all_topics, expected_all,
1376 "{label}: all_topics() doesn't match confirmed ∪ pending_subscribe"
1377 );
1378
1379 for topic in &pending_unsub_topics {
1381 assert!(
1382 !all_topics.contains(topic),
1383 "{label}: pending_unsubscribe topic {topic} incorrectly in all_topics()"
1384 );
1385 }
1386
1387 let expected_len: usize = state
1389 .confirmed
1390 .iter()
1391 .map(|entry| entry.value().len())
1392 .sum();
1393 assert_eq!(
1394 state.len(),
1395 expected_len,
1396 "{label}: len() mismatch. Expected {expected_len}, was {}",
1397 state.len()
1398 );
1399
1400 let should_be_empty = state.confirmed.is_empty()
1402 && pending_sub_topics.is_empty()
1403 && pending_unsub_topics.is_empty();
1404 assert_eq!(
1405 state.is_empty(),
1406 should_be_empty,
1407 "{label}: is_empty() inconsistent. Maps empty: {should_be_empty}, is_empty(): {}",
1408 state.is_empty()
1409 );
1410
1411 for entry in state.reference_counts.iter() {
1413 let count = entry.value().get();
1414 assert!(
1415 count > 0,
1416 "{label}: Reference count should be NonZeroUsize (> 0), was {count} for {:?}",
1417 entry.key()
1418 );
1419 }
1420 }
1421
1422 fn check_topic_exclusivity(state: &SubscriptionState, topic: &str, label: &str) {
1424 let (channel, symbol) = split_topic(topic, state.delimiter);
1425
1426 let in_confirmed = is_tracked(&state.confirmed, channel, symbol);
1427 let in_pending_sub = is_tracked(&state.pending_subscribe, channel, symbol);
1428 let in_pending_unsub = is_tracked(&state.pending_unsubscribe, channel, symbol);
1429
1430 let count = [in_confirmed, in_pending_sub, in_pending_unsub]
1431 .iter()
1432 .filter(|&&x| x)
1433 .count();
1434
1435 assert!(
1436 count <= 1,
1437 "{label}: Topic {topic} in {count} states (should be 0 or 1). \
1438 confirmed: {in_confirmed}, pending_sub: {in_pending_sub}, pending_unsub: {in_pending_unsub}"
1439 );
1440 }
1441
#[cfg(test)]
mod property_tests {
    use proptest::prelude::*;

    use super::*;

    /// One step of a randomly generated scenario applied to a [`SubscriptionState`].
    ///
    /// Every variant except `Clear` carries the topic string the step targets.
    #[derive(Debug, Clone)]
    enum Operation {
        MarkSubscribe(String),
        ConfirmSubscribe(String),
        MarkUnsubscribe(String),
        ConfirmUnsubscribe(String),
        MarkFailure(String),
        AddReference(String),
        RemoveReference(String),
        Clear,
    }

    /// Generates topic strings of the form `channelN.SYMBOLM` or a bare `channelN`.
    ///
    /// The modulo keeps `N` in `0..5` and `M` in `0..10`, so generated operations
    /// frequently target the same topic.
    fn topic_strategy() -> impl Strategy<Value = String> {
        let with_symbol = (any::<u8>(), any::<u8>())
            .prop_map(|(ch, sym)| format!("channel{}.SYMBOL{}", ch % 5, sym % 10));
        let channel_only = any::<u8>().prop_map(|ch| format!("channel{}", ch % 5));

        prop_oneof![with_symbol, channel_only]
    }

    /// Generates an arbitrary [`Operation`]: a topic is drawn first, then one of the
    /// eight variants is chosen for it (`Clear` ignores the topic).
    fn operation_strategy() -> impl Strategy<Value = Operation> {
        topic_strategy().prop_flat_map(|topic| {
            // Wraps the drawn topic with the given variant constructor.
            let on_topic = |make: fn(String) -> Operation| Just(make(topic.clone()));

            prop_oneof![
                on_topic(Operation::MarkSubscribe),
                on_topic(Operation::ConfirmSubscribe),
                on_topic(Operation::MarkUnsubscribe),
                on_topic(Operation::ConfirmUnsubscribe),
                on_topic(Operation::MarkFailure),
                on_topic(Operation::AddReference),
                on_topic(Operation::RemoveReference),
                Just(Operation::Clear),
            ]
        })
    }

    /// Dispatches `op` to the matching [`SubscriptionState`] method.
    ///
    /// Return values of the reference-counting calls are discarded.
    fn apply_operation(state: &SubscriptionState, op: &Operation) {
        match op {
            Operation::Clear => state.clear(),
            Operation::AddReference(target) => {
                state.add_reference(target);
            }
            Operation::RemoveReference(target) => {
                state.remove_reference(target);
            }
            Operation::MarkSubscribe(target) => state.mark_subscribe(target),
            Operation::ConfirmSubscribe(target) => state.confirm_subscribe(target),
            Operation::MarkUnsubscribe(target) => state.mark_unsubscribe(target),
            Operation::ConfirmUnsubscribe(target) => state.confirm_unsubscribe(target),
            Operation::MarkFailure(target) => state.mark_failure(target),
        }
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(500))]

        // The full invariant set holds after every step of an arbitrary sequence.
        #[rstest]
        fn prop_invariants_hold_after_operations(
            operations in prop::collection::vec(operation_strategy(), 1..50)
        ) {
            let state = SubscriptionState::new('.');

            operations.iter().enumerate().for_each(|(i, op)| {
                apply_operation(&state, op);

                let label = format!("After op {i}: {op:?}");
                check_invariants(&state, &label);
            });

            check_invariants(&state, "Final state");
        }

        // Stored reference counts stay positive under any add/remove interleaving.
        #[rstest]
        fn prop_reference_counting_consistency(
            ops in prop::collection::vec(
                topic_strategy().prop_flat_map(|t| {
                    prop_oneof![
                        Just(Operation::AddReference(t.clone())),
                        Just(Operation::RemoveReference(t)),
                    ]
                }),
                1..100
            )
        ) {
            let state = SubscriptionState::new('.');

            ops.iter().for_each(|op| {
                apply_operation(&state, op);

                // Re-inspect every stored count after each individual step.
                for entry in state.reference_counts.iter() {
                    assert!(entry.value().get() > 0);
                }
            });
        }

        // `all_topics()` equals confirmed ∪ pending_subscribe and excludes pending_unsubscribe.
        #[rstest]
        fn prop_all_topics_is_union(
            operations in prop::collection::vec(operation_strategy(), 1..50)
        ) {
            let state = SubscriptionState::new('.');

            operations.iter().for_each(|op| {
                apply_operation(&state, op);

                let confirmed: AHashSet<String> =
                    state.topics_from_map(&state.confirmed).into_iter().collect();
                let pending_sub: AHashSet<String> =
                    state.pending_subscribe_topics().into_iter().collect();
                let expected: AHashSet<String> =
                    confirmed.union(&pending_sub).cloned().collect();
                let all_topics: AHashSet<String> = state.all_topics().into_iter().collect();

                assert_eq!(all_topics, expected);

                let pending_unsub: AHashSet<String> =
                    state.pending_unsubscribe_topics().into_iter().collect();
                for topic in pending_unsub {
                    assert!(!all_topics.contains(&topic));
                }
            });
        }

        // `clear()` leaves nothing behind, whatever state preceded it.
        #[rstest]
        fn prop_clear_resets_completely(
            operations in prop::collection::vec(operation_strategy(), 1..30)
        ) {
            let state = SubscriptionState::new('.');

            operations.iter().for_each(|op| apply_operation(&state, op));

            state.clear();

            // Public accessors report an empty tracker...
            assert!(state.is_empty());
            assert_eq!(state.len(), 0);
            assert!(state.all_topics().is_empty());
            assert!(state.pending_subscribe_topics().is_empty());
            assert!(state.pending_unsubscribe_topics().is_empty());
            // ...and so do the underlying maps.
            assert!(state.confirmed.is_empty());
            assert!(state.pending_subscribe.is_empty());
            assert!(state.pending_unsubscribe.is_empty());
            assert!(state.reference_counts.is_empty());
        }

        // An arbitrary probe topic is tracked by at most one map after every step.
        #[rstest]
        fn prop_topic_mutual_exclusivity(
            operations in prop::collection::vec(operation_strategy(), 1..50),
            topic in topic_strategy()
        ) {
            let state = SubscriptionState::new('.');

            operations.iter().enumerate().for_each(|(i, op)| {
                apply_operation(&state, op);

                let label = format!("After op {i}: {op:?}");
                check_topic_exclusivity(&state, &topic, &label);
            });
        }
    }
}
1619
1620 #[rstest]
1621 fn test_exhaustive_two_step_transitions() {
1622 let operations = [
1623 "mark_subscribe",
1624 "confirm_subscribe",
1625 "mark_unsubscribe",
1626 "confirm_unsubscribe",
1627 "mark_failure",
1628 ];
1629
1630 for &op1 in &operations {
1631 for &op2 in &operations {
1632 let state = SubscriptionState::new('.');
1633 let topic = "test.TOPIC";
1634
1635 apply_op(&state, op1, topic);
1637 apply_op(&state, op2, topic);
1638
1639 check_invariants(&state, &format!("{op1} → {op2}"));
1641 check_topic_exclusivity(&state, topic, &format!("{op1} → {op2}"));
1642 }
1643 }
1644 }
1645
1646 fn apply_op(state: &SubscriptionState, op: &str, topic: &str) {
1647 match op {
1648 "mark_subscribe" => state.mark_subscribe(topic),
1649 "confirm_subscribe" => state.confirm_subscribe(topic),
1650 "mark_unsubscribe" => state.mark_unsubscribe(topic),
1651 "confirm_unsubscribe" => state.confirm_unsubscribe(topic),
1652 "mark_failure" => state.mark_failure(topic),
1653 _ => panic!("Unknown operation: {op}"),
1654 }
1655 }
1656
1657 #[rstest]
1658 #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
1659 async fn test_stress_rapid_resubscribe_pattern() {
1660 let state = Arc::new(SubscriptionState::new('.'));
1662 let mut handles = vec![];
1663
1664 for i in 0..100 {
1665 let state_clone = Arc::clone(&state);
1666 let handle = tokio::spawn(async move {
1667 let topic = format!("rapid.SYMBOL{}", i % 10); state_clone.mark_subscribe(&topic);
1671 state_clone.confirm_subscribe(&topic);
1672
1673 state_clone.mark_unsubscribe(&topic);
1675 state_clone.mark_subscribe(&topic);
1677 state_clone.confirm_unsubscribe(&topic);
1679 state_clone.confirm_subscribe(&topic);
1681 });
1682 handles.push(handle);
1683 }
1684
1685 for handle in handles {
1686 handle.await.unwrap();
1687 }
1688
1689 check_invariants(&state, "After rapid resubscribe stress test");
1690 }
1691
1692 #[rstest]
1693 #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
1694 async fn test_stress_failure_recovery_loop() {
1695 let state = Arc::new(SubscriptionState::new('.'));
1698 let mut handles = vec![];
1699
1700 for i in 0..30 {
1701 let state_clone = Arc::clone(&state);
1702 let handle = tokio::spawn(async move {
1703 let topic = format!("failure.SYMBOL{i}"); state_clone.mark_subscribe(&topic);
1707 state_clone.confirm_subscribe(&topic);
1708
1709 for _ in 0..5 {
1711 state_clone.mark_failure(&topic);
1712 state_clone.confirm_subscribe(&topic); }
1714 });
1715 handles.push(handle);
1716 }
1717
1718 for handle in handles {
1719 handle.await.unwrap();
1720 }
1721
1722 check_invariants(&state, "After failure recovery loops");
1723
1724 assert_eq!(state.len(), 30);
1726 }
1727}