use std::{future::Future, marker::PhantomData, time::Duration};

use tokio_util::sync::CancellationToken;

use crate::backoff::ExponentialBackoff;

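/// Configuration for retry behavior with exponential backoff, jitter, an
/// optional per-attempt timeout, and an optional total elapsed-time budget.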
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Maximum number of retries after the initial attempt.
    pub max_retries: u32,
    /// Delay before the first retry, in milliseconds.
    pub initial_delay_ms: u64,
    /// Upper bound on the delay between retries, in milliseconds.
    pub max_delay_ms: u64,
    /// Multiplier applied to the delay after each retry.
    pub backoff_factor: f64,
    /// Maximum random jitter added to each delay, in milliseconds.
    pub jitter_ms: u64,
    /// Optional timeout applied to each individual attempt, in milliseconds.
    pub operation_timeout_ms: Option<u64>,
    /// Whether the first retry should happen immediately, without delay.
    pub immediate_first: bool,
    /// Optional budget for total elapsed time across all attempts, in milliseconds.
    pub max_elapsed_ms: Option<u64>,
}

impl Default for RetryConfig {
    fn default() -> Self {
        Self {
            max_retries: 3,
            initial_delay_ms: 1_000,
            max_delay_ms: 10_000,
            backoff_factor: 2.0,
            jitter_ms: 100,
            operation_timeout_ms: Some(30_000),
            immediate_first: false,
            max_elapsed_ms: None,
        }
    }
}

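/// Coordinates retrying of fallible async operations according to a [`RetryConfig`].
///
/// The manager is generic over the error type `E` returned by the operations it
/// drives; callers supply a predicate deciding which errors are retryable and a
/// constructor used to build timeout/cancellation errors of that same type.
///
/// # Example
///
/// A minimal sketch of typical usage; `MyError`, `fetch_value`, `is_transient`,
/// and `Other` are illustrative placeholders, not items from this module:
///
/// ```ignore
/// let manager: RetryManager<MyError> = RetryManager::new(RetryConfig::default())?;
/// let value = manager
///     .execute_with_retry(
///         "fetch_value",
///         || async { fetch_value().await },   // operation returning Result<T, MyError>
///         |e: &MyError| e.is_transient(),     // retry only transient failures
///         |msg| MyError::Other(msg),          // build timeout/cancel errors
///     )
///     .await?;
/// ```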
#[derive(Clone, Debug)]
pub struct RetryManager<E> {
    config: RetryConfig,
    _phantom: PhantomData<E>,
}

impl<E> RetryManager<E>
where
    E: std::error::Error,
{
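    /// Creates a new [`RetryManager`] with the given `config`.
    ///
    /// Construction currently always succeeds.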
    pub fn new(config: RetryConfig) -> anyhow::Result<Self> {
        Ok(Self {
            config,
            _phantom: PhantomData,
        })
    }

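    /// Core retry loop shared by the public `execute_with_retry*` methods.
    ///
    /// On each iteration it checks for cancellation and for the optional
    /// `max_elapsed_ms` budget, runs `operation` (wrapped in a timeout when
    /// `operation_timeout_ms` is set), and then either returns the result,
    /// returns a non-retryable error, or sleeps for the next backoff delay
    /// (clamped to the remaining budget) before trying again.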
    pub async fn execute_with_retry_inner<F, Fut, T>(
        &self,
        operation_name: &str,
        mut operation: F,
        should_retry: impl Fn(&E) -> bool,
        create_error: impl Fn(String) -> E,
        cancel: Option<&CancellationToken>,
    ) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        let mut backoff = ExponentialBackoff::new(
            Duration::from_millis(self.config.initial_delay_ms),
            Duration::from_millis(self.config.max_delay_ms),
            self.config.backoff_factor,
            self.config.jitter_ms,
            self.config.immediate_first,
        )
        .map_err(|e| create_error(format!("Invalid configuration: {e}")))?;

        let mut attempt = 0;
        let start_time = tokio::time::Instant::now();

        loop {
            // Check for cancellation before starting the next attempt.
            if let Some(token) = cancel
                && token.is_cancelled()
            {
                tracing::debug!(
                    operation = %operation_name,
                    attempts = attempt,
                    "Operation canceled"
                );
                return Err(create_error("canceled".to_string()));
            }

            // Enforce the total elapsed-time budget, if one is configured.
            if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
                let elapsed = start_time.elapsed();
                if elapsed.as_millis() >= max_elapsed_ms as u128 {
                    let timeout_error = create_error("Budget exceeded".to_string());
                    tracing::trace!(
                        operation = %operation_name,
                        attempts = attempt + 1,
                        budget_ms = max_elapsed_ms,
                        "Retry budget exceeded"
                    );
                    return Err(timeout_error);
                }
            }

            // Run one attempt, racing it against the per-operation timeout and/or
            // the cancellation token when they are configured.
            let result = match (self.config.operation_timeout_ms, cancel) {
                (Some(timeout_ms), Some(token)) => {
                    tokio::select! {
                        result = tokio::time::timeout(Duration::from_millis(timeout_ms), operation()) => result,
                        _ = token.cancelled() => {
                            tracing::debug!(
                                operation = %operation_name,
                                "Operation canceled during execution"
                            );
                            return Err(create_error("canceled".to_string()));
                        }
                    }
                }
                (Some(timeout_ms), None) => {
                    tokio::time::timeout(Duration::from_millis(timeout_ms), operation()).await
                }
                (None, Some(token)) => {
                    tokio::select! {
                        result = operation() => Ok(result),
                        _ = token.cancelled() => {
                            tracing::debug!(
                                operation = %operation_name,
                                "Operation canceled during execution"
                            );
                            return Err(create_error("canceled".to_string()));
                        }
                    }
                }
                (None, None) => Ok(operation().await),
            };

            match result {
                Ok(Ok(success)) => {
                    if attempt > 0 {
                        tracing::trace!(
                            operation = %operation_name,
                            attempts = attempt + 1,
                            "Retry succeeded"
                        );
                    }
                    return Ok(success);
                }
                Ok(Err(error)) => {
                    if !should_retry(&error) {
                        tracing::trace!(
                            operation = %operation_name,
                            error = %error,
                            "Non-retryable error"
                        );
                        return Err(error);
                    }

                    if attempt >= self.config.max_retries {
                        tracing::trace!(
                            operation = %operation_name,
                            attempts = attempt + 1,
                            error = %error,
                            "Retries exhausted"
                        );
                        return Err(error);
                    }

                    let mut delay = backoff.next_duration();

                    // Clamp the delay so the next sleep never overshoots the budget.
                    if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
                        let elapsed = start_time.elapsed();
                        let remaining =
                            Duration::from_millis(max_elapsed_ms).saturating_sub(elapsed);

                        if remaining.is_zero() {
                            let timeout_error = create_error("Budget exceeded".to_string());
                            tracing::trace!(
                                operation = %operation_name,
                                attempts = attempt + 1,
                                budget_ms = max_elapsed_ms,
                                "Retry budget exceeded"
                            );
                            return Err(timeout_error);
                        }

                        delay = delay.min(remaining);
                    }

                    tracing::trace!(
                        operation = %operation_name,
                        attempt = attempt + 1,
                        delay_ms = delay.as_millis() as u64,
                        error = %error,
                        "Retrying after failure"
                    );

                    if delay.is_zero() {
                        attempt += 1;
                        continue;
                    }

                    if let Some(token) = cancel {
                        tokio::select! {
                            _ = tokio::time::sleep(delay) => {},
                            _ = token.cancelled() => {
                                tracing::debug!(
                                    operation = %operation_name,
                                    attempt = attempt + 1,
                                    "Operation canceled during retry delay"
                                );
                                return Err(create_error("canceled".to_string()));
                            }
                        }
                    } else {
                        tokio::time::sleep(delay).await;
                    }
                    attempt += 1;
                }
                Err(_timeout) => {
                    let timeout_error = create_error(format!(
                        "Timed out after {}ms",
                        self.config.operation_timeout_ms.unwrap_or(0)
                    ));

                    if !should_retry(&timeout_error) {
                        tracing::trace!(
                            operation = %operation_name,
                            error = %timeout_error,
                            "Non-retryable timeout"
                        );
                        return Err(timeout_error);
                    }

                    if attempt >= self.config.max_retries {
                        tracing::trace!(
                            operation = %operation_name,
                            attempts = attempt + 1,
                            error = %timeout_error,
                            "Retries exhausted after timeout"
                        );
                        return Err(timeout_error);
                    }

                    let mut delay = backoff.next_duration();

                    if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
                        let elapsed = start_time.elapsed();
                        let remaining =
                            Duration::from_millis(max_elapsed_ms).saturating_sub(elapsed);

                        if remaining.is_zero() {
                            let budget_error = create_error("Budget exceeded".to_string());
                            tracing::trace!(
                                operation = %operation_name,
                                attempts = attempt + 1,
                                budget_ms = max_elapsed_ms,
                                "Retry budget exceeded"
                            );
                            return Err(budget_error);
                        }

                        delay = delay.min(remaining);
                    }

                    tracing::trace!(
                        operation = %operation_name,
                        attempt = attempt + 1,
                        delay_ms = delay.as_millis() as u64,
                        error = %timeout_error,
                        "Retrying after timeout"
                    );

                    if delay.is_zero() {
                        attempt += 1;
                        continue;
                    }

                    if let Some(token) = cancel {
                        tokio::select! {
                            _ = tokio::time::sleep(delay) => {},
                            _ = token.cancelled() => {
                                tracing::debug!(
                                    operation = %operation_name,
                                    attempt = attempt + 1,
                                    "Operation canceled during retry delay"
                                );
                                return Err(create_error("canceled".to_string()));
                            }
                        }
                    } else {
                        tokio::time::sleep(delay).await;
                    }
                    attempt += 1;
                }
            }
        }
    }

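    /// Executes `operation` with retries according to the configured policy.
    ///
    /// `should_retry` decides whether a given error is worth retrying, and
    /// `create_error` builds the error returned for timeouts and budget
    /// exhaustion. See [`RetryManager::execute_with_retry_with_cancel`] for the
    /// cancellable variant.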
    pub async fn execute_with_retry<F, Fut, T>(
        &self,
        operation_name: &str,
        operation: F,
        should_retry: impl Fn(&E) -> bool,
        create_error: impl Fn(String) -> E,
    ) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        self.execute_with_retry_inner(operation_name, operation, should_retry, create_error, None)
            .await
    }

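    /// Executes `operation` with retries, aborting early if `cancellation_token`
    /// is cancelled.
    ///
    /// Cancellation is observed before each attempt, while an attempt is running,
    /// and during backoff delays; in each case a "canceled" error built with
    /// `create_error` is returned.
    ///
    /// # Example
    ///
    /// A sketch of wiring in a token; `manager`, `MyError`, and `connect` are
    /// illustrative placeholders:
    ///
    /// ```ignore
    /// let token = CancellationToken::new();
    /// let result = manager
    ///     .execute_with_retry_with_cancel(
    ///         "connect",
    ///         || async { connect().await },
    ///         |e: &MyError| e.is_transient(),
    ///         |msg| MyError::Other(msg),
    ///         &token,
    ///     )
    ///     .await;
    /// ```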
    pub async fn execute_with_retry_with_cancel<F, Fut, T>(
        &self,
        operation_name: &str,
        operation: F,
        should_retry: impl Fn(&E) -> bool,
        create_error: impl Fn(String) -> E,
        cancellation_token: &CancellationToken,
    ) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        self.execute_with_retry_inner(
            operation_name,
            operation,
            should_retry,
            create_error,
            Some(cancellation_token),
        )
        .await
    }
}

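/// Creates a [`RetryManager`] using [`RetryConfig::default`].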
pub fn create_default_retry_manager<E>() -> anyhow::Result<RetryManager<E>>
where
    E: std::error::Error,
{
    RetryManager::new(RetryConfig::default())
}

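/// Creates a [`RetryManager`] tuned for HTTP requests: up to 3 retries, a 60s
/// per-request timeout, and a 180s total retry budget.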
pub fn create_http_retry_manager<E>() -> anyhow::Result<RetryManager<E>>
where
    E: std::error::Error,
{
    let config = RetryConfig {
        max_retries: 3,
        initial_delay_ms: 1_000,
        max_delay_ms: 10_000,
        backoff_factor: 2.0,
        jitter_ms: 1_000,
        operation_timeout_ms: Some(60_000),
        immediate_first: false,
        max_elapsed_ms: Some(180_000),
    };
    RetryManager::new(config)
}

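/// Creates a [`RetryManager`] tuned for WebSocket (re)connections: up to 5
/// retries with an immediate first retry, a 30s per-attempt timeout, and a
/// 120s total retry budget.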
pub fn create_websocket_retry_manager<E>() -> anyhow::Result<RetryManager<E>>
where
    E: std::error::Error,
{
    let config = RetryConfig {
        max_retries: 5,
        initial_delay_ms: 1_000,
        max_delay_ms: 10_000,
        backoff_factor: 2.0,
        jitter_ms: 1_000,
        operation_timeout_ms: Some(30_000),
        immediate_first: true,
        max_elapsed_ms: Some(120_000),
    };
    RetryManager::new(config)
}

#[cfg(test)]
mod test_utils {
    #[derive(Debug, thiserror::Error)]
    pub enum TestError {
        #[error("Retryable error: {0}")]
        Retryable(String),
        #[error("Non-retryable error: {0}")]
        NonRetryable(String),
        #[error("Timeout error: {0}")]
        Timeout(String),
    }

    pub fn should_retry_test_error(error: &TestError) -> bool {
        matches!(error, TestError::Retryable(_))
    }

    pub fn create_test_error(msg: String) -> TestError {
        TestError::Timeout(msg)
    }
}

#[cfg(test)]
mod tests {
    use std::sync::{
        Arc,
        atomic::{AtomicU32, Ordering},
    };

    use super::{test_utils::*, *};

    #[test]
    fn test_retry_config_default() {
        let config = RetryConfig::default();
        assert_eq!(config.max_retries, 3);
        assert_eq!(config.initial_delay_ms, 1_000);
        assert_eq!(config.max_delay_ms, 10_000);
        assert_eq!(config.backoff_factor, 2.0);
        assert_eq!(config.jitter_ms, 100);
        assert_eq!(config.operation_timeout_ms, Some(30_000));
        assert!(!config.immediate_first);
        assert_eq!(config.max_elapsed_ms, None);
    }

    #[tokio::test]
    async fn test_retry_manager_success_first_attempt() {
        let manager = RetryManager::new(RetryConfig::default()).unwrap();

        let result = manager
            .execute_with_retry(
                "test_operation",
                || async { Ok::<i32, TestError>(42) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert_eq!(result.unwrap(), 42);
    }

    #[tokio::test]
    async fn test_retry_manager_non_retryable_error() {
        let manager = RetryManager::new(RetryConfig::default()).unwrap();

        let result = manager
            .execute_with_retry(
                "test_operation",
                || async { Err::<i32, TestError>(TestError::NonRetryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
    }

    #[tokio::test]
    async fn test_retry_manager_retryable_error_exhausted() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let result = manager
            .execute_with_retry(
                "test_operation",
                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Retryable(_)));
    }

    #[tokio::test]
    async fn test_timeout_path() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: Some(50),
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let result = manager
            .execute_with_retry(
                "test_timeout",
                || async {
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
    }

    #[tokio::test]
    async fn test_max_elapsed_time_budget() {
        let config = RetryConfig {
            max_retries: 10,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: Some(200),
        };
        let manager = RetryManager::new(config).unwrap();

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_budget",
                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        let elapsed = start.elapsed();
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
        assert!(elapsed.as_millis() >= 150);
        assert!(elapsed.as_millis() < 1000);
    }

    #[test]
    fn test_http_retry_manager_config() {
        let manager = create_http_retry_manager::<TestError>().unwrap();
        assert_eq!(manager.config.max_retries, 3);
        assert!(!manager.config.immediate_first);
        assert_eq!(manager.config.max_elapsed_ms, Some(180_000));
    }

    #[test]
    fn test_websocket_retry_manager_config() {
        let manager = create_websocket_retry_manager::<TestError>().unwrap();
        assert_eq!(manager.config.max_retries, 5);
        assert!(manager.config.immediate_first);
        assert_eq!(manager.config.max_elapsed_ms, Some(120_000));
    }

    #[tokio::test]
    async fn test_timeout_respects_retry_predicate() {
        let config = RetryConfig {
            max_retries: 3,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: Some(50),
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let should_not_retry_timeouts = |error: &TestError| !matches!(error, TestError::Timeout(_));

        let result = manager
            .execute_with_retry(
                "test_timeout_non_retryable",
                || async {
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    Ok::<i32, TestError>(42)
                },
                should_not_retry_timeouts,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
    }

    #[tokio::test]
    async fn test_timeout_retries_when_predicate_allows() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: Some(50),
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let should_retry_timeouts = |error: &TestError| matches!(error, TestError::Timeout(_));

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_timeout_retryable",
                || async {
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_timeouts,
                create_test_error,
            )
            .await;

        let elapsed = start.elapsed();

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
        // Multiple timed-out attempts plus retry delays should take noticeably
        // longer than a single 50ms timeout.
        assert!(elapsed.as_millis() > 80);
    }

    #[tokio::test]
    async fn test_successful_retry_after_failures() {
        let config = RetryConfig {
            max_retries: 3,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let result = manager
            .execute_with_retry(
                "test_eventual_success",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        let attempts = counter.fetch_add(1, Ordering::SeqCst);
                        if attempts < 2 {
                            Err(TestError::Retryable("temporary failure".to_string()))
                        } else {
                            Ok(42)
                        }
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert_eq!(result.unwrap(), 42);
        assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
    }

    #[tokio::test]
    #[ignore = "Non-deterministic timing test - TODO: Convert to use deterministic tokio time"]
    async fn test_immediate_first_retry() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 100,
            max_delay_ms: 200,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: true,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
        let times_clone = attempt_times.clone();
        let start = tokio::time::Instant::now();

        let _result = manager
            .execute_with_retry(
                "test_immediate",
                move || {
                    let times = times_clone.clone();
                    async move {
                        times.lock().unwrap().push(start.elapsed());
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        let times = attempt_times.lock().unwrap();
        assert_eq!(times.len(), 3);
        // With immediate_first=true the first retry fires right away, while the
        // second retry waits for the configured delay.
        assert!(times[1].as_millis() < 10);
        assert!(times[2].as_millis() >= 100);
    }

    #[tokio::test]
    async fn test_operation_without_timeout() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_no_timeout",
                || async {
                    tokio::time::sleep(Duration::from_millis(50)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        let elapsed = start.elapsed();
        assert_eq!(result.unwrap(), 42);
        assert!(elapsed.as_millis() >= 30);
        assert!(elapsed.as_millis() < 200);
    }

    #[tokio::test]
    async fn test_zero_retries() {
        let config = RetryConfig {
            max_retries: 0,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let result = manager
            .execute_with_retry(
                "test_no_retries",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert_eq!(attempt_counter.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    #[ignore = "Non-deterministic timing test - TODO: Convert to use deterministic tokio time"]
    async fn test_jitter_applied() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 2.0,
            jitter_ms: 50,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let delays = Arc::new(std::sync::Mutex::new(Vec::new()));
        let delays_clone = delays.clone();
        let last_time = Arc::new(std::sync::Mutex::new(tokio::time::Instant::now()));
        let last_time_clone = last_time.clone();

        let _result = manager
            .execute_with_retry(
                "test_jitter",
                move || {
                    let delays = delays_clone.clone();
                    let last_time = last_time_clone.clone();
                    async move {
                        let now = tokio::time::Instant::now();
                        let delay = {
                            let mut last = last_time.lock().unwrap();
                            let d = now.duration_since(*last);
                            *last = now;
                            d
                        };
                        delays.lock().unwrap().push(delay);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        let delays = delays.lock().unwrap();
        // Each retry delay should be at least the base delay and at most the
        // base delay plus the configured jitter.
        for delay in delays.iter().skip(1) {
            assert!(delay.as_millis() >= 50);
            assert!(delay.as_millis() <= 150);
        }
    }

    #[tokio::test]
    async fn test_max_elapsed_stops_early() {
        let config = RetryConfig {
            max_retries: 100,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 1.5,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: Some(150),
        };
        let manager = RetryManager::new(config).unwrap();

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_elapsed_limit",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        let elapsed = start.elapsed();
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));

        let attempts = attempt_counter.load(Ordering::SeqCst);
        // The elapsed-time budget, not max_retries, should stop the loop.
        assert!(attempts < 10);
        assert!(elapsed.as_millis() >= 100);
    }

    #[tokio::test]
    async fn test_mixed_errors_retry_behavior() {
        let config = RetryConfig {
            max_retries: 5,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let result = manager
            .execute_with_retry(
                "test_mixed_errors",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        let attempts = counter.fetch_add(1, Ordering::SeqCst);
                        match attempts {
                            0 => Err(TestError::Retryable("retry 1".to_string())),
                            1 => Err(TestError::Retryable("retry 2".to_string())),
                            2 => Err(TestError::NonRetryable("stop here".to_string())),
                            _ => Ok(42),
                        }
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
        assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
    }

    #[tokio::test]
    async fn test_cancellation_during_retry_delay() {
        use tokio_util::sync::CancellationToken;

        let config = RetryConfig {
            max_retries: 10,
            initial_delay_ms: 500,
            max_delay_ms: 1000,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let token = CancellationToken::new();
        let token_clone = token.clone();

        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            token_clone.cancel();
        });

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry_with_cancel(
                "test_cancellation",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
                &token,
            )
            .await;

        let elapsed = start.elapsed();

        assert!(result.is_err());
        let error_msg = format!("{}", result.unwrap_err());
        assert!(error_msg.contains("canceled"));

        // Cancellation should interrupt the 500ms retry delay well before it completes.
        assert!(elapsed.as_millis() < 600);

        let attempts = attempt_counter.load(Ordering::SeqCst);
        assert!(attempts >= 1);
    }

    #[tokio::test]
    async fn test_cancellation_during_operation_execution() {
        use tokio_util::sync::CancellationToken;

        let config = RetryConfig {
            max_retries: 5,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let token = CancellationToken::new();
        let token_clone = token.clone();

        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            token_clone.cancel();
        });

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry_with_cancel(
                "test_cancellation_during_op",
                || async {
                    tokio::time::sleep(Duration::from_millis(200)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_test_error,
                create_test_error,
                &token,
            )
            .await;

        let elapsed = start.elapsed();

        assert!(result.is_err());
        let error_msg = format!("{}", result.unwrap_err());
        assert!(error_msg.contains("canceled"));

        assert!(elapsed.as_millis() < 250);
    }

    #[tokio::test]
    async fn test_cancellation_error_message() {
        use tokio_util::sync::CancellationToken;

        let config = RetryConfig::default();
        let manager = RetryManager::new(config).unwrap();

        let token = CancellationToken::new();
        token.cancel();

        let result = manager
            .execute_with_retry_with_cancel(
                "test_operation",
                || async { Ok::<i32, TestError>(42) },
                should_retry_test_error,
                create_test_error,
                &token,
            )
            .await;

        assert!(result.is_err());
        let error_msg = format!("{}", result.unwrap_err());
        assert!(error_msg.contains("canceled"));
    }
}

#[cfg(test)]
mod proptest_tests {
    use std::sync::{
        Arc,
        atomic::{AtomicU32, Ordering},
    };

    use proptest::prelude::*;

    use super::{test_utils::*, *};

    proptest! {
        #[test]
        fn test_retry_config_valid_ranges(
            max_retries in 0u32..100,
            initial_delay_ms in 1u64..10_000,
            max_delay_ms in 1u64..60_000,
            backoff_factor in 1.0f64..10.0,
            jitter_ms in 0u64..1_000,
            operation_timeout_ms in prop::option::of(1u64..120_000),
            immediate_first in any::<bool>(),
            max_elapsed_ms in prop::option::of(1u64..300_000)
        ) {
            // Ensure the max delay is never below the initial delay.
            let max_delay_ms = max_delay_ms.max(initial_delay_ms);

            let config = RetryConfig {
                max_retries,
                initial_delay_ms,
                max_delay_ms,
                backoff_factor,
                jitter_ms,
                operation_timeout_ms,
                immediate_first,
                max_elapsed_ms,
            };

            let manager = RetryManager::<std::io::Error>::new(config);
            prop_assert!(manager.is_ok());
        }

        #[test]
        fn test_retry_attempts_bounded(
            max_retries in 0u32..5,
            initial_delay_ms in 1u64..10,
            backoff_factor in 1.0f64..2.0,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries,
                initial_delay_ms,
                max_delay_ms: initial_delay_ms * 2,
                backoff_factor,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config).unwrap();
            let attempt_counter = Arc::new(AtomicU32::new(0));
            let counter_clone = attempt_counter.clone();

            let _result = rt.block_on(manager.execute_with_retry(
                "prop_test",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                |e: &TestError| matches!(e, TestError::Retryable(_)),
                TestError::Timeout,
            ));

            let attempts = attempt_counter.load(Ordering::SeqCst);
            prop_assert_eq!(attempts, max_retries + 1);
        }

        #[test]
        fn test_timeout_always_respected(
            timeout_ms in 10u64..50,
            operation_delay_ms in 60u64..100,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries: 0,
                initial_delay_ms: 10,
                max_delay_ms: 100,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: Some(timeout_ms),
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config).unwrap();

            let result = rt.block_on(async {
                let operation_future = manager.execute_with_retry(
                    "timeout_test",
                    move || async move {
                        tokio::time::sleep(Duration::from_millis(operation_delay_ms)).await;
                        Ok::<i32, TestError>(42)
                    },
                    |_: &TestError| true,
                    TestError::Timeout,
                );

                tokio::time::advance(Duration::from_millis(timeout_ms + 10)).await;
                operation_future.await
            });

            prop_assert!(result.is_err());
            prop_assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
        }

        #[test]
        fn test_max_elapsed_always_respected(
            max_elapsed_ms in 20u64..50,
            delay_per_retry in 15u64..30,
            max_retries in 10u32..20,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries,
                initial_delay_ms: delay_per_retry,
                max_delay_ms: delay_per_retry * 2,
                backoff_factor: 1.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: Some(max_elapsed_ms),
            };

            let manager = RetryManager::new(config).unwrap();
            let attempt_counter = Arc::new(AtomicU32::new(0));
            let counter_clone = attempt_counter.clone();

            let result = rt.block_on(async {
                let operation_future = manager.execute_with_retry(
                    "elapsed_test",
                    move || {
                        let counter = counter_clone.clone();
                        async move {
                            counter.fetch_add(1, Ordering::SeqCst);
                            Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                        }
                    },
                    |e: &TestError| matches!(e, TestError::Retryable(_)),
                    TestError::Timeout,
                );

                tokio::time::advance(Duration::from_millis(max_elapsed_ms + delay_per_retry)).await;
                operation_future.await
            });

            let attempts = attempt_counter.load(Ordering::SeqCst);

            prop_assert!(result.is_err());
            prop_assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));

            prop_assert!(attempts <= max_retries + 1);
        }

        #[test]
        #[ignore = "Non-deterministic timing test - TODO: Convert to use deterministic tokio time"]
        fn test_jitter_bounds(
            jitter_ms in 0u64..20,
            base_delay_ms in 10u64..30,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries: 2,
                initial_delay_ms: base_delay_ms,
                max_delay_ms: base_delay_ms * 2,
                backoff_factor: 1.0,
                jitter_ms,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config).unwrap();
            let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
            let times_clone = attempt_times.clone();
            let start_time = std::time::Instant::now();

            let _result = rt.block_on(manager.execute_with_retry(
                "jitter_test",
                move || {
                    let times = times_clone.clone();
                    async move {
                        times.lock().unwrap().push(start_time.elapsed());
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                |e: &TestError| matches!(e, TestError::Retryable(_)),
                TestError::Timeout,
            ));

            let times = attempt_times.lock().unwrap();

            prop_assert!(times.len() >= 2);

            prop_assert!(times[0].as_millis() < 5);

            for i in 1..times.len() {
                let delay_from_previous = if i == 1 {
                    times[i] - times[0]
                } else {
                    times[i] - times[i - 1]
                };

                prop_assert!(
                    delay_from_previous.as_millis() >= base_delay_ms as u128,
                    "Retry {} delay {}ms is less than base {}ms",
                    i, delay_from_previous.as_millis(), base_delay_ms
                );

                prop_assert!(
                    delay_from_previous.as_millis() <= (base_delay_ms + jitter_ms + 5) as u128,
                    "Retry {} delay {}ms exceeds base {} + jitter {}",
                    i, delay_from_previous.as_millis(), base_delay_ms, jitter_ms
                );
            }
        }

        #[test]
        #[ignore = "Non-deterministic timing test - TODO: Convert to use deterministic tokio time"]
        fn test_immediate_first_property(
            immediate_first in any::<bool>(),
            initial_delay_ms in 10u64..30,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries: 2,
                initial_delay_ms,
                max_delay_ms: initial_delay_ms * 2,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config).unwrap();
            let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
            let times_clone = attempt_times.clone();

            let start = std::time::Instant::now();
            let _result = rt.block_on(manager.execute_with_retry(
                "immediate_test",
                move || {
                    let times = times_clone.clone();
                    async move {
                        let elapsed = start.elapsed();
                        times.lock().unwrap().push(elapsed);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                |e: &TestError| matches!(e, TestError::Retryable(_)),
                TestError::Timeout,
            ));

            let times = attempt_times.lock().unwrap();
            prop_assert!(times.len() >= 2);

            if immediate_first {
                prop_assert!(times[1].as_millis() < 20,
                    "With immediate_first=true, first retry took {}ms",
                    times[1].as_millis());
            } else {
                prop_assert!(times[1].as_millis() >= (initial_delay_ms - 10) as u128,
                    "With immediate_first=false, first retry was too fast: {}ms",
                    times[1].as_millis());
            }
        }

        #[test]
        fn test_non_retryable_stops_immediately(
            attempt_before_non_retryable in 0usize..3,
            max_retries in 3u32..5,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries,
                initial_delay_ms: 10,
                max_delay_ms: 100,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config).unwrap();
            let attempt_counter = Arc::new(AtomicU32::new(0));
            let counter_clone = attempt_counter.clone();

            let result: Result<i32, TestError> = rt.block_on(manager.execute_with_retry(
                "non_retryable_test",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        let attempts = counter.fetch_add(1, Ordering::SeqCst) as usize;
                        if attempts == attempt_before_non_retryable {
                            Err(TestError::NonRetryable("stop".to_string()))
                        } else {
                            Err(TestError::Retryable("retry".to_string()))
                        }
                    }
                },
                |e: &TestError| matches!(e, TestError::Retryable(_)),
                TestError::Timeout,
            ));

            let attempts = attempt_counter.load(Ordering::SeqCst) as usize;

            prop_assert!(result.is_err());
            prop_assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
            prop_assert_eq!(attempts, attempt_before_non_retryable + 1);
        }

        #[test]
        fn test_cancellation_stops_immediately(
            cancel_after_ms in 10u64..100,
            initial_delay_ms in 200u64..500,
        ) {
            use tokio_util::sync::CancellationToken;

            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries: 10,
                initial_delay_ms,
                max_delay_ms: initial_delay_ms * 2,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config).unwrap();
            let token = CancellationToken::new();
            let token_clone = token.clone();

            let result: Result<i32, TestError> = rt.block_on(async {
                tokio::spawn(async move {
                    tokio::time::sleep(Duration::from_millis(cancel_after_ms)).await;
                    token_clone.cancel();
                });

                let operation_future = manager.execute_with_retry_with_cancel(
                    "cancellation_test",
                    || async {
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    },
                    |e: &TestError| matches!(e, TestError::Retryable(_)),
                    create_test_error,
                    &token,
                );

                tokio::time::advance(Duration::from_millis(cancel_after_ms + 10)).await;
                operation_future.await
            });

            prop_assert!(result.is_err());
            let error_msg = format!("{}", result.unwrap_err());
            prop_assert!(error_msg.contains("canceled"));
        }

        #[test]
        fn test_budget_clamp_prevents_overshoot(
            max_elapsed_ms in 10u64..30,
            delay_per_retry in 20u64..50,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries: 5,
                initial_delay_ms: delay_per_retry,
                max_delay_ms: delay_per_retry * 2,
                backoff_factor: 1.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: Some(max_elapsed_ms),
            };

            let manager = RetryManager::new(config).unwrap();

            let _result = rt.block_on(async {
                let operation_future = manager.execute_with_retry(
                    "budget_clamp_test",
                    || async {
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    },
                    |e: &TestError| matches!(e, TestError::Retryable(_)),
                    create_test_error,
                );

                tokio::time::advance(Duration::from_millis(max_elapsed_ms + delay_per_retry)).await;
                operation_future.await
            });

            // The key property is that the retry loop terminates once the budget
            // is exhausted, even though each delay exceeds the remaining budget.
        }

        #[test]
        fn test_success_on_kth_attempt(
            k in 1usize..5,
            initial_delay_ms in 5u64..20,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries: 10,
                initial_delay_ms,
                max_delay_ms: initial_delay_ms * 4,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config).unwrap();
            let attempt_counter = Arc::new(AtomicU32::new(0));
            let counter_clone = attempt_counter.clone();
            let target_k = k;

            let (result, _elapsed) = rt.block_on(async {
                let start = tokio::time::Instant::now();

                let operation_future = manager.execute_with_retry(
                    "kth_attempt_test",
                    move || {
                        let counter = counter_clone.clone();
                        async move {
                            let attempt = counter.fetch_add(1, Ordering::SeqCst) as usize;
                            if attempt + 1 == target_k {
                                Ok(42)
                            } else {
                                Err(TestError::Retryable("retry".to_string()))
                            }
                        }
                    },
                    |e: &TestError| matches!(e, TestError::Retryable(_)),
                    create_test_error,
                );

                for _ in 0..k {
                    tokio::time::advance(Duration::from_millis(initial_delay_ms * 4)).await;
                }

                let result = operation_future.await;
                let elapsed = start.elapsed();

                (result, elapsed)
            });

            let attempts = attempt_counter.load(Ordering::SeqCst) as usize;

            prop_assert!(result.is_ok());
            prop_assert_eq!(result.unwrap(), 42);
            prop_assert_eq!(attempts, k);
        }
    }
}