1use std::{future::Future, marker::PhantomData, time::Duration};
19
20use tokio_util::sync::CancellationToken;
21
22use crate::backoff::ExponentialBackoff;
23
/// Tunable parameters that control how [`RetryManager`] retries a failing operation.
#[derive(Clone, Debug)]
pub struct RetryConfig {
    /// Number of retries allowed after the initial attempt (`0` means a single attempt).
    pub max_retries: u32,
    /// Initial delay in milliseconds, passed to the backoff as its first duration.
    pub initial_delay_ms: u64,
    /// Maximum delay in milliseconds, passed to the backoff as its second duration.
    pub max_delay_ms: u64,
    /// Growth factor handed to the backoff.
    pub backoff_factor: f64,
    /// Jitter in milliseconds handed to the backoff.
    pub jitter_ms: u64,
    /// Per-attempt timeout in milliseconds; `None` lets an attempt run unbounded.
    pub operation_timeout_ms: Option<u64>,
    /// Flag handed to the backoff; presumably makes the first retry fire without delay.
    pub immediate_first: bool,
    /// Total time budget in milliseconds, measured from the start of execution;
    /// `None` disables the budget.
    pub max_elapsed_ms: Option<u64>,
}
47
48impl Default for RetryConfig {
49 fn default() -> Self {
50 Self {
51 max_retries: 3,
52 initial_delay_ms: 1_000,
53 max_delay_ms: 10_000,
54 backoff_factor: 2.0,
55 jitter_ms: 100,
56 operation_timeout_ms: Some(30_000),
57 immediate_first: false,
58 max_elapsed_ms: None,
59 }
60 }
61}
62
63#[derive(Clone, Debug)]
67pub struct RetryManager<E> {
68 config: RetryConfig,
69 _phantom: PhantomData<E>,
70}
71
72impl<E> RetryManager<E>
73where
74 E: std::error::Error,
75{
76 pub const fn new(config: RetryConfig) -> anyhow::Result<Self> {
82 Ok(Self {
83 config,
84 _phantom: PhantomData,
85 })
86 }
87
88 pub async fn execute_with_retry_inner<F, Fut, T>(
95 &self,
96 operation_name: &str,
97 mut operation: F,
98 should_retry: impl Fn(&E) -> bool,
99 create_error: impl Fn(String) -> E,
100 cancel: Option<&CancellationToken>,
101 ) -> Result<T, E>
102 where
103 F: FnMut() -> Fut,
104 Fut: Future<Output = Result<T, E>>,
105 {
106 let mut backoff = ExponentialBackoff::new(
107 Duration::from_millis(self.config.initial_delay_ms),
108 Duration::from_millis(self.config.max_delay_ms),
109 self.config.backoff_factor,
110 self.config.jitter_ms,
111 self.config.immediate_first,
112 )
113 .map_err(|e| create_error(format!("Invalid configuration: {e}")))?;
114
115 let mut attempt = 0;
116 let start_time = tokio::time::Instant::now();
117
118 loop {
119 if let Some(token) = cancel
120 && token.is_cancelled()
121 {
122 tracing::debug!(
123 operation = %operation_name,
124 attempts = attempt,
125 "Operation canceled"
126 );
127 return Err(create_error("canceled".to_string()));
128 }
129
130 if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
131 let elapsed = start_time.elapsed();
132 if elapsed.as_millis() >= u128::from(max_elapsed_ms) {
133 let timeout_error = create_error("Budget exceeded".to_string());
134 tracing::trace!(
135 operation = %operation_name,
136 attempts = attempt + 1,
137 budget_ms = max_elapsed_ms,
138 "Retry budget exceeded"
139 );
140 return Err(timeout_error);
141 }
142 }
143
144 let result = match (self.config.operation_timeout_ms, cancel) {
145 (Some(timeout_ms), Some(token)) => {
146 tokio::select! {
147 result = tokio::time::timeout(Duration::from_millis(timeout_ms), operation()) => result,
148 () = token.cancelled() => {
149 tracing::debug!(
150 operation = %operation_name,
151 "Operation canceled during execution"
152 );
153 return Err(create_error("canceled".to_string()));
154 }
155 }
156 }
157 (Some(timeout_ms), None) => {
158 tokio::time::timeout(Duration::from_millis(timeout_ms), operation()).await
159 }
160 (None, Some(token)) => {
161 tokio::select! {
162 result = operation() => Ok(result),
163 () = token.cancelled() => {
164 tracing::debug!(
165 operation = %operation_name,
166 "Operation canceled during execution"
167 );
168 return Err(create_error("canceled".to_string()));
169 }
170 }
171 }
172 (None, None) => Ok(operation().await),
173 };
174
175 match result {
176 Ok(Ok(success)) => {
177 if attempt > 0 {
178 tracing::trace!(
179 operation = %operation_name,
180 attempts = attempt + 1,
181 "Retry succeeded"
182 );
183 }
184 return Ok(success);
185 }
186 Ok(Err(error)) => {
187 if !should_retry(&error) {
188 tracing::trace!(
189 operation = %operation_name,
190 error = %error,
191 "Non-retryable error"
192 );
193 return Err(error);
194 }
195
196 if attempt >= self.config.max_retries {
197 tracing::trace!(
198 operation = %operation_name,
199 attempts = attempt + 1,
200 error = %error,
201 "Retries exhausted"
202 );
203 return Err(error);
204 }
205
206 let mut delay = backoff.next_duration();
207
208 if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
209 let elapsed = start_time.elapsed();
210 let remaining =
211 Duration::from_millis(max_elapsed_ms).saturating_sub(elapsed);
212
213 if remaining.is_zero() {
214 let timeout_error = create_error("Budget exceeded".to_string());
215 tracing::trace!(
216 operation = %operation_name,
217 attempts = attempt + 1,
218 budget_ms = max_elapsed_ms,
219 "Retry budget exceeded"
220 );
221 return Err(timeout_error);
222 }
223
224 delay = delay.min(remaining);
225 }
226
227 tracing::trace!(
228 operation = %operation_name,
229 attempt = attempt + 1,
230 delay_ms = delay.as_millis() as u64,
231 error = %error,
232 "Retrying after failure"
233 );
234
235 if delay.is_zero() {
237 attempt += 1;
238 continue;
239 }
240
241 if let Some(token) = cancel {
242 tokio::select! {
243 () = tokio::time::sleep(delay) => {},
244 () = token.cancelled() => {
245 tracing::debug!(
246 operation = %operation_name,
247 attempt = attempt + 1,
248 "Operation canceled during retry delay"
249 );
250 return Err(create_error("canceled".to_string()));
251 }
252 }
253 } else {
254 tokio::time::sleep(delay).await;
255 }
256 attempt += 1;
257 }
258 Err(_timeout) => {
259 let timeout_error = create_error(format!(
260 "Timed out after {}ms",
261 self.config.operation_timeout_ms.unwrap_or(0)
262 ));
263
264 if !should_retry(&timeout_error) {
265 tracing::trace!(
266 operation = %operation_name,
267 error = %timeout_error,
268 "Non-retryable timeout"
269 );
270 return Err(timeout_error);
271 }
272
273 if attempt >= self.config.max_retries {
274 tracing::trace!(
275 operation = %operation_name,
276 attempts = attempt + 1,
277 error = %timeout_error,
278 "Retries exhausted after timeout"
279 );
280 return Err(timeout_error);
281 }
282
283 let mut delay = backoff.next_duration();
284
285 if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
286 let elapsed = start_time.elapsed();
287 let remaining =
288 Duration::from_millis(max_elapsed_ms).saturating_sub(elapsed);
289
290 if remaining.is_zero() {
291 let budget_error = create_error("Budget exceeded".to_string());
292 tracing::trace!(
293 operation = %operation_name,
294 attempts = attempt + 1,
295 budget_ms = max_elapsed_ms,
296 "Retry budget exceeded"
297 );
298 return Err(budget_error);
299 }
300
301 delay = delay.min(remaining);
302 }
303
304 tracing::trace!(
305 operation = %operation_name,
306 attempt = attempt + 1,
307 delay_ms = delay.as_millis() as u64,
308 error = %timeout_error,
309 "Retrying after timeout"
310 );
311
312 if delay.is_zero() {
314 attempt += 1;
315 continue;
316 }
317
318 if let Some(token) = cancel {
319 tokio::select! {
320 () = tokio::time::sleep(delay) => {},
321 () = token.cancelled() => {
322 tracing::debug!(
323 operation = %operation_name,
324 attempt = attempt + 1,
325 "Operation canceled during retry delay"
326 );
327 return Err(create_error("canceled".to_string()));
328 }
329 }
330 } else {
331 tokio::time::sleep(delay).await;
332 }
333 attempt += 1;
334 }
335 }
336 }
337 }
338
339 pub async fn execute_with_retry<F, Fut, T>(
346 &self,
347 operation_name: &str,
348 operation: F,
349 should_retry: impl Fn(&E) -> bool,
350 create_error: impl Fn(String) -> E,
351 ) -> Result<T, E>
352 where
353 F: FnMut() -> Fut,
354 Fut: Future<Output = Result<T, E>>,
355 {
356 self.execute_with_retry_inner(operation_name, operation, should_retry, create_error, None)
357 .await
358 }
359
360 pub async fn execute_with_retry_with_cancel<F, Fut, T>(
367 &self,
368 operation_name: &str,
369 operation: F,
370 should_retry: impl Fn(&E) -> bool,
371 create_error: impl Fn(String) -> E,
372 cancellation_token: &CancellationToken,
373 ) -> Result<T, E>
374 where
375 F: FnMut() -> Fut,
376 Fut: Future<Output = Result<T, E>>,
377 {
378 self.execute_with_retry_inner(
379 operation_name,
380 operation,
381 should_retry,
382 create_error,
383 Some(cancellation_token),
384 )
385 .await
386 }
387}
388
389pub fn create_default_retry_manager<E>() -> anyhow::Result<RetryManager<E>>
395where
396 E: std::error::Error,
397{
398 RetryManager::new(RetryConfig::default())
399}
400
401pub const fn create_http_retry_manager<E>() -> anyhow::Result<RetryManager<E>>
407where
408 E: std::error::Error,
409{
410 let config = RetryConfig {
411 max_retries: 3,
412 initial_delay_ms: 1_000,
413 max_delay_ms: 10_000,
414 backoff_factor: 2.0,
415 jitter_ms: 1_000,
416 operation_timeout_ms: Some(60_000), immediate_first: false,
418 max_elapsed_ms: Some(180_000), };
420 RetryManager::new(config)
421}
422
423pub const fn create_websocket_retry_manager<E>() -> anyhow::Result<RetryManager<E>>
429where
430 E: std::error::Error,
431{
432 let config = RetryConfig {
433 max_retries: 5,
434 initial_delay_ms: 1_000,
435 max_delay_ms: 10_000,
436 backoff_factor: 2.0,
437 jitter_ms: 1_000,
438 operation_timeout_ms: Some(30_000), immediate_first: true,
440 max_elapsed_ms: Some(120_000), };
442 RetryManager::new(config)
443}
444
#[cfg(test)]
mod test_utils {
    /// Error type shared by the retry tests; every variant carries a free-form message.
    #[derive(Debug, thiserror::Error)]
    pub enum TestError {
        #[error("Retryable error: {0}")]
        Retryable(String),
        #[error("Non-retryable error: {0}")]
        NonRetryable(String),
        #[error("Timeout error: {0}")]
        Timeout(String),
    }

    /// Retry predicate for tests: only [`TestError::Retryable`] is retried.
    pub fn should_retry_test_error(error: &TestError) -> bool {
        match error {
            TestError::Retryable(_) => true,
            TestError::NonRetryable(_) | TestError::Timeout(_) => false,
        }
    }

    /// Error factory for tests: wraps every manager-generated message in
    /// [`TestError::Timeout`].
    pub fn create_test_error(msg: String) -> TestError {
        TestError::Timeout(msg)
    }
}
469
#[cfg(test)]
mod tests {
    use std::sync::{
        Arc,
        atomic::{AtomicU32, Ordering},
    };

    use rstest::rstest;

    use super::{test_utils::*, *};

    /// Baseline used by most tests: 10ms–50ms backoff with factor 2.0, no jitter, no
    /// per-attempt timeout, no immediate first retry and no elapsed-time budget.
    fn fast_config(max_retries: u32) -> RetryConfig {
        RetryConfig {
            max_retries,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        }
    }

    /// The default configuration exposes the documented values.
    #[rstest]
    fn test_retry_config_default() {
        let RetryConfig {
            max_retries,
            initial_delay_ms,
            max_delay_ms,
            backoff_factor,
            jitter_ms,
            operation_timeout_ms,
            immediate_first,
            max_elapsed_ms,
        } = RetryConfig::default();

        assert_eq!(max_retries, 3);
        assert_eq!(initial_delay_ms, 1_000);
        assert_eq!(max_delay_ms, 10_000);
        assert_eq!(backoff_factor, 2.0);
        assert_eq!(jitter_ms, 100);
        assert_eq!(operation_timeout_ms, Some(30_000));
        assert!(!immediate_first);
        assert_eq!(max_elapsed_ms, None);
    }

    /// An operation that succeeds immediately returns its value.
    #[tokio::test]
    async fn test_retry_manager_success_first_attempt() {
        let manager = RetryManager::new(RetryConfig::default()).unwrap();

        let outcome = manager
            .execute_with_retry(
                "test_operation",
                || async { Ok::<i32, TestError>(42) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert_eq!(outcome.unwrap(), 42);
    }

    /// A non-retryable error is returned as-is.
    #[tokio::test]
    async fn test_retry_manager_non_retryable_error() {
        let manager = RetryManager::new(RetryConfig::default()).unwrap();

        let outcome = manager
            .execute_with_retry(
                "test_operation",
                || async { Err::<i32, TestError>(TestError::NonRetryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(outcome.is_err());
        assert!(matches!(outcome.unwrap_err(), TestError::NonRetryable(_)));
    }

    /// Once retries run out, the last retryable error is returned.
    #[tokio::test]
    async fn test_retry_manager_retryable_error_exhausted() {
        let manager = RetryManager::new(fast_config(2)).unwrap();

        let outcome = manager
            .execute_with_retry(
                "test_operation",
                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(outcome.is_err());
        assert!(matches!(outcome.unwrap_err(), TestError::Retryable(_)));
    }

    /// An operation slower than the per-attempt timeout yields a timeout error.
    #[tokio::test]
    async fn test_timeout_path() {
        let config = RetryConfig {
            operation_timeout_ms: Some(50),
            ..fast_config(2)
        };
        let manager = RetryManager::new(config).unwrap();

        let outcome = manager
            .execute_with_retry(
                "test_timeout",
                || async {
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(outcome.is_err());
        assert!(matches!(outcome.unwrap_err(), TestError::Timeout(_)));
    }

    /// The elapsed-time budget ends retrying even though retries remain.
    #[tokio::test]
    async fn test_max_elapsed_time_budget() {
        let config = RetryConfig {
            initial_delay_ms: 50,
            max_delay_ms: 100,
            max_elapsed_ms: Some(200),
            ..fast_config(10)
        };
        let manager = RetryManager::new(config).unwrap();

        let started = tokio::time::Instant::now();
        let outcome = manager
            .execute_with_retry(
                "test_budget",
                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;
        let took = started.elapsed();

        assert!(outcome.is_err());
        assert!(matches!(outcome.unwrap_err(), TestError::Timeout(_)));
        assert!(took.as_millis() >= 150);
        assert!(took.as_millis() < 1000);
    }

    /// The HTTP preset carries the expected limits.
    #[rstest]
    fn test_http_retry_manager_config() {
        let manager = create_http_retry_manager::<TestError>().unwrap();
        let config = &manager.config;

        assert_eq!(config.max_retries, 3);
        assert!(!config.immediate_first);
        assert_eq!(config.max_elapsed_ms, Some(180_000));
    }

    /// The WebSocket preset carries the expected limits.
    #[rstest]
    fn test_websocket_retry_manager_config() {
        let manager = create_websocket_retry_manager::<TestError>().unwrap();
        let config = &manager.config;

        assert_eq!(config.max_retries, 5);
        assert!(config.immediate_first);
        assert_eq!(config.max_elapsed_ms, Some(120_000));
    }

    /// A timeout is not retried when the predicate rejects timeout errors.
    #[tokio::test]
    async fn test_timeout_respects_retry_predicate() {
        let config = RetryConfig {
            operation_timeout_ms: Some(50),
            ..fast_config(3)
        };
        let manager = RetryManager::new(config).unwrap();

        let should_not_retry_timeouts = |error: &TestError| !matches!(error, TestError::Timeout(_));

        let outcome = manager
            .execute_with_retry(
                "test_timeout_non_retryable",
                || async {
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    Ok::<i32, TestError>(42)
                },
                should_not_retry_timeouts,
                create_test_error,
            )
            .await;

        assert!(outcome.is_err());
        assert!(matches!(outcome.unwrap_err(), TestError::Timeout(_)));
    }

    /// A timeout is retried when the predicate accepts timeout errors.
    #[tokio::test]
    async fn test_timeout_retries_when_predicate_allows() {
        let config = RetryConfig {
            operation_timeout_ms: Some(50),
            ..fast_config(2)
        };
        let manager = RetryManager::new(config).unwrap();

        let should_retry_timeouts = |error: &TestError| matches!(error, TestError::Timeout(_));

        let started = tokio::time::Instant::now();
        let outcome = manager
            .execute_with_retry(
                "test_timeout_retryable",
                || async {
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_timeouts,
                create_test_error,
            )
            .await;
        let took = started.elapsed();

        assert!(outcome.is_err());
        assert!(matches!(outcome.unwrap_err(), TestError::Timeout(_)));
        assert!(took.as_millis() > 80);
    }

    /// Two retryable failures followed by a success take exactly three attempts.
    #[tokio::test]
    async fn test_successful_retry_after_failures() {
        let manager = RetryManager::new(fast_config(3)).unwrap();

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_for_op = Arc::clone(&attempt_counter);

        let outcome = manager
            .execute_with_retry(
                "test_eventual_success",
                move || {
                    let counter = Arc::clone(&counter_for_op);
                    async move {
                        let prior = counter.fetch_add(1, Ordering::SeqCst);
                        if prior < 2 {
                            Err(TestError::Retryable("temporary failure".to_string()))
                        } else {
                            Ok(42)
                        }
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert_eq!(outcome.unwrap(), 42);
        assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
    }

    /// With `immediate_first`, the first retry fires at once and the second waits.
    #[tokio::test]
    #[ignore = "Non-deterministic timing test - TODO: Convert to use deterministic tokio time"]
    async fn test_immediate_first_retry() {
        let config = RetryConfig {
            initial_delay_ms: 100,
            max_delay_ms: 200,
            immediate_first: true,
            ..fast_config(2)
        };
        let manager = RetryManager::new(config).unwrap();

        let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
        let times_for_op = Arc::clone(&attempt_times);
        let started = tokio::time::Instant::now();

        let _outcome = manager
            .execute_with_retry(
                "test_immediate",
                move || {
                    let times = Arc::clone(&times_for_op);
                    async move {
                        times.lock().unwrap().push(started.elapsed());
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        let recorded = attempt_times.lock().unwrap();
        assert_eq!(recorded.len(), 3);
        assert!(recorded[1].as_millis() < 10);
        assert!(recorded[2].as_millis() >= 100);
    }

    /// Without a per-attempt timeout, a slow operation is allowed to finish.
    #[tokio::test]
    async fn test_operation_without_timeout() {
        let manager = RetryManager::new(fast_config(2)).unwrap();

        let started = tokio::time::Instant::now();
        let outcome = manager
            .execute_with_retry(
                "test_no_timeout",
                || async {
                    tokio::time::sleep(Duration::from_millis(50)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;
        let took = started.elapsed();

        assert_eq!(outcome.unwrap(), 42);
        assert!(took.as_millis() >= 30);
        assert!(took.as_millis() < 200);
    }

    /// `max_retries: 0` performs exactly one attempt.
    #[tokio::test]
    async fn test_zero_retries() {
        let manager = RetryManager::new(fast_config(0)).unwrap();

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_for_op = Arc::clone(&attempt_counter);

        let outcome = manager
            .execute_with_retry(
                "test_no_retries",
                move || {
                    let counter = Arc::clone(&counter_for_op);
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(outcome.is_err());
        assert_eq!(attempt_counter.load(Ordering::SeqCst), 1);
    }

    /// Gaps between attempts stay within the base delay plus jitter window.
    #[tokio::test]
    #[ignore = "Non-deterministic timing test - TODO: Convert to use deterministic tokio time"]
    async fn test_jitter_applied() {
        let config = RetryConfig {
            initial_delay_ms: 50,
            max_delay_ms: 100,
            jitter_ms: 50,
            ..fast_config(2)
        };
        let manager = RetryManager::new(config).unwrap();

        let gaps = Arc::new(std::sync::Mutex::new(Vec::new()));
        let gaps_for_op = Arc::clone(&gaps);
        let previous_call = Arc::new(std::sync::Mutex::new(tokio::time::Instant::now()));
        let previous_for_op = Arc::clone(&previous_call);

        let _outcome = manager
            .execute_with_retry(
                "test_jitter",
                move || {
                    let gaps = Arc::clone(&gaps_for_op);
                    let previous_call = Arc::clone(&previous_for_op);
                    async move {
                        let now = tokio::time::Instant::now();
                        // Swap in `now` and measure against the instant it replaces.
                        let gap = {
                            let mut guard = previous_call.lock().unwrap();
                            let before = std::mem::replace(&mut *guard, now);
                            now.duration_since(before)
                        };
                        gaps.lock().unwrap().push(gap);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        let recorded = gaps.lock().unwrap();
        // The first entry is the time to the initial attempt, not a retry delay.
        for gap in recorded.iter().skip(1) {
            assert!(gap.as_millis() >= 50);
            assert!(gap.as_millis() <= 150);
        }
    }

    /// A small budget stops a run that would otherwise allow 100 retries.
    #[tokio::test]
    async fn test_max_elapsed_stops_early() {
        let config = RetryConfig {
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 1.5,
            max_elapsed_ms: Some(150),
            ..fast_config(100)
        };
        let manager = RetryManager::new(config).unwrap();

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_for_op = Arc::clone(&attempt_counter);

        let started = tokio::time::Instant::now();
        let outcome = manager
            .execute_with_retry(
                "test_elapsed_limit",
                move || {
                    let counter = Arc::clone(&counter_for_op);
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;
        let took = started.elapsed();

        assert!(outcome.is_err());
        assert!(matches!(outcome.unwrap_err(), TestError::Timeout(_)));

        let attempts = attempt_counter.load(Ordering::SeqCst);
        assert!(attempts < 10);
        assert!(took.as_millis() >= 100);
    }

    /// Retrying stops at the first non-retryable error in a mixed sequence.
    #[tokio::test]
    async fn test_mixed_errors_retry_behavior() {
        let manager = RetryManager::new(fast_config(5)).unwrap();

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_for_op = Arc::clone(&attempt_counter);

        let outcome = manager
            .execute_with_retry(
                "test_mixed_errors",
                move || {
                    let counter = Arc::clone(&counter_for_op);
                    async move {
                        let prior = counter.fetch_add(1, Ordering::SeqCst);
                        match prior {
                            0 => Err(TestError::Retryable("retry 1".to_string())),
                            1 => Err(TestError::Retryable("retry 2".to_string())),
                            2 => Err(TestError::NonRetryable("stop here".to_string())),
                            _ => Ok(42),
                        }
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(outcome.is_err());
        assert!(matches!(outcome.unwrap_err(), TestError::NonRetryable(_)));
        assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
    }

    /// Cancelling while the manager sleeps between attempts aborts promptly.
    #[tokio::test]
    async fn test_cancellation_during_retry_delay() {
        use tokio_util::sync::CancellationToken;

        let config = RetryConfig {
            initial_delay_ms: 500,
            max_delay_ms: 1000,
            ..fast_config(10)
        };
        let manager = RetryManager::new(config).unwrap();

        let token = CancellationToken::new();
        let canceller = token.clone();

        // Fire the cancellation well inside the first 500ms retry delay.
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            canceller.cancel();
        });

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_for_op = Arc::clone(&attempt_counter);

        let started = tokio::time::Instant::now();
        let outcome = manager
            .execute_with_retry_with_cancel(
                "test_cancellation",
                move || {
                    let counter = Arc::clone(&counter_for_op);
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
                &token,
            )
            .await;
        let took = started.elapsed();

        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("canceled"));

        assert!(took.as_millis() < 600);

        let attempts = attempt_counter.load(Ordering::SeqCst);
        assert!(attempts >= 1);
    }

    /// Cancelling while the operation itself is running aborts it.
    #[tokio::test]
    async fn test_cancellation_during_operation_execution() {
        use tokio_util::sync::CancellationToken;

        let config = RetryConfig {
            initial_delay_ms: 50,
            max_delay_ms: 100,
            ..fast_config(5)
        };
        let manager = RetryManager::new(config).unwrap();

        let token = CancellationToken::new();
        let canceller = token.clone();

        // Fire the cancellation while the 200ms operation is still in flight.
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            canceller.cancel();
        });

        let started = tokio::time::Instant::now();
        let outcome = manager
            .execute_with_retry_with_cancel(
                "test_cancellation_during_op",
                || async {
                    tokio::time::sleep(Duration::from_millis(200)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_test_error,
                create_test_error,
                &token,
            )
            .await;
        let took = started.elapsed();

        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("canceled"));

        assert!(took.as_millis() < 250);
    }

    /// A token cancelled before execution produces an error mentioning "canceled".
    #[tokio::test]
    async fn test_cancellation_error_message() {
        use tokio_util::sync::CancellationToken;

        let manager = RetryManager::new(RetryConfig::default()).unwrap();

        let token = CancellationToken::new();
        token.cancel();

        let outcome = manager
            .execute_with_retry_with_cancel(
                "test_operation",
                || async { Ok::<i32, TestError>(42) },
                should_retry_test_error,
                create_test_error,
                &token,
            )
            .await;

        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("canceled"));
    }
}
1133
1134#[cfg(test)]
1135mod proptest_tests {
1136 use std::sync::{
1137 Arc,
1138 atomic::{AtomicU32, Ordering},
1139 };
1140
1141 use proptest::prelude::*;
1142 use rstest::rstest;
1144
1145 use super::{test_utils::*, *};
1146
1147 proptest! {
1148 #[rstest]
1149 fn test_retry_config_valid_ranges(
1150 max_retries in 0u32..100,
1151 initial_delay_ms in 1u64..10_000,
1152 max_delay_ms in 1u64..60_000,
1153 backoff_factor in 1.0f64..10.0,
1154 jitter_ms in 0u64..1_000,
1155 operation_timeout_ms in prop::option::of(1u64..120_000),
1156 immediate_first in any::<bool>(),
1157 max_elapsed_ms in prop::option::of(1u64..300_000)
1158 ) {
1159 let max_delay_ms = max_delay_ms.max(initial_delay_ms);
1161
1162 let config = RetryConfig {
1163 max_retries,
1164 initial_delay_ms,
1165 max_delay_ms,
1166 backoff_factor,
1167 jitter_ms,
1168 operation_timeout_ms,
1169 immediate_first,
1170 max_elapsed_ms,
1171 };
1172
1173 let manager = RetryManager::<std::io::Error>::new(config);
1175 prop_assert!(manager.is_ok());
1176 }
1177
1178 #[rstest]
1179 fn test_retry_attempts_bounded(
1180 max_retries in 0u32..5,
1181 initial_delay_ms in 1u64..10,
1182 backoff_factor in 1.0f64..2.0,
1183 ) {
1184 let rt = tokio::runtime::Builder::new_current_thread()
1185 .enable_time()
1186 .build()
1187 .unwrap();
1188
1189 let config = RetryConfig {
1190 max_retries,
1191 initial_delay_ms,
1192 max_delay_ms: initial_delay_ms * 2,
1193 backoff_factor,
1194 jitter_ms: 0,
1195 operation_timeout_ms: None,
1196 immediate_first: false,
1197 max_elapsed_ms: None,
1198 };
1199
1200 let manager = RetryManager::new(config).unwrap();
1201 let attempt_counter = Arc::new(AtomicU32::new(0));
1202 let counter_clone = attempt_counter.clone();
1203
1204 let _result = rt.block_on(manager.execute_with_retry(
1205 "prop_test",
1206 move || {
1207 let counter = counter_clone.clone();
1208 async move {
1209 counter.fetch_add(1, Ordering::SeqCst);
1210 Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1211 }
1212 },
1213 |e: &TestError| matches!(e, TestError::Retryable(_)),
1214 TestError::Timeout,
1215 ));
1216
1217 let attempts = attempt_counter.load(Ordering::SeqCst);
1218 prop_assert_eq!(attempts, max_retries + 1);
1220 }
1221
1222 #[rstest]
1223 fn test_timeout_always_respected(
1224 timeout_ms in 10u64..50,
1225 operation_delay_ms in 60u64..100,
1226 ) {
1227 let rt = tokio::runtime::Builder::new_current_thread()
1228 .enable_time()
1229 .start_paused(true)
1230 .build()
1231 .unwrap();
1232
1233 let config = RetryConfig {
1234 max_retries: 0, initial_delay_ms: 10,
1236 max_delay_ms: 100,
1237 backoff_factor: 2.0,
1238 jitter_ms: 0,
1239 operation_timeout_ms: Some(timeout_ms),
1240 immediate_first: false,
1241 max_elapsed_ms: None,
1242 };
1243
1244 let manager = RetryManager::new(config).unwrap();
1245
1246 let result = rt.block_on(async {
1247 let operation_future = manager.execute_with_retry(
1248 "timeout_test",
1249 move || async move {
1250 tokio::time::sleep(Duration::from_millis(operation_delay_ms)).await;
1251 Ok::<i32, TestError>(42)
1252 },
1253 |_: &TestError| true,
1254 TestError::Timeout,
1255 );
1256
1257 tokio::time::advance(Duration::from_millis(timeout_ms + 10)).await;
1259 operation_future.await
1260 });
1261
1262 prop_assert!(result.is_err());
1264 prop_assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
1265 }
1266
1267 #[rstest]
1268 fn test_max_elapsed_always_respected(
1269 max_elapsed_ms in 20u64..50,
1270 delay_per_retry in 15u64..30,
1271 max_retries in 10u32..20,
1272 ) {
1273 let rt = tokio::runtime::Builder::new_current_thread()
1274 .enable_time()
1275 .start_paused(true)
1276 .build()
1277 .unwrap();
1278
1279 let config = RetryConfig {
1281 max_retries,
1282 initial_delay_ms: delay_per_retry,
1283 max_delay_ms: delay_per_retry * 2,
1284 backoff_factor: 1.0, jitter_ms: 0,
1286 operation_timeout_ms: None,
1287 immediate_first: false,
1288 max_elapsed_ms: Some(max_elapsed_ms),
1289 };
1290
1291 let manager = RetryManager::new(config).unwrap();
1292 let attempt_counter = Arc::new(AtomicU32::new(0));
1293 let counter_clone = attempt_counter.clone();
1294
1295 let result = rt.block_on(async {
1296 let operation_future = manager.execute_with_retry(
1297 "elapsed_test",
1298 move || {
1299 let counter = counter_clone.clone();
1300 async move {
1301 counter.fetch_add(1, Ordering::SeqCst);
1302 Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1303 }
1304 },
1305 |e: &TestError| matches!(e, TestError::Retryable(_)),
1306 TestError::Timeout,
1307 );
1308
1309 tokio::time::advance(Duration::from_millis(max_elapsed_ms + delay_per_retry)).await;
1311 operation_future.await
1312 });
1313
1314 let attempts = attempt_counter.load(Ordering::SeqCst);
1315
1316 prop_assert!(result.is_err());
1318 prop_assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
1319
1320 prop_assert!(attempts <= max_retries + 1);
1322 }
1323
1324 #[rstest]
1325 #[ignore = "Non-deterministic timing test - TODO: Convert to use deterministic tokio time"]
1326 fn test_jitter_bounds(
1327 jitter_ms in 0u64..20,
1328 base_delay_ms in 10u64..30,
1329 ) {
1330 let rt = tokio::runtime::Builder::new_current_thread()
1331 .enable_time()
1332 .build()
1333 .unwrap();
1334
1335 let config = RetryConfig {
1336 max_retries: 2,
1337 initial_delay_ms: base_delay_ms,
1338 max_delay_ms: base_delay_ms * 2,
1339 backoff_factor: 1.0, jitter_ms,
1341 operation_timeout_ms: None,
1342 immediate_first: false,
1343 max_elapsed_ms: None,
1344 };
1345
1346 let manager = RetryManager::new(config).unwrap();
1347 let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
1348 let times_clone = attempt_times.clone();
1349 let start_time = std::time::Instant::now();
1350
1351 let _result = rt.block_on(manager.execute_with_retry(
1352 "jitter_test",
1353 move || {
1354 let times = times_clone.clone();
1355 async move {
1356 times.lock().unwrap().push(start_time.elapsed());
1357 Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1358 }
1359 },
1360 |e: &TestError| matches!(e, TestError::Retryable(_)),
1361 TestError::Timeout,
1362 ));
1363
1364 let times = attempt_times.lock().unwrap();
1365
1366 prop_assert!(times.len() >= 2);
1368
1369 prop_assert!(times[0].as_millis() < 5);
1371
1372 for i in 1..times.len() {
1374 let delay_from_previous = if i == 1 {
1375 times[i] - times[0]
1376 } else {
1377 times[i] - times[i - 1]
1378 };
1379
1380 prop_assert!(
1382 delay_from_previous.as_millis() >= base_delay_ms as u128,
1383 "Retry {} delay {}ms is less than base {}ms",
1384 i, delay_from_previous.as_millis(), base_delay_ms
1385 );
1386
1387 prop_assert!(
1389 delay_from_previous.as_millis() <= (base_delay_ms + jitter_ms + 5) as u128,
1390 "Retry {} delay {}ms exceeds base {} + jitter {}",
1391 i, delay_from_previous.as_millis(), base_delay_ms, jitter_ms
1392 );
1393 }
1394 }
1395
1396 #[rstest]
1397 #[ignore = "Non-deterministic timing test - TODO: Convert to use deterministic tokio time"]
1398 fn test_immediate_first_property(
1399 immediate_first in any::<bool>(),
1400 initial_delay_ms in 10u64..30,
1401 ) {
1402 let rt = tokio::runtime::Builder::new_current_thread()
1403 .enable_time()
1404 .build()
1405 .unwrap();
1406
1407 let config = RetryConfig {
1408 max_retries: 2,
1409 initial_delay_ms,
1410 max_delay_ms: initial_delay_ms * 2,
1411 backoff_factor: 2.0,
1412 jitter_ms: 0,
1413 operation_timeout_ms: None,
1414 immediate_first,
1415 max_elapsed_ms: None,
1416 };
1417
1418 let manager = RetryManager::new(config).unwrap();
1419 let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
1420 let times_clone = attempt_times.clone();
1421
1422 let start = std::time::Instant::now();
1423 let _result = rt.block_on(manager.execute_with_retry(
1424 "immediate_test",
1425 move || {
1426 let times = times_clone.clone();
1427 async move {
1428 let elapsed = start.elapsed();
1429 times.lock().unwrap().push(elapsed);
1430 Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1431 }
1432 },
1433 |e: &TestError| matches!(e, TestError::Retryable(_)),
1434 TestError::Timeout,
1435 ));
1436
1437 let times = attempt_times.lock().unwrap();
1438 prop_assert!(times.len() >= 2);
1439
1440 if immediate_first {
1441 prop_assert!(times[1].as_millis() < 20,
1443 "With immediate_first=true, first retry took {}ms",
1444 times[1].as_millis());
1445 } else {
1446 prop_assert!(times[1].as_millis() >= (initial_delay_ms - 10) as u128,
1448 "With immediate_first=false, first retry was too fast: {}ms",
1449 times[1].as_millis());
1450 }
1451 }
1452
1453 #[rstest]
1454 fn test_non_retryable_stops_immediately(
1455 attempt_before_non_retryable in 0usize..3,
1456 max_retries in 3u32..5,
1457 ) {
1458 let rt = tokio::runtime::Builder::new_current_thread()
1459 .enable_time()
1460 .build()
1461 .unwrap();
1462
1463 let config = RetryConfig {
1464 max_retries,
1465 initial_delay_ms: 10,
1466 max_delay_ms: 100,
1467 backoff_factor: 2.0,
1468 jitter_ms: 0,
1469 operation_timeout_ms: None,
1470 immediate_first: false,
1471 max_elapsed_ms: None,
1472 };
1473
1474 let manager = RetryManager::new(config).unwrap();
1475 let attempt_counter = Arc::new(AtomicU32::new(0));
1476 let counter_clone = attempt_counter.clone();
1477
1478 let result: Result<i32, TestError> = rt.block_on(manager.execute_with_retry(
1479 "non_retryable_test",
1480 move || {
1481 let counter = counter_clone.clone();
1482 async move {
1483 let attempts = counter.fetch_add(1, Ordering::SeqCst) as usize;
1484 if attempts == attempt_before_non_retryable {
1485 Err(TestError::NonRetryable("stop".to_string()))
1486 } else {
1487 Err(TestError::Retryable("retry".to_string()))
1488 }
1489 }
1490 },
1491 |e: &TestError| matches!(e, TestError::Retryable(_)),
1492 TestError::Timeout,
1493 ));
1494
1495 let attempts = attempt_counter.load(Ordering::SeqCst) as usize;
1496
1497 prop_assert!(result.is_err());
1498 prop_assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
1499 prop_assert_eq!(attempts, attempt_before_non_retryable + 1);
1501 }
1502
1503 #[rstest]
1504 fn test_cancellation_stops_immediately(
1505 cancel_after_ms in 10u64..100,
1506 initial_delay_ms in 200u64..500,
1507 ) {
1508 use tokio_util::sync::CancellationToken;
1509
1510 let rt = tokio::runtime::Builder::new_current_thread()
1511 .enable_time()
1512 .start_paused(true)
1513 .build()
1514 .unwrap();
1515
1516 let config = RetryConfig {
1517 max_retries: 10,
1518 initial_delay_ms,
1519 max_delay_ms: initial_delay_ms * 2,
1520 backoff_factor: 2.0,
1521 jitter_ms: 0,
1522 operation_timeout_ms: None,
1523 immediate_first: false,
1524 max_elapsed_ms: None,
1525 };
1526
1527 let manager = RetryManager::new(config).unwrap();
1528 let token = CancellationToken::new();
1529 let token_clone = token.clone();
1530
1531 let result: Result<i32, TestError> = rt.block_on(async {
1532 tokio::spawn(async move {
1534 tokio::time::sleep(Duration::from_millis(cancel_after_ms)).await;
1535 token_clone.cancel();
1536 });
1537
1538 let operation_future = manager.execute_with_retry_with_cancel(
1539 "cancellation_test",
1540 || async {
1541 Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1542 },
1543 |e: &TestError| matches!(e, TestError::Retryable(_)),
1544 create_test_error,
1545 &token,
1546 );
1547
1548 tokio::time::advance(Duration::from_millis(cancel_after_ms + 10)).await;
1550 operation_future.await
1551 });
1552
1553 prop_assert!(result.is_err());
1555 let error_msg = format!("{}", result.unwrap_err());
1556 prop_assert!(error_msg.contains("canceled"));
1557 }
1558
1559 #[rstest]
1560 fn test_budget_clamp_prevents_overshoot(
1561 max_elapsed_ms in 10u64..30,
1562 delay_per_retry in 20u64..50,
1563 ) {
1564 let rt = tokio::runtime::Builder::new_current_thread()
1565 .enable_time()
1566 .start_paused(true)
1567 .build()
1568 .unwrap();
1569
1570 let config = RetryConfig {
1572 max_retries: 5,
1573 initial_delay_ms: delay_per_retry,
1574 max_delay_ms: delay_per_retry * 2,
1575 backoff_factor: 1.0,
1576 jitter_ms: 0,
1577 operation_timeout_ms: None,
1578 immediate_first: false,
1579 max_elapsed_ms: Some(max_elapsed_ms),
1580 };
1581
1582 let manager = RetryManager::new(config).unwrap();
1583
1584 let _result = rt.block_on(async {
1585 let operation_future = manager.execute_with_retry(
1586 "budget_clamp_test",
1587 || async {
1588 Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1590 },
1591 |e: &TestError| matches!(e, TestError::Retryable(_)),
1592 create_test_error,
1593 );
1594
1595 tokio::time::advance(Duration::from_millis(max_elapsed_ms + delay_per_retry)).await;
1597 operation_future.await
1598 });
1599
1600 }
1603
1604 #[rstest]
1605 fn test_success_on_kth_attempt(
1606 k in 1usize..5,
1607 initial_delay_ms in 5u64..20,
1608 ) {
1609 let rt = tokio::runtime::Builder::new_current_thread()
1610 .enable_time()
1611 .start_paused(true)
1612 .build()
1613 .unwrap();
1614
1615 let config = RetryConfig {
1616 max_retries: 10, initial_delay_ms,
1618 max_delay_ms: initial_delay_ms * 4,
1619 backoff_factor: 2.0,
1620 jitter_ms: 0,
1621 operation_timeout_ms: None,
1622 immediate_first: false,
1623 max_elapsed_ms: None,
1624 };
1625
1626 let manager = RetryManager::new(config).unwrap();
1627 let attempt_counter = Arc::new(AtomicU32::new(0));
1628 let counter_clone = attempt_counter.clone();
1629 let target_k = k;
1630
1631 let (result, _elapsed) = rt.block_on(async {
1632 let start = tokio::time::Instant::now();
1633
1634 let operation_future = manager.execute_with_retry(
1635 "kth_attempt_test",
1636 move || {
1637 let counter = counter_clone.clone();
1638 async move {
1639 let attempt = counter.fetch_add(1, Ordering::SeqCst) as usize;
1640 if attempt + 1 == target_k {
1641 Ok(42)
1642 } else {
1643 Err(TestError::Retryable("retry".to_string()))
1644 }
1645 }
1646 },
1647 |e: &TestError| matches!(e, TestError::Retryable(_)),
1648 create_test_error,
1649 );
1650
1651 for _ in 0..k {
1653 tokio::time::advance(Duration::from_millis(initial_delay_ms * 4)).await;
1654 }
1655
1656 let result = operation_future.await;
1657 let elapsed = start.elapsed();
1658
1659 (result, elapsed)
1660 });
1661
1662 let attempts = attempt_counter.load(Ordering::SeqCst) as usize;
1663
1664 prop_assert!(result.is_ok());
1666 prop_assert_eq!(result.unwrap(), 42);
1667 prop_assert_eq!(attempts, k);
1668 }
1669 }
1670}