use std::{future::Future, marker::PhantomData, time::Duration};

use tokio_util::sync::CancellationToken;

use crate::backoff::ExponentialBackoff;

#[derive(Debug, Clone)]
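/// Configuration for retry behavior with exponential backoff.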
pub struct RetryConfig {
    /// Maximum number of retries after the initial attempt.
    pub max_retries: u32,
    /// Delay before the first retry, in milliseconds.
    pub initial_delay_ms: u64,
    /// Upper bound for the backoff delay, in milliseconds.
    pub max_delay_ms: u64,
    /// Multiplier applied to the delay after each attempt.
    pub backoff_factor: f64,
    /// Maximum random jitter added to each delay, in milliseconds.
    pub jitter_ms: u64,
    /// Optional timeout applied to each individual attempt, in milliseconds.
    pub operation_timeout_ms: Option<u64>,
    /// If `true`, the first retry is performed immediately, without waiting for the initial delay.
    pub immediate_first: bool,
    /// Optional budget for the total elapsed time across all attempts, in milliseconds.
    pub max_elapsed_ms: Option<u64>,
}
47
48impl Default for RetryConfig {
49 fn default() -> Self {
50 Self {
51 max_retries: 3,
52 initial_delay_ms: 1_000,
53 max_delay_ms: 10_000,
54 backoff_factor: 2.0,
55 jitter_ms: 100,
56 operation_timeout_ms: Some(30_000),
57 immediate_first: false,
58 max_elapsed_ms: None,
59 }
60 }
61}
62
63#[derive(Clone, Debug)]
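/// Generic retry manager that drives retryable operations with exponential
/// backoff, optional per-attempt timeouts, an overall time budget, and
/// cooperative cancellation.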
pub struct RetryManager<E> {
    config: RetryConfig,
    _phantom: PhantomData<E>,
}

impl<E> RetryManager<E>
where
    E: std::error::Error,
{
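    /// Creates a new [`RetryManager`] with the given configuration.
    ///
    /// # Errors
    ///
    /// Currently never returns an error; the fallible signature leaves room
    /// for configuration validation.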
    pub const fn new(config: RetryConfig) -> anyhow::Result<Self> {
        Ok(Self {
            config,
            _phantom: PhantomData,
        })
    }

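    /// Core retry loop shared by [`execute_with_retry`](Self::execute_with_retry)
    /// and [`execute_with_retry_with_cancel`](Self::execute_with_retry_with_cancel).
    ///
    /// Runs `operation` until it succeeds, a non-retryable error is returned,
    /// the retry count or time budget is exhausted, or the optional
    /// cancellation token fires. `should_retry` decides whether an error is
    /// retryable, and `create_error` builds the timeout/cancellation errors of type `E`.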
    pub async fn execute_with_retry_inner<F, Fut, T>(
        &self,
        operation_name: &str,
        mut operation: F,
        should_retry: impl Fn(&E) -> bool,
        create_error: impl Fn(String) -> E,
        cancel: Option<&CancellationToken>,
    ) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        let mut backoff = ExponentialBackoff::new(
            Duration::from_millis(self.config.initial_delay_ms),
            Duration::from_millis(self.config.max_delay_ms),
            self.config.backoff_factor,
            self.config.jitter_ms,
            self.config.immediate_first,
        )
        .map_err(|e| create_error(format!("Invalid configuration: {e}")))?;

        let mut attempt = 0;
        let start_time = tokio::time::Instant::now();

        loop {
            if let Some(token) = cancel
                && token.is_cancelled()
            {
                tracing::debug!(
                    operation = %operation_name,
                    attempts = attempt,
                    "Operation canceled"
                );
                return Err(create_error("canceled".to_string()));
            }

            if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
                let elapsed = start_time.elapsed();
                if elapsed.as_millis() >= u128::from(max_elapsed_ms) {
                    let timeout_error = create_error("Budget exceeded".to_string());
                    tracing::trace!(
                        operation = %operation_name,
                        attempts = attempt + 1,
                        budget_ms = max_elapsed_ms,
                        "Retry budget exceeded"
                    );
                    return Err(timeout_error);
                }
            }

            let result = match (self.config.operation_timeout_ms, cancel) {
                (Some(timeout_ms), Some(token)) => {
                    tokio::select! {
                        result = tokio::time::timeout(Duration::from_millis(timeout_ms), operation()) => result,
                        () = token.cancelled() => {
                            tracing::debug!(
                                operation = %operation_name,
                                "Operation canceled during execution"
                            );
                            return Err(create_error("canceled".to_string()));
                        }
                    }
                }
                (Some(timeout_ms), None) => {
                    tokio::time::timeout(Duration::from_millis(timeout_ms), operation()).await
                }
                (None, Some(token)) => {
                    tokio::select! {
                        result = operation() => Ok(result),
                        () = token.cancelled() => {
                            tracing::debug!(
                                operation = %operation_name,
                                "Operation canceled during execution"
                            );
                            return Err(create_error("canceled".to_string()));
                        }
                    }
                }
                (None, None) => Ok(operation().await),
            };

            match result {
                Ok(Ok(success)) => {
                    if attempt > 0 {
                        tracing::trace!(
                            operation = %operation_name,
                            attempts = attempt + 1,
                            "Retry succeeded"
                        );
                    }
                    return Ok(success);
                }
                Ok(Err(error)) => {
                    if !should_retry(&error) {
                        tracing::trace!(
                            operation = %operation_name,
                            error = %error,
                            "Non-retryable error"
                        );
                        return Err(error);
                    }

                    if attempt >= self.config.max_retries {
                        tracing::trace!(
                            operation = %operation_name,
                            attempts = attempt + 1,
                            error = %error,
                            "Retries exhausted"
                        );
                        return Err(error);
                    }

                    let mut delay = backoff.next_duration();

                    if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
                        let elapsed = start_time.elapsed();
                        let remaining =
                            Duration::from_millis(max_elapsed_ms).saturating_sub(elapsed);

                        if remaining.is_zero() {
                            let timeout_error = create_error("Budget exceeded".to_string());
                            tracing::trace!(
                                operation = %operation_name,
                                attempts = attempt + 1,
                                budget_ms = max_elapsed_ms,
                                "Retry budget exceeded"
                            );
                            return Err(timeout_error);
                        }

                        delay = delay.min(remaining);
                    }

                    tracing::trace!(
                        operation = %operation_name,
                        attempt = attempt + 1,
                        delay_ms = delay.as_millis() as u64,
                        error = %error,
                        "Retrying after failure"
                    );

                    if delay.is_zero() {
                        tokio::task::yield_now().await;
                        attempt += 1;
                        continue;
                    }

                    if let Some(token) = cancel {
                        tokio::select! {
                            () = tokio::time::sleep(delay) => {},
                            () = token.cancelled() => {
                                tracing::debug!(
                                    operation = %operation_name,
                                    attempt = attempt + 1,
                                    "Operation canceled during retry delay"
                                );
                                return Err(create_error("canceled".to_string()));
                            }
                        }
                    } else {
                        tokio::time::sleep(delay).await;
                    }
                    attempt += 1;
                }
                Err(_timeout) => {
                    let timeout_error = create_error(format!(
                        "Timed out after {}ms",
                        self.config.operation_timeout_ms.unwrap_or(0)
                    ));

                    if !should_retry(&timeout_error) {
                        tracing::trace!(
                            operation = %operation_name,
                            error = %timeout_error,
                            "Non-retryable timeout"
                        );
                        return Err(timeout_error);
                    }

                    if attempt >= self.config.max_retries {
                        tracing::trace!(
                            operation = %operation_name,
                            attempts = attempt + 1,
                            error = %timeout_error,
                            "Retries exhausted after timeout"
                        );
                        return Err(timeout_error);
                    }

                    let mut delay = backoff.next_duration();

                    if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
                        let elapsed = start_time.elapsed();
                        let remaining =
                            Duration::from_millis(max_elapsed_ms).saturating_sub(elapsed);

                        if remaining.is_zero() {
                            let budget_error = create_error("Budget exceeded".to_string());
                            tracing::trace!(
                                operation = %operation_name,
                                attempts = attempt + 1,
                                budget_ms = max_elapsed_ms,
                                "Retry budget exceeded"
                            );
                            return Err(budget_error);
                        }

                        delay = delay.min(remaining);
                    }

                    tracing::trace!(
                        operation = %operation_name,
                        attempt = attempt + 1,
                        delay_ms = delay.as_millis() as u64,
                        error = %timeout_error,
                        "Retrying after timeout"
                    );

                    if delay.is_zero() {
                        tokio::task::yield_now().await;
                        attempt += 1;
                        continue;
                    }

                    if let Some(token) = cancel {
                        tokio::select! {
                            () = tokio::time::sleep(delay) => {},
                            () = token.cancelled() => {
                                tracing::debug!(
                                    operation = %operation_name,
                                    attempt = attempt + 1,
                                    "Operation canceled during retry delay"
                                );
                                return Err(create_error("canceled".to_string()));
                            }
                        }
                    } else {
                        tokio::time::sleep(delay).await;
                    }
                    attempt += 1;
                }
            }
        }
    }

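    /// Executes `operation` with retries according to the configured policy.
    ///
    /// Equivalent to [`execute_with_retry_with_cancel`](Self::execute_with_retry_with_cancel)
    /// without a cancellation token.
    ///
    /// A minimal usage sketch (the error type and retry predicate below are
    /// illustrative, not part of this crate):
    ///
    /// ```ignore
    /// let manager = RetryManager::new(RetryConfig::default())?;
    /// let value = manager
    ///     .execute_with_retry(
    ///         "fetch_data",
    ///         || async { fetch_data().await },    // hypothetical fallible operation
    ///         |e: &MyError| e.is_transient(),     // hypothetical retry predicate
    ///         |msg| MyError::Timeout(msg),        // builds timeout/cancellation errors
    ///     )
    ///     .await?;
    /// ```
    ///
    /// # Errors
    ///
    /// Returns the last error when retries are exhausted, the first
    /// non-retryable error, or an error built by `create_error` on timeout or
    /// budget exhaustion.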
    pub async fn execute_with_retry<F, Fut, T>(
        &self,
        operation_name: &str,
        operation: F,
        should_retry: impl Fn(&E) -> bool,
        create_error: impl Fn(String) -> E,
    ) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        self.execute_with_retry_inner(operation_name, operation, should_retry, create_error, None)
            .await
    }

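    /// Executes `operation` with retries, aborting promptly when
    /// `cancellation_token` is cancelled (before an attempt, during an
    /// attempt, or during a retry delay).
    ///
    /// # Errors
    ///
    /// Returns the same errors as [`execute_with_retry`](Self::execute_with_retry),
    /// plus an error built by `create_error("canceled")` when the token fires.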
    pub async fn execute_with_retry_with_cancel<F, Fut, T>(
        &self,
        operation_name: &str,
        operation: F,
        should_retry: impl Fn(&E) -> bool,
        create_error: impl Fn(String) -> E,
        cancellation_token: &CancellationToken,
    ) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        self.execute_with_retry_inner(
            operation_name,
            operation,
            should_retry,
            create_error,
            Some(cancellation_token),
        )
        .await
    }
}

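/// Creates a [`RetryManager`] with the default [`RetryConfig`].
///
/// # Errors
///
/// Propagates any error from [`RetryManager::new`].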
pub fn create_default_retry_manager<E>() -> anyhow::Result<RetryManager<E>>
where
    E: std::error::Error,
{
    RetryManager::new(RetryConfig::default())
}

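/// Creates a [`RetryManager`] tuned for HTTP requests: up to 3 retries,
/// 1s-10s exponential backoff with jitter, a 60s per-request timeout, and a
/// 180s overall budget.
///
/// # Errors
///
/// Propagates any error from [`RetryManager::new`].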
pub const fn create_http_retry_manager<E>() -> anyhow::Result<RetryManager<E>>
where
    E: std::error::Error,
{
    let config = RetryConfig {
        max_retries: 3,
        initial_delay_ms: 1_000,
        max_delay_ms: 10_000,
        backoff_factor: 2.0,
        jitter_ms: 1_000,
        operation_timeout_ms: Some(60_000),
        immediate_first: false,
        max_elapsed_ms: Some(180_000),
    };
    RetryManager::new(config)
}

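/// Creates a [`RetryManager`] tuned for WebSocket connections: up to 5
/// retries with an immediate first retry, 1s-10s exponential backoff with
/// jitter, a 30s per-attempt timeout, and a 120s overall budget.
///
/// # Errors
///
/// Propagates any error from [`RetryManager::new`].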
pub const fn create_websocket_retry_manager<E>() -> anyhow::Result<RetryManager<E>>
where
    E: std::error::Error,
{
    let config = RetryConfig {
        max_retries: 5,
        initial_delay_ms: 1_000,
        max_delay_ms: 10_000,
        backoff_factor: 2.0,
        jitter_ms: 1_000,
        operation_timeout_ms: Some(30_000),
        immediate_first: true,
        max_elapsed_ms: Some(120_000),
    };
    RetryManager::new(config)
}

#[cfg(test)]
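/// Shared error type and helpers used by the tests below.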
mod test_utils {
    #[derive(Debug, thiserror::Error)]
    pub enum TestError {
        #[error("Retryable error: {0}")]
        Retryable(String),
        #[error("Non-retryable error: {0}")]
        NonRetryable(String),
        #[error("Timeout error: {0}")]
        Timeout(String),
    }

    pub fn should_retry_test_error(error: &TestError) -> bool {
        matches!(error, TestError::Retryable(_))
    }

    pub fn create_test_error(msg: String) -> TestError {
        TestError::Timeout(msg)
    }
}

#[cfg(test)]
mod tests {
    use std::sync::{
        Arc,
        atomic::{AtomicU32, Ordering},
    };

    use rstest::rstest;

    use super::{test_utils::*, *};

    const MAX_WAIT_ITERS: usize = 10_000;
    const MAX_ADVANCE_ITERS: usize = 10_000;

    pub(crate) async fn yield_until<F>(mut condition: F)
    where
        F: FnMut() -> bool,
    {
        for _ in 0..MAX_WAIT_ITERS {
            if condition() {
                return;
            }
            tokio::task::yield_now().await;
        }

        panic!("yield_until timed out waiting for condition");
    }

    pub(crate) async fn advance_until<F>(mut condition: F)
    where
        F: FnMut() -> bool,
    {
        for _ in 0..MAX_ADVANCE_ITERS {
            if condition() {
                return;
            }
            tokio::time::advance(Duration::from_millis(1)).await;
            tokio::task::yield_now().await;
        }

        panic!("advance_until timed out waiting for condition");
    }

    #[rstest]
    fn test_retry_config_default() {
        let config = RetryConfig::default();
        assert_eq!(config.max_retries, 3);
        assert_eq!(config.initial_delay_ms, 1_000);
        assert_eq!(config.max_delay_ms, 10_000);
        assert_eq!(config.backoff_factor, 2.0);
        assert_eq!(config.jitter_ms, 100);
        assert_eq!(config.operation_timeout_ms, Some(30_000));
        assert!(!config.immediate_first);
        assert_eq!(config.max_elapsed_ms, None);
    }

    #[tokio::test]
    async fn test_retry_manager_success_first_attempt() {
        let manager = RetryManager::new(RetryConfig::default()).unwrap();

        let result = manager
            .execute_with_retry(
                "test_operation",
                || async { Ok::<i32, TestError>(42) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert_eq!(result.unwrap(), 42);
    }

    #[tokio::test]
    async fn test_retry_manager_non_retryable_error() {
        let manager = RetryManager::new(RetryConfig::default()).unwrap();

        let result = manager
            .execute_with_retry(
                "test_operation",
                || async { Err::<i32, TestError>(TestError::NonRetryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
    }

    #[tokio::test]
    async fn test_retry_manager_retryable_error_exhausted() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let result = manager
            .execute_with_retry(
                "test_operation",
                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Retryable(_)));
    }

    #[tokio::test]
    async fn test_timeout_path() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: Some(50),
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let result = manager
            .execute_with_retry(
                "test_timeout",
                || async {
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
    }

    #[tokio::test]
    async fn test_max_elapsed_time_budget() {
        let config = RetryConfig {
            max_retries: 10,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: Some(200),
        };
        let manager = RetryManager::new(config).unwrap();

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_budget",
                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        let elapsed = start.elapsed();
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
        assert!(elapsed.as_millis() >= 150);
        assert!(elapsed.as_millis() < 1000);
    }

    #[rstest]
    fn test_http_retry_manager_config() {
        let manager = create_http_retry_manager::<TestError>().unwrap();
        assert_eq!(manager.config.max_retries, 3);
        assert!(!manager.config.immediate_first);
        assert_eq!(manager.config.max_elapsed_ms, Some(180_000));
    }

    #[rstest]
    fn test_websocket_retry_manager_config() {
        let manager = create_websocket_retry_manager::<TestError>().unwrap();
        assert_eq!(manager.config.max_retries, 5);
        assert!(manager.config.immediate_first);
        assert_eq!(manager.config.max_elapsed_ms, Some(120_000));
    }

    #[tokio::test]
    async fn test_timeout_respects_retry_predicate() {
        let config = RetryConfig {
            max_retries: 3,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: Some(50),
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let should_not_retry_timeouts = |error: &TestError| !matches!(error, TestError::Timeout(_));

        let result = manager
            .execute_with_retry(
                "test_timeout_non_retryable",
                || async {
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    Ok::<i32, TestError>(42)
                },
                should_not_retry_timeouts,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
    }

    #[tokio::test]
    async fn test_timeout_retries_when_predicate_allows() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: Some(50),
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let should_retry_timeouts = |error: &TestError| matches!(error, TestError::Timeout(_));

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_timeout_retryable",
                || async {
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_timeouts,
                create_test_error,
            )
            .await;

        let elapsed = start.elapsed();

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
        // Elapsed time should reflect several timed-out attempts plus retry delays.
        assert!(elapsed.as_millis() > 80);
    }
744
745 #[tokio::test]
746 async fn test_successful_retry_after_failures() {
747 let config = RetryConfig {
748 max_retries: 3,
749 initial_delay_ms: 10,
750 max_delay_ms: 50,
751 backoff_factor: 2.0,
752 jitter_ms: 0,
753 operation_timeout_ms: None,
754 immediate_first: false,
755 max_elapsed_ms: None,
756 };
757 let manager = RetryManager::new(config).unwrap();
758
759 let attempt_counter = Arc::new(AtomicU32::new(0));
760 let counter_clone = attempt_counter.clone();
761
762 let result = manager
763 .execute_with_retry(
764 "test_eventual_success",
765 move || {
766 let counter = counter_clone.clone();
767 async move {
768 let attempts = counter.fetch_add(1, Ordering::SeqCst);
769 if attempts < 2 {
770 Err(TestError::Retryable("temporary failure".to_string()))
771 } else {
772 Ok(42)
773 }
774 }
775 },
776 should_retry_test_error,
777 create_test_error,
778 )
779 .await;
780
781 assert_eq!(result.unwrap(), 42);
782 assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
783 }

    #[tokio::test(start_paused = true)]
    async fn test_immediate_first_retry() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 100,
            max_delay_ms: 200,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: true,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
        let times_clone = attempt_times.clone();
        let start = tokio::time::Instant::now();

        let handle = tokio::spawn({
            let times_clone = times_clone.clone();
            async move {
                let _ = manager
                    .execute_with_retry(
                        "test_immediate",
                        move || {
                            let times = times_clone.clone();
                            async move {
                                times.lock().unwrap().push(start.elapsed());
                                Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                            }
                        },
                        should_retry_test_error,
                        create_test_error,
                    )
                    .await;
            }
        });

        yield_until(|| attempt_times.lock().unwrap().len() >= 2).await;

        tokio::time::advance(Duration::from_millis(100)).await;
        tokio::task::yield_now().await;

        yield_until(|| attempt_times.lock().unwrap().len() >= 3).await;

        handle.await.unwrap();

        let times = attempt_times.lock().unwrap();
        assert_eq!(times.len(), 3);
        // With immediate_first, the first retry fires without waiting for a delay.
        assert!(times[1] <= Duration::from_millis(1));
        // The second retry waits for the full initial delay (100ms, no jitter).
        assert!(times[2] >= Duration::from_millis(100));
        assert!(times[2] <= Duration::from_millis(110));
    }

    #[tokio::test]
    async fn test_operation_without_timeout() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_no_timeout",
                || async {
                    tokio::time::sleep(Duration::from_millis(50)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        let elapsed = start.elapsed();
        assert_eq!(result.unwrap(), 42);
        assert!(elapsed.as_millis() >= 30);
        assert!(elapsed.as_millis() < 200);
    }

    #[tokio::test]
    async fn test_zero_retries() {
        let config = RetryConfig {
            max_retries: 0,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let result = manager
            .execute_with_retry(
                "test_no_retries",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert_eq!(attempt_counter.load(Ordering::SeqCst), 1);
    }

    #[tokio::test(start_paused = true)]
    async fn test_jitter_applied() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 2.0,
            jitter_ms: 50,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let delays = Arc::new(std::sync::Mutex::new(Vec::new()));
        let delays_clone = delays.clone();
        let last_time = Arc::new(std::sync::Mutex::new(tokio::time::Instant::now()));
        let last_time_clone = last_time.clone();

        let handle = tokio::spawn({
            let delays_clone = delays_clone.clone();
            async move {
                let _ = manager
                    .execute_with_retry(
                        "test_jitter",
                        move || {
                            let delays = delays_clone.clone();
                            let last_time = last_time_clone.clone();
                            async move {
                                let now = tokio::time::Instant::now();
                                let delay = {
                                    let mut last = last_time.lock().unwrap();
                                    let d = now.duration_since(*last);
                                    *last = now;
                                    d
                                };
                                delays.lock().unwrap().push(delay);
                                Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                            }
                        },
                        should_retry_test_error,
                        create_test_error,
                    )
                    .await;
            }
        });

        yield_until(|| !delays.lock().unwrap().is_empty()).await;
        advance_until(|| delays.lock().unwrap().len() >= 2).await;
        advance_until(|| delays.lock().unwrap().len() >= 3).await;

        handle.await.unwrap();

        let delays = delays.lock().unwrap();
        // Each retry delay should be at least the base delay and at most the
        // capped delay plus jitter (with 1ms tolerance).
        for delay in delays.iter().skip(1) {
            assert!(delay.as_millis() >= 50);
            assert!(delay.as_millis() <= 151);
        }
    }

    #[tokio::test]
    async fn test_max_elapsed_stops_early() {
        let config = RetryConfig {
            max_retries: 100, // high cap; the elapsed-time budget should stop retries first
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 1.5,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: Some(150),
        };
        let manager = RetryManager::new(config).unwrap();

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_elapsed_limit",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        let elapsed = start.elapsed();
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));

        let attempts = attempt_counter.load(Ordering::SeqCst);
        // The 150ms budget should cut retries off long before max_retries is
        // reached, and only after most of the budget has elapsed.
        assert!(attempts < 10);
        assert!(elapsed.as_millis() >= 100);
    }

    #[tokio::test]
    async fn test_mixed_errors_retry_behavior() {
        let config = RetryConfig {
            max_retries: 5,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let result = manager
            .execute_with_retry(
                "test_mixed_errors",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        let attempts = counter.fetch_add(1, Ordering::SeqCst);
                        match attempts {
                            0 => Err(TestError::Retryable("retry 1".to_string())),
                            1 => Err(TestError::Retryable("retry 2".to_string())),
                            2 => Err(TestError::NonRetryable("stop here".to_string())),
                            _ => Ok(42),
                        }
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
        assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
    }

    #[tokio::test]
    async fn test_cancellation_during_retry_delay() {
        use tokio_util::sync::CancellationToken;

        let config = RetryConfig {
            max_retries: 10,
            initial_delay_ms: 500,
            max_delay_ms: 1000,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let token = CancellationToken::new();
        let token_clone = token.clone();

        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            token_clone.cancel();
        });

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry_with_cancel(
                "test_cancellation",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
                &token,
            )
            .await;

        let elapsed = start.elapsed();

        assert!(result.is_err());
        let error_msg = format!("{}", result.unwrap_err());
        assert!(error_msg.contains("canceled"));

        assert!(elapsed.as_millis() < 600);

        let attempts = attempt_counter.load(Ordering::SeqCst);
        assert!(attempts >= 1);
    }

    #[tokio::test]
    async fn test_cancellation_during_operation_execution() {
        use tokio_util::sync::CancellationToken;

        let config = RetryConfig {
            max_retries: 5,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let token = CancellationToken::new();
        let token_clone = token.clone();

        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            token_clone.cancel();
        });

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry_with_cancel(
                "test_cancellation_during_op",
                || async {
                    tokio::time::sleep(Duration::from_millis(200)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_test_error,
                create_test_error,
                &token,
            )
            .await;

        let elapsed = start.elapsed();

        assert!(result.is_err());
        let error_msg = format!("{}", result.unwrap_err());
        assert!(error_msg.contains("canceled"));

        assert!(elapsed.as_millis() < 250);
    }

    #[tokio::test]
    async fn test_cancellation_error_message() {
        use tokio_util::sync::CancellationToken;

        let config = RetryConfig::default();
        let manager = RetryManager::new(config).unwrap();

        let token = CancellationToken::new();
        token.cancel();

        let result = manager
            .execute_with_retry_with_cancel(
                "test_operation",
                || async { Ok::<i32, TestError>(42) },
                should_retry_test_error,
                create_test_error,
                &token,
            )
            .await;

        assert!(result.is_err());
        let error_msg = format!("{}", result.unwrap_err());
        assert!(error_msg.contains("canceled"));
    }
}

#[cfg(test)]
mod proptest_tests {
    use std::sync::{
        Arc,
        atomic::{AtomicU32, Ordering},
    };

    use proptest::prelude::*;
    use rstest::rstest;

    use super::{
        test_utils::*,
        tests::{advance_until, yield_until},
        *,
    };

    proptest! {
        #[rstest]
        fn test_retry_config_valid_ranges(
            max_retries in 0u32..100,
            initial_delay_ms in 1u64..10_000,
            max_delay_ms in 1u64..60_000,
            backoff_factor in 1.0f64..10.0,
            jitter_ms in 0u64..1_000,
            operation_timeout_ms in prop::option::of(1u64..120_000),
            immediate_first in any::<bool>(),
            max_elapsed_ms in prop::option::of(1u64..300_000)
        ) {
            let max_delay_ms = max_delay_ms.max(initial_delay_ms);

            let config = RetryConfig {
                max_retries,
                initial_delay_ms,
                max_delay_ms,
                backoff_factor,
                jitter_ms,
                operation_timeout_ms,
                immediate_first,
                max_elapsed_ms,
            };

            let manager = RetryManager::<std::io::Error>::new(config);
            prop_assert!(manager.is_ok());
        }

        #[rstest]
        fn test_retry_attempts_bounded(
            max_retries in 0u32..5,
            initial_delay_ms in 1u64..10,
            backoff_factor in 1.0f64..2.0,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries,
                initial_delay_ms,
                max_delay_ms: initial_delay_ms * 2,
                backoff_factor,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config).unwrap();
            let attempt_counter = Arc::new(AtomicU32::new(0));
            let counter_clone = attempt_counter.clone();

            let _result = rt.block_on(manager.execute_with_retry(
                "prop_test",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                |e: &TestError| matches!(e, TestError::Retryable(_)),
                TestError::Timeout,
            ));

            let attempts = attempt_counter.load(Ordering::SeqCst);
            prop_assert_eq!(attempts, max_retries + 1);
        }

        #[rstest]
        fn test_timeout_always_respected(
            timeout_ms in 10u64..50,
            operation_delay_ms in 60u64..100,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries: 0,
                initial_delay_ms: 10,
                max_delay_ms: 100,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: Some(timeout_ms),
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config).unwrap();

            let result = rt.block_on(async {
                let operation_future = manager.execute_with_retry(
                    "timeout_test",
                    move || async move {
                        tokio::time::sleep(Duration::from_millis(operation_delay_ms)).await;
                        Ok::<i32, TestError>(42)
                    },
                    |_: &TestError| true,
                    TestError::Timeout,
                );

                tokio::time::advance(Duration::from_millis(timeout_ms + 10)).await;
                operation_future.await
            });

            prop_assert!(result.is_err());
            prop_assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
        }

        #[rstest]
        fn test_max_elapsed_always_respected(
            max_elapsed_ms in 20u64..50,
            delay_per_retry in 15u64..30,
            max_retries in 10u32..20,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries,
                initial_delay_ms: delay_per_retry,
                max_delay_ms: delay_per_retry * 2,
                backoff_factor: 1.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: Some(max_elapsed_ms),
            };

            let manager = RetryManager::new(config).unwrap();
            let attempt_counter = Arc::new(AtomicU32::new(0));
            let counter_clone = attempt_counter.clone();

            let result = rt.block_on(async {
                let operation_future = manager.execute_with_retry(
                    "elapsed_test",
                    move || {
                        let counter = counter_clone.clone();
                        async move {
                            counter.fetch_add(1, Ordering::SeqCst);
                            Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                        }
                    },
                    |e: &TestError| matches!(e, TestError::Retryable(_)),
                    TestError::Timeout,
                );

                tokio::time::advance(Duration::from_millis(max_elapsed_ms + delay_per_retry)).await;
                operation_future.await
            });

            let attempts = attempt_counter.load(Ordering::SeqCst);

            prop_assert!(result.is_err());
            prop_assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));

            prop_assert!(attempts <= max_retries + 1);
        }

        #[rstest]
        fn test_jitter_bounds(
            jitter_ms in 0u64..20,
            base_delay_ms in 10u64..30,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries: 2,
                initial_delay_ms: base_delay_ms,
                max_delay_ms: base_delay_ms * 2,
                backoff_factor: 1.0,
                jitter_ms,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config).unwrap();
            let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
            let attempt_times_for_block = attempt_times.clone();

            rt.block_on(async move {
                let attempt_times_for_wait = attempt_times_for_block.clone();
                let handle = tokio::spawn({
                    let attempt_times_for_task = attempt_times_for_block.clone();
                    let manager = manager;
                    async move {
                        let start_time = tokio::time::Instant::now();
                        let _ = manager
                            .execute_with_retry(
                                "jitter_test",
                                move || {
                                    let attempt_times_inner = attempt_times_for_task.clone();
                                    async move {
                                        attempt_times_inner
                                            .lock()
                                            .unwrap()
                                            .push(start_time.elapsed());
                                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                                    }
                                },
                                |e: &TestError| matches!(e, TestError::Retryable(_)),
                                TestError::Timeout,
                            )
                            .await;
                    }
                });

                yield_until(|| !attempt_times_for_wait.lock().unwrap().is_empty()).await;
                advance_until(|| attempt_times_for_wait.lock().unwrap().len() >= 2).await;
                advance_until(|| attempt_times_for_wait.lock().unwrap().len() >= 3).await;

                handle.await.unwrap();
            });

            let times = attempt_times.lock().unwrap();

            prop_assert!(times.len() >= 2);

            prop_assert!(times[0].as_millis() < 5);

            for i in 1..times.len() {
                let delay_from_previous = if i == 1 {
                    times[i] - times[0]
                } else {
                    times[i] - times[i - 1]
                };

                prop_assert!(
                    delay_from_previous.as_millis() >= base_delay_ms as u128,
                    "Retry {} delay {}ms is less than base {}ms",
                    i, delay_from_previous.as_millis(), base_delay_ms
                );

                prop_assert!(
                    delay_from_previous.as_millis() <= (base_delay_ms + jitter_ms + 1) as u128,
                    "Retry {} delay {}ms exceeds base {} + jitter {}",
                    i, delay_from_previous.as_millis(), base_delay_ms, jitter_ms
                );
            }
        }

        #[rstest]
        fn test_immediate_first_property(
            immediate_first in any::<bool>(),
            initial_delay_ms in 10u64..30,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries: 2,
                initial_delay_ms,
                max_delay_ms: initial_delay_ms * 2,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config).unwrap();
            let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
            let attempt_times_for_block = attempt_times.clone();

            rt.block_on(async move {
                let attempt_times_for_wait = attempt_times_for_block.clone();
                let handle = tokio::spawn({
                    let attempt_times_for_task = attempt_times_for_block.clone();
                    let manager = manager;
                    async move {
                        let start = tokio::time::Instant::now();
                        let _ = manager
                            .execute_with_retry(
                                "immediate_test",
                                move || {
                                    let attempt_times_inner = attempt_times_for_task.clone();
                                    async move {
                                        let elapsed = start.elapsed();
                                        attempt_times_inner.lock().unwrap().push(elapsed);
                                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                                    }
                                },
                                |e: &TestError| matches!(e, TestError::Retryable(_)),
                                TestError::Timeout,
                            )
                            .await;
                    }
                });

                yield_until(|| !attempt_times_for_wait.lock().unwrap().is_empty()).await;
                advance_until(|| attempt_times_for_wait.lock().unwrap().len() >= 2).await;
                advance_until(|| attempt_times_for_wait.lock().unwrap().len() >= 3).await;

                handle.await.unwrap();
            });

            let times = attempt_times.lock().unwrap();
            prop_assert!(times.len() >= 2);

            if immediate_first {
                prop_assert!(times[1].as_millis() < 20,
                    "With immediate_first=true, first retry took {}ms",
                    times[1].as_millis());
            } else {
                prop_assert!(times[1].as_millis() >= (initial_delay_ms - 1) as u128,
                    "With immediate_first=false, first retry was too fast: {}ms",
                    times[1].as_millis());
            }
        }

        #[rstest]
        fn test_non_retryable_stops_immediately(
            attempt_before_non_retryable in 0usize..3,
            max_retries in 3u32..5,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries,
                initial_delay_ms: 10,
                max_delay_ms: 100,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config).unwrap();
            let attempt_counter = Arc::new(AtomicU32::new(0));
            let counter_clone = attempt_counter.clone();

            let result: Result<i32, TestError> = rt.block_on(manager.execute_with_retry(
                "non_retryable_test",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        let attempts = counter.fetch_add(1, Ordering::SeqCst) as usize;
                        if attempts == attempt_before_non_retryable {
                            Err(TestError::NonRetryable("stop".to_string()))
                        } else {
                            Err(TestError::Retryable("retry".to_string()))
                        }
                    }
                },
                |e: &TestError| matches!(e, TestError::Retryable(_)),
                TestError::Timeout,
            ));

            let attempts = attempt_counter.load(Ordering::SeqCst) as usize;

            prop_assert!(result.is_err());
            prop_assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
            prop_assert_eq!(attempts, attempt_before_non_retryable + 1);
        }

        #[rstest]
        fn test_cancellation_stops_immediately(
            cancel_after_ms in 10u64..100,
            initial_delay_ms in 200u64..500,
        ) {
            use tokio_util::sync::CancellationToken;

            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries: 10,
                initial_delay_ms,
                max_delay_ms: initial_delay_ms * 2,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config).unwrap();
            let token = CancellationToken::new();
            let token_clone = token.clone();

            let result: Result<i32, TestError> = rt.block_on(async {
                tokio::spawn(async move {
                    tokio::time::sleep(Duration::from_millis(cancel_after_ms)).await;
                    token_clone.cancel();
                });

                let operation_future = manager.execute_with_retry_with_cancel(
                    "cancellation_test",
                    || async {
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    },
                    |e: &TestError| matches!(e, TestError::Retryable(_)),
                    create_test_error,
                    &token,
                );

                tokio::time::advance(Duration::from_millis(cancel_after_ms + 10)).await;
                operation_future.await
            });

            prop_assert!(result.is_err());
            let error_msg = format!("{}", result.unwrap_err());
            prop_assert!(error_msg.contains("canceled"));
        }

        #[rstest]
        fn test_budget_clamp_prevents_overshoot(
            max_elapsed_ms in 10u64..30,
            delay_per_retry in 20u64..50,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries: 5,
                initial_delay_ms: delay_per_retry,
                max_delay_ms: delay_per_retry * 2,
                backoff_factor: 1.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: Some(max_elapsed_ms),
            };

            let manager = RetryManager::new(config).unwrap();

            let _result = rt.block_on(async {
                let operation_future = manager.execute_with_retry(
                    "budget_clamp_test",
                    || async {
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    },
                    |e: &TestError| matches!(e, TestError::Retryable(_)),
                    create_test_error,
                );

                tokio::time::advance(Duration::from_millis(max_elapsed_ms + delay_per_retry)).await;
                operation_future.await
            });
        }

        #[rstest]
        fn test_success_on_kth_attempt(
            k in 1usize..5,
            initial_delay_ms in 5u64..20,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries: 10,
                initial_delay_ms,
                max_delay_ms: initial_delay_ms * 4,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config).unwrap();
            let attempt_counter = Arc::new(AtomicU32::new(0));
            let counter_clone = attempt_counter.clone();
            let target_k = k;

            let (result, _elapsed) = rt.block_on(async {
                let start = tokio::time::Instant::now();

                let operation_future = manager.execute_with_retry(
                    "kth_attempt_test",
                    move || {
                        let counter = counter_clone.clone();
                        async move {
                            let attempt = counter.fetch_add(1, Ordering::SeqCst) as usize;
                            if attempt + 1 == target_k {
                                Ok(42)
                            } else {
                                Err(TestError::Retryable("retry".to_string()))
                            }
                        }
                    },
                    |e: &TestError| matches!(e, TestError::Retryable(_)),
                    create_test_error,
                );

                for _ in 0..k {
                    tokio::time::advance(Duration::from_millis(initial_delay_ms * 4)).await;
                }

                let result = operation_future.await;
                let elapsed = start.elapsed();

                (result, elapsed)
            });

            let attempts = attempt_counter.load(Ordering::SeqCst) as usize;

            prop_assert!(result.is_ok());
            prop_assert_eq!(result.unwrap(), 42);
            prop_assert_eq!(attempts, k);
        }
    }
}