1use std::{future::Future, marker::PhantomData, time::Duration};
19
20use tokio_util::sync::CancellationToken;
21
22use crate::backoff::ExponentialBackoff;
23
/// Configuration for retrying an operation with exponential backoff.
///
/// All time values are expressed in milliseconds.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Maximum number of retries after the initial attempt, so an operation is
    /// attempted at most `max_retries + 1` times.
    pub max_retries: u32,
    /// Initial delay (ms) handed to the backoff generator.
    pub initial_delay_ms: u64,
    /// Maximum delay (ms) handed to the backoff generator.
    pub max_delay_ms: u64,
    /// Growth factor handed to the backoff generator.
    pub backoff_factor: f64,
    /// Jitter (ms) handed to the backoff generator.
    pub jitter_ms: u64,
    /// Timeout (ms) applied to each individual attempt; `None` disables the
    /// per-attempt timeout.
    pub operation_timeout_ms: Option<u64>,
    /// Forwarded to the backoff generator. The tests in this file expect the
    /// first retry to run without delay when this is `true` — TODO confirm
    /// against `ExponentialBackoff`.
    pub immediate_first: bool,
    /// Total time budget (ms) measured from the start of the first attempt;
    /// `None` disables the budget.
    pub max_elapsed_ms: Option<u64>,
}
47
48impl Default for RetryConfig {
49 fn default() -> Self {
50 Self {
51 max_retries: 3,
52 initial_delay_ms: 1_000,
53 max_delay_ms: 10_000,
54 backoff_factor: 2.0,
55 jitter_ms: 100,
56 operation_timeout_ms: Some(30_000),
57 immediate_first: false,
58 max_elapsed_ms: None,
59 }
60 }
61}
62
/// Runs fallible async operations under a [`RetryConfig`] policy.
///
/// The type parameter `E` is the error type produced by the operations and by
/// the caller-supplied error factory.
#[derive(Clone, Debug)]
pub struct RetryManager<E> {
    /// Retry settings applied to every operation executed by this manager.
    config: RetryConfig,
    /// Ties the manager to the error type `E` without storing a value of it.
    _phantom: PhantomData<E>,
}
71
72impl<E> RetryManager<E>
73where
74 E: std::error::Error,
75{
76 pub const fn new(config: RetryConfig) -> anyhow::Result<Self> {
82 Ok(Self {
83 config,
84 _phantom: PhantomData,
85 })
86 }
87
88 pub async fn execute_with_retry_inner<F, Fut, T>(
102 &self,
103 operation_name: &str,
104 mut operation: F,
105 should_retry: impl Fn(&E) -> bool,
106 create_error: impl Fn(String) -> E,
107 cancel: Option<&CancellationToken>,
108 ) -> Result<T, E>
109 where
110 F: FnMut() -> Fut,
111 Fut: Future<Output = Result<T, E>>,
112 {
113 let mut backoff = ExponentialBackoff::new(
114 Duration::from_millis(self.config.initial_delay_ms),
115 Duration::from_millis(self.config.max_delay_ms),
116 self.config.backoff_factor,
117 self.config.jitter_ms,
118 self.config.immediate_first,
119 )
120 .map_err(|e| create_error(format!("Invalid configuration: {e}")))?;
121
122 let mut attempt = 0;
123 let start_time = tokio::time::Instant::now();
124
125 loop {
126 if let Some(token) = cancel
127 && token.is_cancelled()
128 {
129 tracing::debug!(
130 operation = %operation_name,
131 attempts = attempt,
132 "Operation canceled"
133 );
134 return Err(create_error("canceled".to_string()));
135 }
136
137 if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
138 let elapsed = start_time.elapsed();
139 if elapsed.as_millis() >= u128::from(max_elapsed_ms) {
140 let e = create_error("Budget exceeded".to_string());
141 tracing::trace!(
142 operation = %operation_name,
143 attempts = attempt + 1,
144 budget_ms = max_elapsed_ms,
145 "Retry budget exceeded"
146 );
147 return Err(e);
148 }
149 }
150
151 let result = match (self.config.operation_timeout_ms, cancel) {
152 (Some(timeout_ms), Some(token)) => {
153 tokio::select! {
154 result = tokio::time::timeout(Duration::from_millis(timeout_ms), operation()) => result,
155 () = token.cancelled() => {
156 tracing::debug!(
157 operation = %operation_name,
158 "Operation canceled during execution"
159 );
160 return Err(create_error("canceled".to_string()));
161 }
162 }
163 }
164 (Some(timeout_ms), None) => {
165 tokio::time::timeout(Duration::from_millis(timeout_ms), operation()).await
166 }
167 (None, Some(token)) => {
168 tokio::select! {
169 result = operation() => Ok(result),
170 () = token.cancelled() => {
171 tracing::debug!(
172 operation = %operation_name,
173 "Operation canceled during execution"
174 );
175 return Err(create_error("canceled".to_string()));
176 }
177 }
178 }
179 (None, None) => Ok(operation().await),
180 };
181
182 match result {
183 Ok(Ok(success)) => {
184 if attempt > 0 {
185 tracing::trace!(
186 operation = %operation_name,
187 attempts = attempt + 1,
188 "Retry succeeded"
189 );
190 }
191 return Ok(success);
192 }
193 Ok(Err(e)) => {
194 if !should_retry(&e) {
195 tracing::trace!(
196 operation = %operation_name,
197 error = %e,
198 "Non-retryable error"
199 );
200 return Err(e);
201 }
202
203 if attempt >= self.config.max_retries {
204 tracing::trace!(
205 operation = %operation_name,
206 attempts = attempt + 1,
207 error = %e,
208 "Retries exhausted"
209 );
210 return Err(e);
211 }
212
213 let mut delay = backoff.next_duration();
214
215 if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
216 let elapsed = start_time.elapsed();
217 let remaining =
218 Duration::from_millis(max_elapsed_ms).saturating_sub(elapsed);
219
220 if remaining.is_zero() {
221 let e = create_error("Budget exceeded".to_string());
222 tracing::trace!(
223 operation = %operation_name,
224 attempts = attempt + 1,
225 budget_ms = max_elapsed_ms,
226 "Retry budget exceeded"
227 );
228 return Err(e);
229 }
230
231 delay = delay.min(remaining);
232 }
233
234 tracing::trace!(
235 operation = %operation_name,
236 attempt = attempt + 1,
237 delay_ms = delay.as_millis() as u64,
238 error = %e,
239 "Retrying after failure"
240 );
241
242 if delay.is_zero() {
244 tokio::task::yield_now().await;
245 attempt += 1;
246 continue;
247 }
248
249 if let Some(token) = cancel {
250 tokio::select! {
251 () = tokio::time::sleep(delay) => {},
252 () = token.cancelled() => {
253 tracing::debug!(
254 operation = %operation_name,
255 attempt = attempt + 1,
256 "Operation canceled during retry delay"
257 );
258 return Err(create_error("canceled".to_string()));
259 }
260 }
261 } else {
262 tokio::time::sleep(delay).await;
263 }
264 attempt += 1;
265 }
266 Err(_) => {
267 let e = create_error(format!(
268 "Timed out after {}ms",
269 self.config.operation_timeout_ms.unwrap_or(0)
270 ));
271
272 if !should_retry(&e) {
273 tracing::trace!(
274 operation = %operation_name,
275 error = %e,
276 "Non-retryable timeout"
277 );
278 return Err(e);
279 }
280
281 if attempt >= self.config.max_retries {
282 tracing::trace!(
283 operation = %operation_name,
284 attempts = attempt + 1,
285 error = %e,
286 "Retries exhausted after timeout"
287 );
288 return Err(e);
289 }
290
291 let mut delay = backoff.next_duration();
292
293 if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
294 let elapsed = start_time.elapsed();
295 let remaining =
296 Duration::from_millis(max_elapsed_ms).saturating_sub(elapsed);
297
298 if remaining.is_zero() {
299 let e = create_error("Budget exceeded".to_string());
300 tracing::trace!(
301 operation = %operation_name,
302 attempts = attempt + 1,
303 budget_ms = max_elapsed_ms,
304 "Retry budget exceeded"
305 );
306 return Err(e);
307 }
308
309 delay = delay.min(remaining);
310 }
311
312 tracing::trace!(
313 operation = %operation_name,
314 attempt = attempt + 1,
315 delay_ms = delay.as_millis() as u64,
316 error = %e,
317 "Retrying after timeout"
318 );
319
320 if delay.is_zero() {
322 tokio::task::yield_now().await;
323 attempt += 1;
324 continue;
325 }
326
327 if let Some(token) = cancel {
328 tokio::select! {
329 () = tokio::time::sleep(delay) => {},
330 () = token.cancelled() => {
331 tracing::debug!(
332 operation = %operation_name,
333 attempt = attempt + 1,
334 "Operation canceled during retry delay"
335 );
336 return Err(create_error("canceled".to_string()));
337 }
338 }
339 } else {
340 tokio::time::sleep(delay).await;
341 }
342 attempt += 1;
343 }
344 }
345 }
346 }
347
348 pub async fn execute_with_retry<F, Fut, T>(
355 &self,
356 operation_name: &str,
357 operation: F,
358 should_retry: impl Fn(&E) -> bool,
359 create_error: impl Fn(String) -> E,
360 ) -> Result<T, E>
361 where
362 F: FnMut() -> Fut,
363 Fut: Future<Output = Result<T, E>>,
364 {
365 self.execute_with_retry_inner(operation_name, operation, should_retry, create_error, None)
366 .await
367 }
368
369 pub async fn execute_with_retry_with_cancel<F, Fut, T>(
376 &self,
377 operation_name: &str,
378 operation: F,
379 should_retry: impl Fn(&E) -> bool,
380 create_error: impl Fn(String) -> E,
381 cancellation_token: &CancellationToken,
382 ) -> Result<T, E>
383 where
384 F: FnMut() -> Fut,
385 Fut: Future<Output = Result<T, E>>,
386 {
387 self.execute_with_retry_inner(
388 operation_name,
389 operation,
390 should_retry,
391 create_error,
392 Some(cancellation_token),
393 )
394 .await
395 }
396}
397
398pub fn create_default_retry_manager<E>() -> anyhow::Result<RetryManager<E>>
404where
405 E: std::error::Error,
406{
407 RetryManager::new(RetryConfig::default())
408}
409
410pub const fn create_http_retry_manager<E>() -> anyhow::Result<RetryManager<E>>
416where
417 E: std::error::Error,
418{
419 let config = RetryConfig {
420 max_retries: 3,
421 initial_delay_ms: 1_000,
422 max_delay_ms: 10_000,
423 backoff_factor: 2.0,
424 jitter_ms: 1_000,
425 operation_timeout_ms: Some(60_000), immediate_first: false,
427 max_elapsed_ms: Some(180_000), };
429 RetryManager::new(config)
430}
431
432pub const fn create_websocket_retry_manager<E>() -> anyhow::Result<RetryManager<E>>
438where
439 E: std::error::Error,
440{
441 let config = RetryConfig {
442 max_retries: 5,
443 initial_delay_ms: 1_000,
444 max_delay_ms: 10_000,
445 backoff_factor: 2.0,
446 jitter_ms: 1_000,
447 operation_timeout_ms: Some(30_000), immediate_first: true,
449 max_elapsed_ms: Some(120_000), };
451 RetryManager::new(config)
452}
453
#[cfg(test)]
mod test_utils {
    /// Error type shared by the retry tests; each variant exercises a
    /// different path through the retry loop.
    #[derive(Debug, thiserror::Error)]
    pub enum TestError {
        /// An error the default test predicate retries.
        #[error("Retryable error: {0}")]
        Retryable(String),
        /// An error the default test predicate refuses to retry.
        #[error("Non-retryable error: {0}")]
        NonRetryable(String),
        /// The error produced by [`create_test_error`] for failures raised by
        /// the retry machinery (timeouts, cancellation, budget).
        #[error("Timeout error: {0}")]
        Timeout(String),
    }

    /// Retry predicate for tests: only [`TestError::Retryable`] is retried.
    pub fn should_retry_test_error(error: &TestError) -> bool {
        match error {
            TestError::Retryable(_) => true,
            TestError::NonRetryable(_) | TestError::Timeout(_) => false,
        }
    }

    /// Error factory for tests: wraps `msg` in [`TestError::Timeout`].
    pub fn create_test_error(msg: String) -> TestError {
        TestError::Timeout(msg)
    }
}
478
479#[cfg(test)]
480mod tests {
481 use std::sync::{
482 Arc,
483 atomic::{AtomicU32, Ordering},
484 };
485
486 use nautilus_core::MUTEX_POISONED;
487 use rstest::rstest;
488
489 use super::{test_utils::*, *};
490
    /// Upper bound on the condition checks `yield_until` performs before it panics.
    const MAX_WAIT_ITERS: usize = 10_000;
    /// Upper bound on the 1 ms clock advances `advance_until` performs before it panics.
    const MAX_ADVANCE_ITERS: usize = 10_000;
493
494 pub(crate) async fn yield_until<F>(mut condition: F)
495 where
496 F: FnMut() -> bool,
497 {
498 for _ in 0..MAX_WAIT_ITERS {
499 if condition() {
500 return;
501 }
502 tokio::task::yield_now().await;
503 }
504
505 panic!("yield_until timed out waiting for condition");
506 }
507
508 pub(crate) async fn advance_until<F>(mut condition: F)
509 where
510 F: FnMut() -> bool,
511 {
512 for _ in 0..MAX_ADVANCE_ITERS {
513 if condition() {
514 return;
515 }
516 tokio::time::advance(Duration::from_millis(1)).await;
517 tokio::task::yield_now().await;
518 }
519
520 panic!("advance_until timed out waiting for condition");
521 }
522
523 #[rstest]
524 fn test_retry_config_default() {
525 let config = RetryConfig::default();
526 assert_eq!(config.max_retries, 3);
527 assert_eq!(config.initial_delay_ms, 1_000);
528 assert_eq!(config.max_delay_ms, 10_000);
529 assert_eq!(config.backoff_factor, 2.0);
530 assert_eq!(config.jitter_ms, 100);
531 assert_eq!(config.operation_timeout_ms, Some(30_000));
532 assert!(!config.immediate_first);
533 assert_eq!(config.max_elapsed_ms, None);
534 }
535
536 #[tokio::test]
537 async fn test_retry_manager_success_first_attempt() {
538 let manager = RetryManager::new(RetryConfig::default()).unwrap();
539
540 let result = manager
541 .execute_with_retry(
542 "test_operation",
543 || async { Ok::<i32, TestError>(42) },
544 should_retry_test_error,
545 create_test_error,
546 )
547 .await;
548
549 assert_eq!(result.unwrap(), 42);
550 }
551
552 #[tokio::test]
553 async fn test_retry_manager_non_retryable_error() {
554 let manager = RetryManager::new(RetryConfig::default()).unwrap();
555
556 let result = manager
557 .execute_with_retry(
558 "test_operation",
559 || async { Err::<i32, TestError>(TestError::NonRetryable("test".to_string())) },
560 should_retry_test_error,
561 create_test_error,
562 )
563 .await;
564
565 assert!(result.is_err());
566 assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
567 }
568
569 #[tokio::test]
570 async fn test_retry_manager_retryable_error_exhausted() {
571 let config = RetryConfig {
572 max_retries: 2,
573 initial_delay_ms: 10,
574 max_delay_ms: 50,
575 backoff_factor: 2.0,
576 jitter_ms: 0,
577 operation_timeout_ms: None,
578 immediate_first: false,
579 max_elapsed_ms: None,
580 };
581 let manager = RetryManager::new(config).unwrap();
582
583 let result = manager
584 .execute_with_retry(
585 "test_operation",
586 || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
587 should_retry_test_error,
588 create_test_error,
589 )
590 .await;
591
592 assert!(result.is_err());
593 assert!(matches!(result.unwrap_err(), TestError::Retryable(_)));
594 }
595
596 #[tokio::test]
597 async fn test_timeout_path() {
598 let config = RetryConfig {
599 max_retries: 2,
600 initial_delay_ms: 10,
601 max_delay_ms: 50,
602 backoff_factor: 2.0,
603 jitter_ms: 0,
604 operation_timeout_ms: Some(50),
605 immediate_first: false,
606 max_elapsed_ms: None,
607 };
608 let manager = RetryManager::new(config).unwrap();
609
610 let result = manager
611 .execute_with_retry(
612 "test_timeout",
613 || async {
614 tokio::time::sleep(Duration::from_millis(100)).await;
615 Ok::<i32, TestError>(42)
616 },
617 should_retry_test_error,
618 create_test_error,
619 )
620 .await;
621
622 assert!(result.is_err());
623 assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
624 }
625
626 #[tokio::test]
627 async fn test_max_elapsed_time_budget() {
628 let config = RetryConfig {
629 max_retries: 10,
630 initial_delay_ms: 50,
631 max_delay_ms: 100,
632 backoff_factor: 2.0,
633 jitter_ms: 0,
634 operation_timeout_ms: None,
635 immediate_first: false,
636 max_elapsed_ms: Some(200),
637 };
638 let manager = RetryManager::new(config).unwrap();
639
640 let start = tokio::time::Instant::now();
641 let result = manager
642 .execute_with_retry(
643 "test_budget",
644 || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
645 should_retry_test_error,
646 create_test_error,
647 )
648 .await;
649
650 let elapsed = start.elapsed();
651 assert!(result.is_err());
652 assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
653 assert!(elapsed.as_millis() >= 150);
654 assert!(elapsed.as_millis() < 1000);
655 }
656
657 #[rstest]
658 fn test_http_retry_manager_config() {
659 let manager = create_http_retry_manager::<TestError>().unwrap();
660 assert_eq!(manager.config.max_retries, 3);
661 assert!(!manager.config.immediate_first);
662 assert_eq!(manager.config.max_elapsed_ms, Some(180_000));
663 }
664
665 #[rstest]
666 fn test_websocket_retry_manager_config() {
667 let manager = create_websocket_retry_manager::<TestError>().unwrap();
668 assert_eq!(manager.config.max_retries, 5);
669 assert!(manager.config.immediate_first);
670 assert_eq!(manager.config.max_elapsed_ms, Some(120_000));
671 }
672
673 #[tokio::test]
674 async fn test_timeout_respects_retry_predicate() {
675 let config = RetryConfig {
676 max_retries: 3,
677 initial_delay_ms: 10,
678 max_delay_ms: 50,
679 backoff_factor: 2.0,
680 jitter_ms: 0,
681 operation_timeout_ms: Some(50),
682 immediate_first: false,
683 max_elapsed_ms: None,
684 };
685 let manager = RetryManager::new(config).unwrap();
686
687 let should_not_retry_timeouts = |error: &TestError| !matches!(error, TestError::Timeout(_));
689
690 let result = manager
691 .execute_with_retry(
692 "test_timeout_non_retryable",
693 || async {
694 tokio::time::sleep(Duration::from_millis(100)).await;
695 Ok::<i32, TestError>(42)
696 },
697 should_not_retry_timeouts,
698 create_test_error,
699 )
700 .await;
701
702 assert!(result.is_err());
704 assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
705 }
706
707 #[tokio::test]
708 async fn test_timeout_retries_when_predicate_allows() {
709 let config = RetryConfig {
710 max_retries: 2,
711 initial_delay_ms: 10,
712 max_delay_ms: 50,
713 backoff_factor: 2.0,
714 jitter_ms: 0,
715 operation_timeout_ms: Some(50),
716 immediate_first: false,
717 max_elapsed_ms: None,
718 };
719 let manager = RetryManager::new(config).unwrap();
720
721 let should_retry_timeouts = |error: &TestError| matches!(error, TestError::Timeout(_));
723
724 let start = tokio::time::Instant::now();
725 let result = manager
726 .execute_with_retry(
727 "test_timeout_retryable",
728 || async {
729 tokio::time::sleep(Duration::from_millis(100)).await;
730 Ok::<i32, TestError>(42)
731 },
732 should_retry_timeouts,
733 create_test_error,
734 )
735 .await;
736
737 let elapsed = start.elapsed();
738
739 assert!(result.is_err());
741 assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
742 assert!(elapsed.as_millis() > 80); }
745
746 #[tokio::test]
747 async fn test_successful_retry_after_failures() {
748 let config = RetryConfig {
749 max_retries: 3,
750 initial_delay_ms: 10,
751 max_delay_ms: 50,
752 backoff_factor: 2.0,
753 jitter_ms: 0,
754 operation_timeout_ms: None,
755 immediate_first: false,
756 max_elapsed_ms: None,
757 };
758 let manager = RetryManager::new(config).unwrap();
759
760 let attempt_counter = Arc::new(AtomicU32::new(0));
761 let counter_clone = attempt_counter.clone();
762
763 let result = manager
764 .execute_with_retry(
765 "test_eventual_success",
766 move || {
767 let counter = counter_clone.clone();
768 async move {
769 let attempts = counter.fetch_add(1, Ordering::SeqCst);
770 if attempts < 2 {
771 Err(TestError::Retryable("temporary failure".to_string()))
772 } else {
773 Ok(42)
774 }
775 }
776 },
777 should_retry_test_error,
778 create_test_error,
779 )
780 .await;
781
782 assert_eq!(result.unwrap(), 42);
783 assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
784 }
785
    /// Checks, on a paused clock, that with `immediate_first` the second
    /// attempt runs without waiting and the third runs after the 100 ms
    /// initial delay.
    #[tokio::test(start_paused = true)]
    async fn test_immediate_first_retry() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 100,
            max_delay_ms: 200,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: true,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        // Elapsed time (since `start`) recorded at the beginning of each attempt.
        let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
        let times_clone = attempt_times.clone();
        let start = tokio::time::Instant::now();

        // Run the retry loop in a separate task so this test can drive the clock.
        let handle = tokio::spawn({
            let times_clone = times_clone.clone();
            async move {
                let _ = manager
                    .execute_with_retry(
                        "test_immediate",
                        move || {
                            let times = times_clone.clone();
                            async move {
                                times.lock().expect(MUTEX_POISONED).push(start.elapsed());
                                Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                            }
                        },
                        should_retry_test_error,
                        create_test_error,
                    )
                    .await;
            }
        });

        // The first two attempts are expected without advancing the clock.
        yield_until(|| attempt_times.lock().expect(MUTEX_POISONED).len() >= 2).await;

        // Advance past the delay before the third attempt, then let the task run.
        tokio::time::advance(Duration::from_millis(100)).await;
        tokio::task::yield_now().await;

        yield_until(|| attempt_times.lock().expect(MUTEX_POISONED).len() >= 3).await;

        handle.await.unwrap();

        let times = attempt_times.lock().expect(MUTEX_POISONED);
        // Initial attempt plus `max_retries` (2) retries.
        assert_eq!(times.len(), 3);
        // Second attempt happened (almost) immediately.
        assert!(times[1] <= Duration::from_millis(1));
        // Third attempt happened after the 100 ms delay, with a small tolerance.
        assert!(times[2] >= Duration::from_millis(100));
        assert!(times[2] <= Duration::from_millis(110));
    }
845
846 #[tokio::test]
847 async fn test_operation_without_timeout() {
848 let config = RetryConfig {
849 max_retries: 2,
850 initial_delay_ms: 10,
851 max_delay_ms: 50,
852 backoff_factor: 2.0,
853 jitter_ms: 0,
854 operation_timeout_ms: None, immediate_first: false,
856 max_elapsed_ms: None,
857 };
858 let manager = RetryManager::new(config).unwrap();
859
860 let start = tokio::time::Instant::now();
861 let result = manager
862 .execute_with_retry(
863 "test_no_timeout",
864 || async {
865 tokio::time::sleep(Duration::from_millis(50)).await;
866 Ok::<i32, TestError>(42)
867 },
868 should_retry_test_error,
869 create_test_error,
870 )
871 .await;
872
873 let elapsed = start.elapsed();
874 assert_eq!(result.unwrap(), 42);
875 assert!(elapsed.as_millis() >= 30);
877 assert!(elapsed.as_millis() < 200);
878 }
879
880 #[tokio::test]
881 async fn test_zero_retries() {
882 let config = RetryConfig {
883 max_retries: 0,
884 initial_delay_ms: 10,
885 max_delay_ms: 50,
886 backoff_factor: 2.0,
887 jitter_ms: 0,
888 operation_timeout_ms: None,
889 immediate_first: false,
890 max_elapsed_ms: None,
891 };
892 let manager = RetryManager::new(config).unwrap();
893
894 let attempt_counter = Arc::new(AtomicU32::new(0));
895 let counter_clone = attempt_counter.clone();
896
897 let result = manager
898 .execute_with_retry(
899 "test_no_retries",
900 move || {
901 let counter = counter_clone.clone();
902 async move {
903 counter.fetch_add(1, Ordering::SeqCst);
904 Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
905 }
906 },
907 should_retry_test_error,
908 create_test_error,
909 )
910 .await;
911
912 assert!(result.is_err());
913 assert_eq!(attempt_counter.load(Ordering::SeqCst), 1);
915 }
916
    /// Checks, on a paused clock, that the gap between consecutive attempts
    /// stays within the expected range when jitter is enabled.
    #[tokio::test(start_paused = true)]
    async fn test_jitter_applied() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 2.0,
            jitter_ms: 50,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        // Gap between the start of each attempt and the previous one.
        let delays = Arc::new(std::sync::Mutex::new(Vec::new()));
        let delays_clone = delays.clone();
        // Start time of the previous attempt (initially the test start).
        let last_time = Arc::new(std::sync::Mutex::new(tokio::time::Instant::now()));
        let last_time_clone = last_time.clone();

        // Run the retry loop in a separate task so this test can drive the clock.
        let handle = tokio::spawn({
            let delays_clone = delays_clone.clone();
            async move {
                let _ = manager
                    .execute_with_retry(
                        "test_jitter",
                        move || {
                            let delays = delays_clone.clone();
                            let last_time = last_time_clone.clone();
                            async move {
                                let now = tokio::time::Instant::now();
                                // Scope the guard so it is released before the next lock.
                                let delay = {
                                    let mut last = last_time.lock().expect(MUTEX_POISONED);
                                    let d = now.duration_since(*last);
                                    *last = now;
                                    d
                                };
                                delays.lock().expect(MUTEX_POISONED).push(delay);
                                Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                            }
                        },
                        should_retry_test_error,
                        create_test_error,
                    )
                    .await;
            }
        });

        // The first attempt needs no clock movement; later ones need the clock advanced.
        yield_until(|| !delays.lock().expect(MUTEX_POISONED).is_empty()).await;
        advance_until(|| delays.lock().expect(MUTEX_POISONED).len() >= 2).await;
        advance_until(|| delays.lock().expect(MUTEX_POISONED).len() >= 3).await;

        handle.await.unwrap();

        let delays = delays.lock().expect(MUTEX_POISONED);
        // Skip the first entry: it measures the time before the first attempt, not a retry delay.
        for delay in delays.iter().skip(1) {
            // At least the 50 ms initial delay.
            assert!(delay.as_millis() >= 50);
            // At most max delay (100) + jitter (50); the extra 1 ms is presumably
            // slack for the 1 ms clock steps — TODO confirm.
            assert!(delay.as_millis() <= 151);
        }
    }
979
980 #[tokio::test]
981 async fn test_max_elapsed_stops_early() {
982 let config = RetryConfig {
983 max_retries: 100, initial_delay_ms: 50,
985 max_delay_ms: 100,
986 backoff_factor: 1.5,
987 jitter_ms: 0,
988 operation_timeout_ms: None,
989 immediate_first: false,
990 max_elapsed_ms: Some(150), };
992 let manager = RetryManager::new(config).unwrap();
993
994 let attempt_counter = Arc::new(AtomicU32::new(0));
995 let counter_clone = attempt_counter.clone();
996
997 let start = tokio::time::Instant::now();
998 let result = manager
999 .execute_with_retry(
1000 "test_elapsed_limit",
1001 move || {
1002 let counter = counter_clone.clone();
1003 async move {
1004 counter.fetch_add(1, Ordering::SeqCst);
1005 Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1006 }
1007 },
1008 should_retry_test_error,
1009 create_test_error,
1010 )
1011 .await;
1012
1013 let elapsed = start.elapsed();
1014 assert!(result.is_err());
1015 assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
1016
1017 let attempts = attempt_counter.load(Ordering::SeqCst);
1019 assert!(attempts < 10); assert!(elapsed.as_millis() >= 100);
1021 }
1022
1023 #[tokio::test]
1024 async fn test_mixed_errors_retry_behavior() {
1025 let config = RetryConfig {
1026 max_retries: 5,
1027 initial_delay_ms: 10,
1028 max_delay_ms: 50,
1029 backoff_factor: 2.0,
1030 jitter_ms: 0,
1031 operation_timeout_ms: None,
1032 immediate_first: false,
1033 max_elapsed_ms: None,
1034 };
1035 let manager = RetryManager::new(config).unwrap();
1036
1037 let attempt_counter = Arc::new(AtomicU32::new(0));
1038 let counter_clone = attempt_counter.clone();
1039
1040 let result = manager
1041 .execute_with_retry(
1042 "test_mixed_errors",
1043 move || {
1044 let counter = counter_clone.clone();
1045 async move {
1046 let attempts = counter.fetch_add(1, Ordering::SeqCst);
1047 match attempts {
1048 0 => Err(TestError::Retryable("retry 1".to_string())),
1049 1 => Err(TestError::Retryable("retry 2".to_string())),
1050 2 => Err(TestError::NonRetryable("stop here".to_string())),
1051 _ => Ok(42),
1052 }
1053 }
1054 },
1055 should_retry_test_error,
1056 create_test_error,
1057 )
1058 .await;
1059
1060 assert!(result.is_err());
1061 assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
1062 assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
1064 }
1065
1066 #[tokio::test]
1067 async fn test_cancellation_during_retry_delay() {
1068 use tokio_util::sync::CancellationToken;
1069
1070 let config = RetryConfig {
1071 max_retries: 10,
1072 initial_delay_ms: 500, max_delay_ms: 1000,
1074 backoff_factor: 2.0,
1075 jitter_ms: 0,
1076 operation_timeout_ms: None,
1077 immediate_first: false,
1078 max_elapsed_ms: None,
1079 };
1080 let manager = RetryManager::new(config).unwrap();
1081
1082 let token = CancellationToken::new();
1083 let token_clone = token.clone();
1084
1085 tokio::spawn(async move {
1087 tokio::time::sleep(Duration::from_millis(100)).await;
1088 token_clone.cancel();
1089 });
1090
1091 let attempt_counter = Arc::new(AtomicU32::new(0));
1092 let counter_clone = attempt_counter.clone();
1093
1094 let start = tokio::time::Instant::now();
1095 let result = manager
1096 .execute_with_retry_with_cancel(
1097 "test_cancellation",
1098 move || {
1099 let counter = counter_clone.clone();
1100 async move {
1101 counter.fetch_add(1, Ordering::SeqCst);
1102 Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1103 }
1104 },
1105 should_retry_test_error,
1106 create_test_error,
1107 &token,
1108 )
1109 .await;
1110
1111 let elapsed = start.elapsed();
1112
1113 assert!(result.is_err());
1115 let error_msg = format!("{}", result.unwrap_err());
1116 assert!(error_msg.contains("canceled"));
1117
1118 assert!(elapsed.as_millis() < 600);
1120
1121 let attempts = attempt_counter.load(Ordering::SeqCst);
1123 assert!(attempts >= 1);
1124 }
1125
1126 #[tokio::test]
1127 async fn test_cancellation_during_operation_execution() {
1128 use tokio_util::sync::CancellationToken;
1129
1130 let config = RetryConfig {
1131 max_retries: 5,
1132 initial_delay_ms: 50,
1133 max_delay_ms: 100,
1134 backoff_factor: 2.0,
1135 jitter_ms: 0,
1136 operation_timeout_ms: None,
1137 immediate_first: false,
1138 max_elapsed_ms: None,
1139 };
1140 let manager = RetryManager::new(config).unwrap();
1141
1142 let token = CancellationToken::new();
1143 let token_clone = token.clone();
1144
1145 tokio::spawn(async move {
1147 tokio::time::sleep(Duration::from_millis(50)).await;
1148 token_clone.cancel();
1149 });
1150
1151 let start = tokio::time::Instant::now();
1152 let result = manager
1153 .execute_with_retry_with_cancel(
1154 "test_cancellation_during_op",
1155 || async {
1156 tokio::time::sleep(Duration::from_millis(200)).await;
1158 Ok::<i32, TestError>(42)
1159 },
1160 should_retry_test_error,
1161 create_test_error,
1162 &token,
1163 )
1164 .await;
1165
1166 let elapsed = start.elapsed();
1167
1168 assert!(result.is_err());
1170 let error_msg = format!("{}", result.unwrap_err());
1171 assert!(error_msg.contains("canceled"));
1172
1173 assert!(elapsed.as_millis() < 250);
1175 }
1176
1177 #[tokio::test]
1178 async fn test_cancellation_error_message() {
1179 use tokio_util::sync::CancellationToken;
1180
1181 let config = RetryConfig::default();
1182 let manager = RetryManager::new(config).unwrap();
1183
1184 let token = CancellationToken::new();
1185 token.cancel(); let result = manager
1188 .execute_with_retry_with_cancel(
1189 "test_operation",
1190 || async { Ok::<i32, TestError>(42) },
1191 should_retry_test_error,
1192 create_test_error,
1193 &token,
1194 )
1195 .await;
1196
1197 assert!(result.is_err());
1198 let error_msg = format!("{}", result.unwrap_err());
1199 assert!(error_msg.contains("canceled"));
1200 }
1201}
1202
1203#[cfg(test)]
1204mod proptest_tests {
1205 use std::sync::{
1206 Arc,
1207 atomic::{AtomicU32, Ordering},
1208 };
1209
1210 use nautilus_core::MUTEX_POISONED;
1211 use proptest::prelude::*;
1212 use rstest::rstest;
1214
1215 use super::{
1216 test_utils::*,
1217 tests::{advance_until, yield_until},
1218 *,
1219 };
1220
1221 proptest! {
1222 #[rstest]
1223 fn test_retry_config_valid_ranges(
1224 max_retries in 0u32..100,
1225 initial_delay_ms in 1u64..10_000,
1226 max_delay_ms in 1u64..60_000,
1227 backoff_factor in 1.0f64..10.0,
1228 jitter_ms in 0u64..1_000,
1229 operation_timeout_ms in prop::option::of(1u64..120_000),
1230 immediate_first in any::<bool>(),
1231 max_elapsed_ms in prop::option::of(1u64..300_000)
1232 ) {
1233 let max_delay_ms = max_delay_ms.max(initial_delay_ms);
1235
1236 let config = RetryConfig {
1237 max_retries,
1238 initial_delay_ms,
1239 max_delay_ms,
1240 backoff_factor,
1241 jitter_ms,
1242 operation_timeout_ms,
1243 immediate_first,
1244 max_elapsed_ms,
1245 };
1246
1247 let manager = RetryManager::<std::io::Error>::new(config);
1249 prop_assert!(manager.is_ok());
1250 }
1251
1252 #[rstest]
1253 fn test_retry_attempts_bounded(
1254 max_retries in 0u32..5,
1255 initial_delay_ms in 1u64..10,
1256 backoff_factor in 1.0f64..2.0,
1257 ) {
1258 let rt = tokio::runtime::Builder::new_current_thread()
1259 .enable_time()
1260 .build()
1261 .unwrap();
1262
1263 let config = RetryConfig {
1264 max_retries,
1265 initial_delay_ms,
1266 max_delay_ms: initial_delay_ms * 2,
1267 backoff_factor,
1268 jitter_ms: 0,
1269 operation_timeout_ms: None,
1270 immediate_first: false,
1271 max_elapsed_ms: None,
1272 };
1273
1274 let manager = RetryManager::new(config).unwrap();
1275 let attempt_counter = Arc::new(AtomicU32::new(0));
1276 let counter_clone = attempt_counter.clone();
1277
1278 let _result = rt.block_on(manager.execute_with_retry(
1279 "prop_test",
1280 move || {
1281 let counter = counter_clone.clone();
1282 async move {
1283 counter.fetch_add(1, Ordering::SeqCst);
1284 Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1285 }
1286 },
1287 |e: &TestError| matches!(e, TestError::Retryable(_)),
1288 TestError::Timeout,
1289 ));
1290
1291 let attempts = attempt_counter.load(Ordering::SeqCst);
1292 prop_assert_eq!(attempts, max_retries + 1);
1294 }
1295
1296 #[rstest]
1297 fn test_timeout_always_respected(
1298 timeout_ms in 10u64..50,
1299 operation_delay_ms in 60u64..100,
1300 ) {
1301 let rt = tokio::runtime::Builder::new_current_thread()
1302 .enable_time()
1303 .start_paused(true)
1304 .build()
1305 .unwrap();
1306
1307 let config = RetryConfig {
1308 max_retries: 0, initial_delay_ms: 10,
1310 max_delay_ms: 100,
1311 backoff_factor: 2.0,
1312 jitter_ms: 0,
1313 operation_timeout_ms: Some(timeout_ms),
1314 immediate_first: false,
1315 max_elapsed_ms: None,
1316 };
1317
1318 let manager = RetryManager::new(config).unwrap();
1319
1320 let result = rt.block_on(async {
1321 let operation_future = manager.execute_with_retry(
1322 "timeout_test",
1323 move || async move {
1324 tokio::time::sleep(Duration::from_millis(operation_delay_ms)).await;
1325 Ok::<i32, TestError>(42)
1326 },
1327 |_: &TestError| true,
1328 TestError::Timeout,
1329 );
1330
1331 tokio::time::advance(Duration::from_millis(timeout_ms + 10)).await;
1333 operation_future.await
1334 });
1335
1336 prop_assert!(result.is_err());
1338 prop_assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
1339 }
1340
1341 #[rstest]
1342 fn test_max_elapsed_always_respected(
1343 max_elapsed_ms in 20u64..50,
1344 delay_per_retry in 15u64..30,
1345 max_retries in 10u32..20,
1346 ) {
1347 let rt = tokio::runtime::Builder::new_current_thread()
1348 .enable_time()
1349 .start_paused(true)
1350 .build()
1351 .unwrap();
1352
1353 let config = RetryConfig {
1355 max_retries,
1356 initial_delay_ms: delay_per_retry,
1357 max_delay_ms: delay_per_retry * 2,
1358 backoff_factor: 1.0, jitter_ms: 0,
1360 operation_timeout_ms: None,
1361 immediate_first: false,
1362 max_elapsed_ms: Some(max_elapsed_ms),
1363 };
1364
1365 let manager = RetryManager::new(config).unwrap();
1366 let attempt_counter = Arc::new(AtomicU32::new(0));
1367 let counter_clone = attempt_counter.clone();
1368
1369 let result = rt.block_on(async {
1370 let operation_future = manager.execute_with_retry(
1371 "elapsed_test",
1372 move || {
1373 let counter = counter_clone.clone();
1374 async move {
1375 counter.fetch_add(1, Ordering::SeqCst);
1376 Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1377 }
1378 },
1379 |e: &TestError| matches!(e, TestError::Retryable(_)),
1380 TestError::Timeout,
1381 );
1382
1383 tokio::time::advance(Duration::from_millis(max_elapsed_ms + delay_per_retry)).await;
1385 operation_future.await
1386 });
1387
1388 let attempts = attempt_counter.load(Ordering::SeqCst);
1389
1390 prop_assert!(result.is_err());
1392 prop_assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
1393
1394 prop_assert!(attempts <= max_retries + 1);
1396 }
1397
1398 #[rstest]
1399 fn test_jitter_bounds(
1400 jitter_ms in 0u64..20,
1401 base_delay_ms in 10u64..30,
1402 ) {
1403 let rt = tokio::runtime::Builder::new_current_thread()
1404 .enable_time()
1405 .start_paused(true)
1406 .build()
1407 .unwrap();
1408
1409 let config = RetryConfig {
1410 max_retries: 2,
1411 initial_delay_ms: base_delay_ms,
1412 max_delay_ms: base_delay_ms * 2,
1413 backoff_factor: 1.0, jitter_ms,
1415 operation_timeout_ms: None,
1416 immediate_first: false,
1417 max_elapsed_ms: None,
1418 };
1419
1420 let manager = RetryManager::new(config).unwrap();
1421 let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
1422 let attempt_times_for_block = attempt_times.clone();
1423
1424 rt.block_on(async move {
1425 let attempt_times_for_wait = attempt_times_for_block.clone();
1426 let handle = tokio::spawn({
1427 let attempt_times_for_task = attempt_times_for_block.clone();
1428 let manager = manager;
1429 async move {
1430 let start_time = tokio::time::Instant::now();
1431 let _ = manager
1432 .execute_with_retry(
1433 "jitter_test",
1434 move || {
1435 let attempt_times_inner = attempt_times_for_task.clone();
1436 async move {
1437 attempt_times_inner
1438 .lock()
1439 .unwrap()
1440 .push(start_time.elapsed());
1441 Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1442 }
1443 },
1444 |e: &TestError| matches!(e, TestError::Retryable(_)),
1445 TestError::Timeout,
1446 )
1447 .await;
1448 }
1449 });
1450
1451 yield_until(|| !attempt_times_for_wait.lock().expect(MUTEX_POISONED).is_empty()).await;
1452 advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 2).await;
1453 advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 3).await;
1454
1455 handle.await.unwrap();
1456 });
1457
1458 let times = attempt_times.lock().expect(MUTEX_POISONED);
1459
1460 prop_assert!(times.len() >= 2);
1462
1463 prop_assert!(times[0].as_millis() < 5);
1465
1466 for i in 1..times.len() {
1468 let delay_from_previous = if i == 1 {
1469 times[i] - times[0]
1470 } else {
1471 times[i] - times[i - 1]
1472 };
1473
1474 prop_assert!(
1476 delay_from_previous.as_millis() >= base_delay_ms as u128,
1477 "Retry {} delay {}ms is less than base {}ms",
1478 i, delay_from_previous.as_millis(), base_delay_ms
1479 );
1480
1481 prop_assert!(
1483 delay_from_previous.as_millis() <= (base_delay_ms + jitter_ms + 1) as u128,
1484 "Retry {} delay {}ms exceeds base {} + jitter {}",
1485 i, delay_from_previous.as_millis(), base_delay_ms, jitter_ms
1486 );
1487 }
1488 }
1489
1490 #[rstest]
1491 fn test_immediate_first_property(
1492 immediate_first in any::<bool>(),
1493 initial_delay_ms in 10u64..30,
1494 ) {
1495 let rt = tokio::runtime::Builder::new_current_thread()
1496 .enable_time()
1497 .start_paused(true)
1498 .build()
1499 .unwrap();
1500
1501 let config = RetryConfig {
1502 max_retries: 2,
1503 initial_delay_ms,
1504 max_delay_ms: initial_delay_ms * 2,
1505 backoff_factor: 2.0,
1506 jitter_ms: 0,
1507 operation_timeout_ms: None,
1508 immediate_first,
1509 max_elapsed_ms: None,
1510 };
1511
1512 let manager = RetryManager::new(config).unwrap();
1513 let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
1514 let attempt_times_for_block = attempt_times.clone();
1515
1516 rt.block_on(async move {
1517 let attempt_times_for_wait = attempt_times_for_block.clone();
1518 let handle = tokio::spawn({
1519 let attempt_times_for_task = attempt_times_for_block.clone();
1520 let manager = manager;
1521 async move {
1522 let start = tokio::time::Instant::now();
1523 let _ = manager
1524 .execute_with_retry(
1525 "immediate_test",
1526 move || {
1527 let attempt_times_inner = attempt_times_for_task.clone();
1528 async move {
1529 let elapsed = start.elapsed();
1530 attempt_times_inner.lock().expect(MUTEX_POISONED).push(elapsed);
1531 Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1532 }
1533 },
1534 |e: &TestError| matches!(e, TestError::Retryable(_)),
1535 TestError::Timeout,
1536 )
1537 .await;
1538 }
1539 });
1540
1541 yield_until(|| !attempt_times_for_wait.lock().expect(MUTEX_POISONED).is_empty()).await;
1542 advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 2).await;
1543 advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 3).await;
1544
1545 handle.await.unwrap();
1546 });
1547
1548 let times = attempt_times.lock().expect(MUTEX_POISONED);
1549 prop_assert!(times.len() >= 2);
1550
1551 if immediate_first {
1552 prop_assert!(times[1].as_millis() < 20,
1554 "With immediate_first=true, first retry took {}ms",
1555 times[1].as_millis());
1556 } else {
1557 prop_assert!(times[1].as_millis() >= (initial_delay_ms - 1) as u128,
1559 "With immediate_first=false, first retry was too fast: {}ms",
1560 times[1].as_millis());
1561 }
1562 }
1563
1564 #[rstest]
1565 fn test_non_retryable_stops_immediately(
1566 attempt_before_non_retryable in 0usize..3,
1567 max_retries in 3u32..5,
1568 ) {
1569 let rt = tokio::runtime::Builder::new_current_thread()
1570 .enable_time()
1571 .build()
1572 .unwrap();
1573
1574 let config = RetryConfig {
1575 max_retries,
1576 initial_delay_ms: 10,
1577 max_delay_ms: 100,
1578 backoff_factor: 2.0,
1579 jitter_ms: 0,
1580 operation_timeout_ms: None,
1581 immediate_first: false,
1582 max_elapsed_ms: None,
1583 };
1584
1585 let manager = RetryManager::new(config).unwrap();
1586 let attempt_counter = Arc::new(AtomicU32::new(0));
1587 let counter_clone = attempt_counter.clone();
1588
1589 let result: Result<i32, TestError> = rt.block_on(manager.execute_with_retry(
1590 "non_retryable_test",
1591 move || {
1592 let counter = counter_clone.clone();
1593 async move {
1594 let attempts = counter.fetch_add(1, Ordering::SeqCst) as usize;
1595 if attempts == attempt_before_non_retryable {
1596 Err(TestError::NonRetryable("stop".to_string()))
1597 } else {
1598 Err(TestError::Retryable("retry".to_string()))
1599 }
1600 }
1601 },
1602 |e: &TestError| matches!(e, TestError::Retryable(_)),
1603 TestError::Timeout,
1604 ));
1605
1606 let attempts = attempt_counter.load(Ordering::SeqCst) as usize;
1607
1608 prop_assert!(result.is_err());
1609 prop_assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
1610 prop_assert_eq!(attempts, attempt_before_non_retryable + 1);
1612 }
1613
1614 #[rstest]
1615 fn test_cancellation_stops_immediately(
1616 cancel_after_ms in 10u64..100,
1617 initial_delay_ms in 200u64..500,
1618 ) {
1619 use tokio_util::sync::CancellationToken;
1620
1621 let rt = tokio::runtime::Builder::new_current_thread()
1622 .enable_time()
1623 .start_paused(true)
1624 .build()
1625 .unwrap();
1626
1627 let config = RetryConfig {
1628 max_retries: 10,
1629 initial_delay_ms,
1630 max_delay_ms: initial_delay_ms * 2,
1631 backoff_factor: 2.0,
1632 jitter_ms: 0,
1633 operation_timeout_ms: None,
1634 immediate_first: false,
1635 max_elapsed_ms: None,
1636 };
1637
1638 let manager = RetryManager::new(config).unwrap();
1639 let token = CancellationToken::new();
1640 let token_clone = token.clone();
1641
1642 let result: Result<i32, TestError> = rt.block_on(async {
1643 tokio::spawn(async move {
1645 tokio::time::sleep(Duration::from_millis(cancel_after_ms)).await;
1646 token_clone.cancel();
1647 });
1648
1649 let operation_future = manager.execute_with_retry_with_cancel(
1650 "cancellation_test",
1651 || async {
1652 Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1653 },
1654 |e: &TestError| matches!(e, TestError::Retryable(_)),
1655 create_test_error,
1656 &token,
1657 );
1658
1659 tokio::time::advance(Duration::from_millis(cancel_after_ms + 10)).await;
1661 operation_future.await
1662 });
1663
1664 prop_assert!(result.is_err());
1666 let error_msg = format!("{}", result.unwrap_err());
1667 prop_assert!(error_msg.contains("canceled"));
1668 }
1669
1670 #[rstest]
1671 fn test_budget_clamp_prevents_overshoot(
1672 max_elapsed_ms in 10u64..30,
1673 delay_per_retry in 20u64..50,
1674 ) {
1675 let rt = tokio::runtime::Builder::new_current_thread()
1676 .enable_time()
1677 .start_paused(true)
1678 .build()
1679 .unwrap();
1680
1681 let config = RetryConfig {
1683 max_retries: 5,
1684 initial_delay_ms: delay_per_retry,
1685 max_delay_ms: delay_per_retry * 2,
1686 backoff_factor: 1.0,
1687 jitter_ms: 0,
1688 operation_timeout_ms: None,
1689 immediate_first: false,
1690 max_elapsed_ms: Some(max_elapsed_ms),
1691 };
1692
1693 let manager = RetryManager::new(config).unwrap();
1694
1695 let _result = rt.block_on(async {
1696 let operation_future = manager.execute_with_retry(
1697 "budget_clamp_test",
1698 || async {
1699 Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1701 },
1702 |e: &TestError| matches!(e, TestError::Retryable(_)),
1703 create_test_error,
1704 );
1705
1706 tokio::time::advance(Duration::from_millis(max_elapsed_ms + delay_per_retry)).await;
1708 operation_future.await
1709 });
1710
1711 }
1714
1715 #[rstest]
1716 fn test_success_on_kth_attempt(
1717 k in 1usize..5,
1718 initial_delay_ms in 5u64..20,
1719 ) {
1720 let rt = tokio::runtime::Builder::new_current_thread()
1721 .enable_time()
1722 .start_paused(true)
1723 .build()
1724 .unwrap();
1725
1726 let config = RetryConfig {
1727 max_retries: 10, initial_delay_ms,
1729 max_delay_ms: initial_delay_ms * 4,
1730 backoff_factor: 2.0,
1731 jitter_ms: 0,
1732 operation_timeout_ms: None,
1733 immediate_first: false,
1734 max_elapsed_ms: None,
1735 };
1736
1737 let manager = RetryManager::new(config).unwrap();
1738 let attempt_counter = Arc::new(AtomicU32::new(0));
1739 let counter_clone = attempt_counter.clone();
1740 let target_k = k;
1741
1742 let (result, _elapsed) = rt.block_on(async {
1743 let start = tokio::time::Instant::now();
1744
1745 let operation_future = manager.execute_with_retry(
1746 "kth_attempt_test",
1747 move || {
1748 let counter = counter_clone.clone();
1749 async move {
1750 let attempt = counter.fetch_add(1, Ordering::SeqCst) as usize;
1751 if attempt + 1 == target_k {
1752 Ok(42)
1753 } else {
1754 Err(TestError::Retryable("retry".to_string()))
1755 }
1756 }
1757 },
1758 |e: &TestError| matches!(e, TestError::Retryable(_)),
1759 create_test_error,
1760 );
1761
1762 for _ in 0..k {
1764 tokio::time::advance(Duration::from_millis(initial_delay_ms * 4)).await;
1765 }
1766
1767 let result = operation_future.await;
1768 let elapsed = start.elapsed();
1769
1770 (result, elapsed)
1771 });
1772
1773 let attempts = attempt_counter.load(Ordering::SeqCst) as usize;
1774
1775 prop_assert!(result.is_ok());
1777 prop_assert_eq!(result.unwrap(), 42);
1778 prop_assert_eq!(attempts, k);
1779 }
1780 }
1781}