use std::{future::Future, marker::PhantomData, time::Duration};

use tokio_util::sync::CancellationToken;

use crate::backoff::ExponentialBackoff;

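/// Configuration for retry behavior with exponential backoff, jitter, and
/// optional per-attempt and overall time budgets.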
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Maximum number of retries after the initial attempt.
    pub max_retries: u32,
    /// Delay before the first retry, in milliseconds.
    pub initial_delay_ms: u64,
    /// Upper bound on any single retry delay, in milliseconds.
    pub max_delay_ms: u64,
    /// Multiplier applied to the delay after each retry.
    pub backoff_factor: f64,
    /// Maximum random jitter added to each delay, in milliseconds.
    pub jitter_ms: u64,
    /// Optional timeout for each individual attempt, in milliseconds.
    pub operation_timeout_ms: Option<u64>,
    /// Whether the first retry fires immediately instead of waiting.
    pub immediate_first: bool,
    /// Optional cap on total elapsed time across all attempts, in milliseconds.
    pub max_elapsed_ms: Option<u64>,
}

impl Default for RetryConfig {
    fn default() -> Self {
        Self {
            max_retries: 3,
            initial_delay_ms: 1_000,
            max_delay_ms: 10_000,
            backoff_factor: 2.0,
            jitter_ms: 100,
            operation_timeout_ms: Some(30_000),
            immediate_first: false,
            max_elapsed_ms: None,
        }
    }
}

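/// Executes fallible async operations with retries, exponential backoff,
/// optional per-attempt timeouts, an optional overall elapsed-time budget,
/// and cooperative cancellation.
///
/// The manager is generic over the error type `E` (carried via `PhantomData`)
/// so one instance can be reused across operations sharing an error type.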
#[derive(Clone, Debug)]
pub struct RetryManager<E> {
    config: RetryConfig,
    _phantom: PhantomData<E>,
}

impl<E> RetryManager<E>
where
    E: std::error::Error,
{
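    /// Creates a new [`RetryManager`] with the given configuration.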
    pub const fn new(config: RetryConfig) -> Self {
        Self {
            config,
            _phantom: PhantomData,
        }
    }

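    /// Formats the error message reported when the elapsed-time budget is
    /// exhausted, using 1-based attempt counts.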
    #[inline(always)]
    fn budget_exceeded_msg(&self, attempt: u32) -> String {
        format!(
            "Retry budget exceeded ({}/{})",
            attempt.saturating_add(1),
            self.config.max_retries.saturating_add(1)
        )
    }

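    /// Core retry loop shared by [`Self::execute_with_retry`] and
    /// [`Self::execute_with_retry_with_cancel`].
    ///
    /// Runs `operation` until it succeeds, `should_retry` rejects an error,
    /// the retry count or elapsed-time budget is exhausted, or `cancel`
    /// fires. `create_error` constructs synthetic errors for timeouts,
    /// cancellation, and budget exhaustion.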
    pub async fn execute_with_retry_inner<F, Fut, T>(
        &self,
        operation_name: &str,
        mut operation: F,
        should_retry: impl Fn(&E) -> bool,
        create_error: impl Fn(String) -> E,
        cancel: Option<&CancellationToken>,
    ) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        let mut backoff = ExponentialBackoff::new(
            Duration::from_millis(self.config.initial_delay_ms),
            Duration::from_millis(self.config.max_delay_ms),
            self.config.backoff_factor,
            self.config.jitter_ms,
            self.config.immediate_first,
        )
        .map_err(|e| create_error(format!("Invalid configuration: {e}")))?;

        let mut attempt = 0;
        let start_time = tokio::time::Instant::now();

        loop {
            if let Some(token) = cancel
                && token.is_cancelled()
            {
                tracing::debug!(
                    operation = %operation_name,
                    attempts = attempt,
                    "Operation canceled"
                );
                return Err(create_error("canceled".to_string()));
            }

            if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
                let elapsed = start_time.elapsed();
                if elapsed.as_millis() >= u128::from(max_elapsed_ms) {
                    return Err(create_error(self.budget_exceeded_msg(attempt)));
                }
            }

            let result = match (self.config.operation_timeout_ms, cancel) {
                (Some(timeout_ms), Some(token)) => {
                    tokio::select! {
                        result = tokio::time::timeout(Duration::from_millis(timeout_ms), operation()) => result,
                        () = token.cancelled() => {
                            tracing::debug!(
                                operation = %operation_name,
                                "Operation canceled during execution"
                            );
                            return Err(create_error("canceled".to_string()));
                        }
                    }
                }
                (Some(timeout_ms), None) => {
                    tokio::time::timeout(Duration::from_millis(timeout_ms), operation()).await
                }
                (None, Some(token)) => {
                    tokio::select! {
                        result = operation() => Ok(result),
                        () = token.cancelled() => {
                            tracing::debug!(
                                operation = %operation_name,
                                "Operation canceled during execution"
                            );
                            return Err(create_error("canceled".to_string()));
                        }
                    }
                }
                (None, None) => Ok(operation().await),
            };

            match result {
                Ok(Ok(success)) => {
                    if attempt > 0 {
                        tracing::trace!(
                            operation = %operation_name,
                            attempts = attempt + 1,
                            "Retry succeeded"
                        );
                    }
                    return Ok(success);
                }
                Ok(Err(e)) => {
                    if !should_retry(&e) {
                        tracing::trace!(
                            operation = %operation_name,
                            error = %e,
                            "Non-retryable error"
                        );
                        return Err(e);
                    }

                    if attempt >= self.config.max_retries {
                        tracing::trace!(
                            operation = %operation_name,
                            attempts = attempt + 1,
                            error = %e,
                            "Retries exhausted"
                        );
                        return Err(e);
                    }

                    let mut delay = backoff.next_duration();

                    if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
                        let elapsed = start_time.elapsed();
                        let remaining =
                            Duration::from_millis(max_elapsed_ms).saturating_sub(elapsed);

                        if remaining.is_zero() {
                            return Err(create_error(self.budget_exceeded_msg(attempt)));
                        }

                        delay = delay.min(remaining);
                    }

                    tracing::trace!(
                        operation = %operation_name,
                        attempt = attempt + 1,
                        delay_ms = delay.as_millis() as u64,
                        error = %e,
                        "Retrying after failure"
                    );

                    // Zero delay: yield instead of sleeping to avoid a busy loop
                    if delay.is_zero() {
                        tokio::task::yield_now().await;
                        attempt += 1;
                        continue;
                    }

                    if let Some(token) = cancel {
                        tokio::select! {
                            () = tokio::time::sleep(delay) => {},
                            () = token.cancelled() => {
                                tracing::debug!(
                                    operation = %operation_name,
                                    attempt = attempt + 1,
                                    "Operation canceled during retry delay"
                                );
                                return Err(create_error("canceled".to_string()));
                            }
                        }
                    } else {
                        tokio::time::sleep(delay).await;
                    }
                    attempt += 1;
                }
                Err(_) => {
                    let e = create_error(format!(
                        "Timed out after {}ms",
                        self.config.operation_timeout_ms.unwrap_or(0)
                    ));

                    if !should_retry(&e) {
                        tracing::trace!(
                            operation = %operation_name,
                            error = %e,
                            "Non-retryable timeout"
                        );
                        return Err(e);
                    }

                    if attempt >= self.config.max_retries {
                        tracing::trace!(
                            operation = %operation_name,
                            attempts = attempt + 1,
                            error = %e,
                            "Retries exhausted after timeout"
                        );
                        return Err(e);
                    }

                    let mut delay = backoff.next_duration();

                    if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
                        let elapsed = start_time.elapsed();
                        let remaining =
                            Duration::from_millis(max_elapsed_ms).saturating_sub(elapsed);

                        if remaining.is_zero() {
                            return Err(create_error(self.budget_exceeded_msg(attempt)));
                        }

                        delay = delay.min(remaining);
                    }

                    tracing::trace!(
                        operation = %operation_name,
                        attempt = attempt + 1,
                        delay_ms = delay.as_millis() as u64,
                        error = %e,
                        "Retrying after timeout"
                    );

                    // Zero delay: yield instead of sleeping to avoid a busy loop
                    if delay.is_zero() {
                        tokio::task::yield_now().await;
                        attempt += 1;
                        continue;
                    }

                    if let Some(token) = cancel {
                        tokio::select! {
                            () = tokio::time::sleep(delay) => {},
                            () = token.cancelled() => {
                                tracing::debug!(
                                    operation = %operation_name,
                                    attempt = attempt + 1,
                                    "Operation canceled during retry delay"
                                );
                                return Err(create_error("canceled".to_string()));
                            }
                        }
                    } else {
                        tokio::time::sleep(delay).await;
                    }
                    attempt += 1;
                }
            }
        }
    }

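    /// Executes `operation` with retries according to the configured policy,
    /// without cancellation support.
    ///
    /// # Examples
    ///
    /// A minimal sketch using `std::io::Error` (any `std::error::Error` type
    /// works; the closures classify retryable errors and build synthetic
    /// errors for timeouts and budget exhaustion):
    ///
    /// ```ignore
    /// use std::io;
    ///
    /// let manager = RetryManager::new(RetryConfig::default());
    /// let result: Result<i32, io::Error> = manager
    ///     .execute_with_retry(
    ///         "fetch",
    ///         || async { Ok(42) },
    ///         |e: &io::Error| e.kind() == io::ErrorKind::TimedOut,
    ///         |msg| io::Error::new(io::ErrorKind::TimedOut, msg),
    ///     )
    ///     .await;
    /// ```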
    pub async fn execute_with_retry<F, Fut, T>(
        &self,
        operation_name: &str,
        operation: F,
        should_retry: impl Fn(&E) -> bool,
        create_error: impl Fn(String) -> E,
    ) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        self.execute_with_retry_inner(operation_name, operation, should_retry, create_error, None)
            .await
    }

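    /// Executes `operation` with retries, aborting promptly when
    /// `cancellation_token` fires, whether that happens before an attempt,
    /// mid-attempt, or during a backoff delay.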
    pub async fn execute_with_retry_with_cancel<F, Fut, T>(
        &self,
        operation_name: &str,
        operation: F,
        should_retry: impl Fn(&E) -> bool,
        create_error: impl Fn(String) -> E,
        cancellation_token: &CancellationToken,
    ) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        self.execute_with_retry_inner(
            operation_name,
            operation,
            should_retry,
            create_error,
            Some(cancellation_token),
        )
        .await
    }
}

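/// Creates a [`RetryManager`] using [`RetryConfig::default`].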
pub fn create_default_retry_manager<E>() -> RetryManager<E>
where
    E: std::error::Error,
{
    RetryManager::new(RetryConfig::default())
}

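/// Creates a [`RetryManager`] tuned for HTTP requests: up to 3 retries with
/// a 60s per-request timeout and a 180s overall budget.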
pub const fn create_http_retry_manager<E>() -> RetryManager<E>
where
    E: std::error::Error,
{
    let config = RetryConfig {
        max_retries: 3,
        initial_delay_ms: 1_000,
        max_delay_ms: 10_000,
        backoff_factor: 2.0,
        jitter_ms: 1_000,
        operation_timeout_ms: Some(60_000),
        immediate_first: false,
        max_elapsed_ms: Some(180_000),
    };
    RetryManager::new(config)
}

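/// Creates a [`RetryManager`] tuned for WebSocket reconnection: an immediate
/// first retry, up to 5 retries, a 30s per-attempt timeout, and a 120s
/// overall budget.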
pub const fn create_websocket_retry_manager<E>() -> RetryManager<E>
where
    E: std::error::Error,
{
    let config = RetryConfig {
        max_retries: 5,
        initial_delay_ms: 1_000,
        max_delay_ms: 10_000,
        backoff_factor: 2.0,
        jitter_ms: 1_000,
        operation_timeout_ms: Some(30_000),
        immediate_first: true,
        max_elapsed_ms: Some(120_000),
    };
    RetryManager::new(config)
}

#[cfg(test)]
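/// Shared error type and helper functions for the retry tests.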
mod test_utils {
    #[derive(Debug, thiserror::Error)]
    pub enum TestError {
        #[error("Retryable error: {0}")]
        Retryable(String),
        #[error("Non-retryable error: {0}")]
        NonRetryable(String),
        #[error("Timeout error: {0}")]
        Timeout(String),
    }

    pub fn should_retry_test_error(error: &TestError) -> bool {
        matches!(error, TestError::Retryable(_))
    }

    pub fn create_test_error(msg: String) -> TestError {
        TestError::Timeout(msg)
    }
}

#[cfg(test)]
mod tests {
    use std::sync::{
        Arc,
        atomic::{AtomicU32, Ordering},
    };

    use nautilus_core::MUTEX_POISONED;
    use rstest::rstest;

    use super::{test_utils::*, *};

    const MAX_WAIT_ITERS: usize = 10_000;
    const MAX_ADVANCE_ITERS: usize = 10_000;

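    /// Yields to the Tokio scheduler until `condition` returns `true`,
    /// panicking after `MAX_WAIT_ITERS` iterations.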
    pub(crate) async fn yield_until<F>(mut condition: F)
    where
        F: FnMut() -> bool,
    {
        for _ in 0..MAX_WAIT_ITERS {
            if condition() {
                return;
            }
            tokio::task::yield_now().await;
        }

        panic!("yield_until timed out waiting for condition");
    }

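    /// Advances paused Tokio time in 1ms steps until `condition` returns
    /// `true`, panicking after `MAX_ADVANCE_ITERS` iterations.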
    pub(crate) async fn advance_until<F>(mut condition: F)
    where
        F: FnMut() -> bool,
    {
        for _ in 0..MAX_ADVANCE_ITERS {
            if condition() {
                return;
            }
            tokio::time::advance(Duration::from_millis(1)).await;
            tokio::task::yield_now().await;
        }

        panic!("advance_until timed out waiting for condition");
    }

    #[rstest]
    fn test_retry_config_default() {
        let config = RetryConfig::default();
        assert_eq!(config.max_retries, 3);
        assert_eq!(config.initial_delay_ms, 1_000);
        assert_eq!(config.max_delay_ms, 10_000);
        assert_eq!(config.backoff_factor, 2.0);
        assert_eq!(config.jitter_ms, 100);
        assert_eq!(config.operation_timeout_ms, Some(30_000));
        assert!(!config.immediate_first);
        assert_eq!(config.max_elapsed_ms, None);
    }

    #[tokio::test]
    async fn test_retry_manager_success_first_attempt() {
        let manager = RetryManager::new(RetryConfig::default());

        let result = manager
            .execute_with_retry(
                "test_operation",
                || async { Ok::<i32, TestError>(42) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert_eq!(result.unwrap(), 42);
    }

    #[tokio::test]
    async fn test_retry_manager_non_retryable_error() {
        let manager = RetryManager::new(RetryConfig::default());

        let result = manager
            .execute_with_retry(
                "test_operation",
                || async { Err::<i32, TestError>(TestError::NonRetryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
    }

    #[tokio::test]
    async fn test_retry_manager_retryable_error_exhausted() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let result = manager
            .execute_with_retry(
                "test_operation",
                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Retryable(_)));
    }

    #[tokio::test]
    async fn test_timeout_path() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: Some(50),
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let result = manager
            .execute_with_retry(
                "test_timeout",
                || async {
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
    }

    #[tokio::test]
    async fn test_max_elapsed_time_budget() {
        let config = RetryConfig {
            max_retries: 10,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: Some(200),
        };
        let manager = RetryManager::new(config);

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_budget",
                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        let elapsed = start.elapsed();
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
        assert!(elapsed.as_millis() >= 150);
        assert!(elapsed.as_millis() < 1000);
    }

    #[tokio::test]
    async fn test_budget_exceeded_message_format() {
        let config = RetryConfig {
            max_retries: 5,
            initial_delay_ms: 10,
            max_delay_ms: 20,
            backoff_factor: 1.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: Some(35),
        };
        let manager = RetryManager::new(config);

        let result = manager
            .execute_with_retry(
                "test_budget_msg",
                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        let error_msg = result.unwrap_err().to_string();

        assert!(error_msg.contains("Retry budget exceeded"));
        assert!(error_msg.contains("/6)"));

        if let Some(captures) = error_msg.strip_prefix("Timeout error: Retry budget exceeded (")
            && let Some(nums) = captures.strip_suffix(")")
        {
            let parts: Vec<&str> = nums.split('/').collect();
            assert_eq!(parts.len(), 2);
            let current: u32 = parts[0].parse().unwrap();
            let total: u32 = parts[1].parse().unwrap();

            assert_eq!(total, 6, "Total should be max_retries + 1");
            assert!(current <= total, "Current attempt should not exceed total");
            assert!(current >= 1, "Current attempt should be at least 1");
        }
    }

    #[tokio::test(start_paused = true)]
    async fn test_budget_exceeded_edge_cases() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 1.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: Some(100),
        };
        let manager = RetryManager::new(config);

        let attempt_count = Arc::new(AtomicU32::new(0));
        let count_clone = attempt_count.clone();

        let handle = tokio::spawn(async move {
            manager
                .execute_with_retry(
                    "test_first_attempt",
                    move || {
                        let count = count_clone.clone();
                        async move {
                            count.fetch_add(1, Ordering::SeqCst);
                            Err::<i32, TestError>(TestError::Retryable("test".to_string()))
                        }
                    },
                    should_retry_test_error,
                    create_test_error,
                )
                .await
        });

        // Wait for the first attempt, then advance past the elapsed budget
        yield_until(|| attempt_count.load(Ordering::SeqCst) >= 1).await;

        tokio::time::advance(Duration::from_millis(101)).await;
        tokio::task::yield_now().await;

        let result = handle.await.unwrap();
        assert!(result.is_err());
        let error_msg = result.unwrap_err().to_string();

        assert!(
            error_msg.contains("(2/3)"),
            "Expected (2/3) but got: {error_msg}"
        );
    }

    #[tokio::test]
    async fn test_budget_exceeded_no_overflow() {
        let config = RetryConfig {
            max_retries: u32::MAX,
            initial_delay_ms: 10,
            max_delay_ms: 20,
            backoff_factor: 1.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: Some(1),
        };
        let manager = RetryManager::new(config);

        let result = manager
            .execute_with_retry(
                "test_overflow",
                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        let error_msg = result.unwrap_err().to_string();

        // saturating_add keeps max_retries + 1 from wrapping at u32::MAX
        assert!(error_msg.contains("Retry budget exceeded"));
        assert!(error_msg.contains(&format!("/{}", u32::MAX)));
    }

    #[rstest]
    fn test_http_retry_manager_config() {
        let manager = create_http_retry_manager::<TestError>();
        assert_eq!(manager.config.max_retries, 3);
        assert!(!manager.config.immediate_first);
        assert_eq!(manager.config.max_elapsed_ms, Some(180_000));
    }

    #[rstest]
    fn test_websocket_retry_manager_config() {
        let manager = create_websocket_retry_manager::<TestError>();
        assert_eq!(manager.config.max_retries, 5);
        assert!(manager.config.immediate_first);
        assert_eq!(manager.config.max_elapsed_ms, Some(120_000));
    }

    #[tokio::test]
    async fn test_timeout_respects_retry_predicate() {
        let config = RetryConfig {
            max_retries: 3,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: Some(50),
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        // Treat timeouts as non-retryable
        let should_not_retry_timeouts = |error: &TestError| !matches!(error, TestError::Timeout(_));

        let result = manager
            .execute_with_retry(
                "test_timeout_non_retryable",
                || async {
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    Ok::<i32, TestError>(42)
                },
                should_not_retry_timeouts,
                create_test_error,
            )
            .await;

        // The first timeout should fail immediately without retrying
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
    }

    #[tokio::test]
    async fn test_timeout_retries_when_predicate_allows() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: Some(50),
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        // Treat timeouts as retryable
        let should_retry_timeouts = |error: &TestError| matches!(error, TestError::Timeout(_));

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_timeout_retryable",
                || async {
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_timeouts,
                create_test_error,
            )
            .await;

        let elapsed = start.elapsed();

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
        // Multiple timed-out attempts plus backoff delays should exceed 80ms
        assert!(elapsed.as_millis() > 80);
    }

    #[tokio::test]
    async fn test_successful_retry_after_failures() {
        let config = RetryConfig {
            max_retries: 3,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let result = manager
            .execute_with_retry(
                "test_eventual_success",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        let attempts = counter.fetch_add(1, Ordering::SeqCst);
                        if attempts < 2 {
                            Err(TestError::Retryable("temporary failure".to_string()))
                        } else {
                            Ok(42)
                        }
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert_eq!(result.unwrap(), 42);
        assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
    }

    #[tokio::test(start_paused = true)]
    async fn test_immediate_first_retry() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 100,
            max_delay_ms: 200,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: true,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
        let times_clone = attempt_times.clone();
        let start = tokio::time::Instant::now();

        let handle = tokio::spawn({
            let times_clone = times_clone.clone();
            async move {
                let _ = manager
                    .execute_with_retry(
                        "test_immediate",
                        move || {
                            let times = times_clone.clone();
                            async move {
                                times.lock().expect(MUTEX_POISONED).push(start.elapsed());
                                Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                            }
                        },
                        should_retry_test_error,
                        create_test_error,
                    )
                    .await;
            }
        });

        // The first attempt and the immediate retry happen without any delay
        yield_until(|| attempt_times.lock().expect(MUTEX_POISONED).len() >= 2).await;

        // The second retry waits for the initial delay
        tokio::time::advance(Duration::from_millis(100)).await;
        tokio::task::yield_now().await;

        yield_until(|| attempt_times.lock().expect(MUTEX_POISONED).len() >= 3).await;

        handle.await.unwrap();

        let times = attempt_times.lock().expect(MUTEX_POISONED);
        assert_eq!(times.len(), 3);
        // Immediate first retry, then a full backoff delay before the second
        assert!(times[1] <= Duration::from_millis(1));
        assert!(times[2] >= Duration::from_millis(100));
        assert!(times[2] <= Duration::from_millis(110));
    }

    #[tokio::test]
    async fn test_operation_without_timeout() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_no_timeout",
                || async {
                    tokio::time::sleep(Duration::from_millis(50)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        let elapsed = start.elapsed();
        assert_eq!(result.unwrap(), 42);
        // The 50ms operation must not be cut short by any timeout
        assert!(elapsed.as_millis() >= 30);
        assert!(elapsed.as_millis() < 200);
    }

    #[tokio::test]
    async fn test_zero_retries() {
        let config = RetryConfig {
            max_retries: 0,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let result = manager
            .execute_with_retry(
                "test_no_retries",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        // Only the initial attempt should run when max_retries is 0
        assert_eq!(attempt_counter.load(Ordering::SeqCst), 1);
    }

    #[tokio::test(start_paused = true)]
    async fn test_jitter_applied() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 2.0,
            jitter_ms: 50,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let delays = Arc::new(std::sync::Mutex::new(Vec::new()));
        let delays_clone = delays.clone();
        let last_time = Arc::new(std::sync::Mutex::new(tokio::time::Instant::now()));
        let last_time_clone = last_time.clone();

        let handle = tokio::spawn({
            let delays_clone = delays_clone.clone();
            async move {
                let _ = manager
                    .execute_with_retry(
                        "test_jitter",
                        move || {
                            let delays = delays_clone.clone();
                            let last_time = last_time_clone.clone();
                            async move {
                                let now = tokio::time::Instant::now();
                                let delay = {
                                    let mut last = last_time.lock().expect(MUTEX_POISONED);
                                    let d = now.duration_since(*last);
                                    *last = now;
                                    d
                                };
                                delays.lock().expect(MUTEX_POISONED).push(delay);
                                Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                            }
                        },
                        should_retry_test_error,
                        create_test_error,
                    )
                    .await;
            }
        });

        yield_until(|| !delays.lock().expect(MUTEX_POISONED).is_empty()).await;
        advance_until(|| delays.lock().expect(MUTEX_POISONED).len() >= 2).await;
        advance_until(|| delays.lock().expect(MUTEX_POISONED).len() >= 3).await;

        handle.await.unwrap();

        let delays = delays.lock().expect(MUTEX_POISONED);
        // Skip the first attempt, which runs without a preceding delay
        for delay in delays.iter().skip(1) {
            assert!(delay.as_millis() >= 50);
            assert!(delay.as_millis() <= 151);
        }
    }

    #[tokio::test]
    async fn test_max_elapsed_stops_early() {
        let config = RetryConfig {
            max_retries: 100,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 1.5,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: Some(150),
        };
        let manager = RetryManager::new(config);

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_elapsed_limit",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        let elapsed = start.elapsed();
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));

        let attempts = attempt_counter.load(Ordering::SeqCst);
        // The elapsed budget should stop retries long before max_retries
        assert!(attempts < 10);
        assert!(elapsed.as_millis() >= 100);
    }

    #[tokio::test]
    async fn test_mixed_errors_retry_behavior() {
        let config = RetryConfig {
            max_retries: 5,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let result = manager
            .execute_with_retry(
                "test_mixed_errors",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        let attempts = counter.fetch_add(1, Ordering::SeqCst);
                        match attempts {
                            0 => Err(TestError::Retryable("retry 1".to_string())),
                            1 => Err(TestError::Retryable("retry 2".to_string())),
                            2 => Err(TestError::NonRetryable("stop here".to_string())),
                            _ => Ok(42),
                        }
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
        // Two retryable failures, then the non-retryable error stops the loop
        assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
    }

    #[tokio::test]
    async fn test_cancellation_during_retry_delay() {
        use tokio_util::sync::CancellationToken;

        let config = RetryConfig {
            max_retries: 10,
            initial_delay_ms: 500,
            max_delay_ms: 1000,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let token = CancellationToken::new();
        let token_clone = token.clone();

        // Cancel while the manager is sleeping between attempts
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            token_clone.cancel();
        });

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry_with_cancel(
                "test_cancellation",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
                &token,
            )
            .await;

        let elapsed = start.elapsed();

        assert!(result.is_err());
        let error_msg = format!("{}", result.unwrap_err());
        assert!(error_msg.contains("canceled"));

        // Cancellation should interrupt the 500ms retry delay
        assert!(elapsed.as_millis() < 600);

        let attempts = attempt_counter.load(Ordering::SeqCst);
        assert!(attempts >= 1);
    }

    #[tokio::test]
    async fn test_cancellation_during_operation_execution() {
        use tokio_util::sync::CancellationToken;

        let config = RetryConfig {
            max_retries: 5,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let token = CancellationToken::new();
        let token_clone = token.clone();

        // Cancel while the operation itself is still running
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            token_clone.cancel();
        });

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry_with_cancel(
                "test_cancellation_during_op",
                || async {
                    tokio::time::sleep(Duration::from_millis(200)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_test_error,
                create_test_error,
                &token,
            )
            .await;

        let elapsed = start.elapsed();

        assert!(result.is_err());
        let error_msg = format!("{}", result.unwrap_err());
        assert!(error_msg.contains("canceled"));

        // Cancellation should preempt the 200ms operation
        assert!(elapsed.as_millis() < 250);
    }

    #[tokio::test]
    async fn test_cancellation_error_message() {
        use tokio_util::sync::CancellationToken;

        let config = RetryConfig::default();
        let manager = RetryManager::new(config);

        let token = CancellationToken::new();
        token.cancel();

        let result = manager
            .execute_with_retry_with_cancel(
                "test_operation",
                || async { Ok::<i32, TestError>(42) },
                should_retry_test_error,
                create_test_error,
                &token,
            )
            .await;

        assert!(result.is_err());
        let error_msg = format!("{}", result.unwrap_err());
        assert!(error_msg.contains("canceled"));
    }
}

#[cfg(test)]
mod proptest_tests {
    use std::sync::{
        Arc,
        atomic::{AtomicU32, Ordering},
    };

    use nautilus_core::MUTEX_POISONED;
    use proptest::prelude::*;
    use rstest::rstest;

    use super::{
        test_utils::*,
        tests::{advance_until, yield_until},
        *,
    };

    proptest! {
        #[rstest]
        fn test_retry_config_valid_ranges(
            max_retries in 0u32..100,
            initial_delay_ms in 1u64..10_000,
            max_delay_ms in 1u64..60_000,
            backoff_factor in 1.0f64..10.0,
            jitter_ms in 0u64..1_000,
            operation_timeout_ms in prop::option::of(1u64..120_000),
            immediate_first in any::<bool>(),
            max_elapsed_ms in prop::option::of(1u64..300_000)
        ) {
            // Ensure max_delay is never below initial_delay
            let max_delay_ms = max_delay_ms.max(initial_delay_ms);

            let config = RetryConfig {
                max_retries,
                initial_delay_ms,
                max_delay_ms,
                backoff_factor,
                jitter_ms,
                operation_timeout_ms,
                immediate_first,
                max_elapsed_ms,
            };

            // Construction should never panic for any valid configuration
            let _manager = RetryManager::<std::io::Error>::new(config);
        }

        #[rstest]
        fn test_retry_attempts_bounded(
            max_retries in 0u32..5,
            initial_delay_ms in 1u64..10,
            backoff_factor in 1.0f64..2.0,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries,
                initial_delay_ms,
                max_delay_ms: initial_delay_ms * 2,
                backoff_factor,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config);
            let attempt_counter = Arc::new(AtomicU32::new(0));
            let counter_clone = attempt_counter.clone();

            let _result = rt.block_on(manager.execute_with_retry(
                "prop_test",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                |e: &TestError| matches!(e, TestError::Retryable(_)),
                TestError::Timeout,
            ));

            let attempts = attempt_counter.load(Ordering::SeqCst);
            // Exactly one initial attempt plus max_retries retries
            prop_assert_eq!(attempts, max_retries + 1);
        }

        #[rstest]
        fn test_timeout_always_respected(
            timeout_ms in 10u64..50,
            operation_delay_ms in 60u64..100,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries: 0,
                initial_delay_ms: 10,
                max_delay_ms: 100,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: Some(timeout_ms),
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config);

            let result = rt.block_on(async {
                let operation_future = manager.execute_with_retry(
                    "timeout_test",
                    move || async move {
                        tokio::time::sleep(Duration::from_millis(operation_delay_ms)).await;
                        Ok::<i32, TestError>(42)
                    },
                    |_: &TestError| true,
                    TestError::Timeout,
                );

                tokio::time::advance(Duration::from_millis(timeout_ms + 10)).await;
                operation_future.await
            });

            prop_assert!(result.is_err());
            prop_assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
        }

1438
1439 #[rstest]
1440 fn test_max_elapsed_always_respected(
1441 max_elapsed_ms in 20u64..50,
1442 delay_per_retry in 15u64..30,
1443 max_retries in 10u32..20,
1444 ) {
1445 let rt = tokio::runtime::Builder::new_current_thread()
1446 .enable_time()
1447 .start_paused(true)
1448 .build()
1449 .unwrap();
1450
1451 let config = RetryConfig {
1453 max_retries,
1454 initial_delay_ms: delay_per_retry,
1455 max_delay_ms: delay_per_retry * 2,
1456 backoff_factor: 1.0, jitter_ms: 0,
1458 operation_timeout_ms: None,
1459 immediate_first: false,
1460 max_elapsed_ms: Some(max_elapsed_ms),
1461 };
1462
1463 let manager = RetryManager::new(config);
1464 let attempt_counter = Arc::new(AtomicU32::new(0));
1465 let counter_clone = attempt_counter.clone();
1466
1467 let result = rt.block_on(async {
1468 let operation_future = manager.execute_with_retry(
1469 "elapsed_test",
1470 move || {
1471 let counter = counter_clone.clone();
1472 async move {
1473 counter.fetch_add(1, Ordering::SeqCst);
1474 Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1475 }
1476 },
1477 |e: &TestError| matches!(e, TestError::Retryable(_)),
1478 TestError::Timeout,
1479 );
1480
1481 tokio::time::advance(Duration::from_millis(max_elapsed_ms + delay_per_retry)).await;
1483 operation_future.await
1484 });
1485
1486 let attempts = attempt_counter.load(Ordering::SeqCst);
1487
1488 prop_assert!(result.is_err());
1490 prop_assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
1491
1492 prop_assert!(attempts <= max_retries + 1);
1494 }
1495
        #[rstest]
        fn test_jitter_bounds(
            jitter_ms in 0u64..20,
            base_delay_ms in 10u64..30,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries: 2,
                initial_delay_ms: base_delay_ms,
                max_delay_ms: base_delay_ms * 2,
                backoff_factor: 1.0,
                jitter_ms,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config);
            let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
            let attempt_times_for_block = attempt_times.clone();

            rt.block_on(async move {
                let attempt_times_for_wait = attempt_times_for_block.clone();
                let handle = tokio::spawn({
                    let attempt_times_for_task = attempt_times_for_block.clone();
                    let manager = manager;
                    async move {
                        let start_time = tokio::time::Instant::now();
                        let _ = manager
                            .execute_with_retry(
                                "jitter_test",
                                move || {
                                    let attempt_times_inner = attempt_times_for_task.clone();
                                    async move {
                                        attempt_times_inner
                                            .lock()
                                            .expect(MUTEX_POISONED)
                                            .push(start_time.elapsed());
                                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                                    }
                                },
                                |e: &TestError| matches!(e, TestError::Retryable(_)),
                                TestError::Timeout,
                            )
                            .await;
                    }
                });

                yield_until(|| !attempt_times_for_wait.lock().expect(MUTEX_POISONED).is_empty()).await;
                advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 2).await;
                advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 3).await;

                handle.await.unwrap();
            });

            let times = attempt_times.lock().expect(MUTEX_POISONED);

            prop_assert!(times.len() >= 2);

            prop_assert!(times[0].as_millis() < 5);

            for i in 1..times.len() {
                let delay_from_previous = times[i] - times[i - 1];

                prop_assert!(
                    delay_from_previous.as_millis() >= base_delay_ms as u128,
                    "Retry {} delay {}ms is less than base {}ms",
                    i, delay_from_previous.as_millis(), base_delay_ms
                );

                prop_assert!(
                    delay_from_previous.as_millis() <= (base_delay_ms + jitter_ms + 1) as u128,
                    "Retry {} delay {}ms exceeds base {} + jitter {}",
                    i, delay_from_previous.as_millis(), base_delay_ms, jitter_ms
                );
            }
        }

        #[rstest]
        fn test_immediate_first_property(
            immediate_first in any::<bool>(),
            initial_delay_ms in 10u64..30,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries: 2,
                initial_delay_ms,
                max_delay_ms: initial_delay_ms * 2,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config);
            let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
            let attempt_times_for_block = attempt_times.clone();

            rt.block_on(async move {
                let attempt_times_for_wait = attempt_times_for_block.clone();
                let handle = tokio::spawn({
                    let attempt_times_for_task = attempt_times_for_block.clone();
                    let manager = manager;
                    async move {
                        let start = tokio::time::Instant::now();
                        let _ = manager
                            .execute_with_retry(
                                "immediate_test",
                                move || {
                                    let attempt_times_inner = attempt_times_for_task.clone();
                                    async move {
                                        let elapsed = start.elapsed();
                                        attempt_times_inner.lock().expect(MUTEX_POISONED).push(elapsed);
                                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                                    }
                                },
                                |e: &TestError| matches!(e, TestError::Retryable(_)),
                                TestError::Timeout,
                            )
                            .await;
                    }
                });

                yield_until(|| !attempt_times_for_wait.lock().expect(MUTEX_POISONED).is_empty()).await;
                advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 2).await;
                advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 3).await;

                handle.await.unwrap();
            });

            let times = attempt_times.lock().expect(MUTEX_POISONED);
            prop_assert!(times.len() >= 2);

            if immediate_first {
                prop_assert!(times[1].as_millis() < 20,
                    "With immediate_first=true, first retry took {}ms",
                    times[1].as_millis());
            } else {
                prop_assert!(times[1].as_millis() >= (initial_delay_ms - 1) as u128,
                    "With immediate_first=false, first retry was too fast: {}ms",
                    times[1].as_millis());
            }
        }

        #[rstest]
        fn test_non_retryable_stops_immediately(
            attempt_before_non_retryable in 0usize..3,
            max_retries in 3u32..5,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries,
                initial_delay_ms: 10,
                max_delay_ms: 100,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config);
            let attempt_counter = Arc::new(AtomicU32::new(0));
            let counter_clone = attempt_counter.clone();

            let result: Result<i32, TestError> = rt.block_on(manager.execute_with_retry(
                "non_retryable_test",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        let attempts = counter.fetch_add(1, Ordering::SeqCst) as usize;
                        if attempts == attempt_before_non_retryable {
                            Err(TestError::NonRetryable("stop".to_string()))
                        } else {
                            Err(TestError::Retryable("retry".to_string()))
                        }
                    }
                },
                |e: &TestError| matches!(e, TestError::Retryable(_)),
                TestError::Timeout,
            ));

            let attempts = attempt_counter.load(Ordering::SeqCst) as usize;

            prop_assert!(result.is_err());
            prop_assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
            prop_assert_eq!(attempts, attempt_before_non_retryable + 1);
        }

        #[rstest]
        fn test_cancellation_stops_immediately(
            cancel_after_ms in 10u64..100,
            initial_delay_ms in 200u64..500,
        ) {
            use tokio_util::sync::CancellationToken;

            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries: 10,
                initial_delay_ms,
                max_delay_ms: initial_delay_ms * 2,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config);
            let token = CancellationToken::new();
            let token_clone = token.clone();

            let result: Result<i32, TestError> = rt.block_on(async {
                // Cancel after a property-driven delay shorter than the first backoff
                tokio::spawn(async move {
                    tokio::time::sleep(Duration::from_millis(cancel_after_ms)).await;
                    token_clone.cancel();
                });

                let operation_future = manager.execute_with_retry_with_cancel(
                    "cancellation_test",
                    || async {
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    },
                    |e: &TestError| matches!(e, TestError::Retryable(_)),
                    create_test_error,
                    &token,
                );

                tokio::time::advance(Duration::from_millis(cancel_after_ms + 10)).await;
                operation_future.await
            });

            prop_assert!(result.is_err());
            let error_msg = format!("{}", result.unwrap_err());
            prop_assert!(error_msg.contains("canceled"));
        }

        #[rstest]
        fn test_budget_clamp_prevents_overshoot(
            max_elapsed_ms in 10u64..30,
            delay_per_retry in 20u64..50,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries: 5,
                initial_delay_ms: delay_per_retry,
                max_delay_ms: delay_per_retry * 2,
                backoff_factor: 1.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: Some(max_elapsed_ms),
            };

            let manager = RetryManager::new(config);

            let _result = rt.block_on(async {
                let operation_future = manager.execute_with_retry(
                    "budget_clamp_test",
                    || async {
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    },
                    |e: &TestError| matches!(e, TestError::Retryable(_)),
                    create_test_error,
                );

                tokio::time::advance(Duration::from_millis(max_elapsed_ms + delay_per_retry)).await;
                operation_future.await
            });

            // Completing without hanging shows each retry delay was clamped
            // to the remaining elapsed-time budget
        }

        #[rstest]
        fn test_success_on_kth_attempt(
            k in 1usize..5,
            initial_delay_ms in 5u64..20,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries: 10,
                initial_delay_ms,
                max_delay_ms: initial_delay_ms * 4,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config);
            let attempt_counter = Arc::new(AtomicU32::new(0));
            let counter_clone = attempt_counter.clone();
            let target_k = k;

            let (result, _elapsed) = rt.block_on(async {
                let start = tokio::time::Instant::now();

                let operation_future = manager.execute_with_retry(
                    "kth_attempt_test",
                    move || {
                        let counter = counter_clone.clone();
                        async move {
                            let attempt = counter.fetch_add(1, Ordering::SeqCst) as usize;
                            if attempt + 1 == target_k {
                                Ok(42)
                            } else {
                                Err(TestError::Retryable("retry".to_string()))
                            }
                        }
                    },
                    |e: &TestError| matches!(e, TestError::Retryable(_)),
                    create_test_error,
                );

                // Advance enough virtual time to cover every backoff delay
                for _ in 0..k {
                    tokio::time::advance(Duration::from_millis(initial_delay_ms * 4)).await;
                }

                let result = operation_future.await;
                let elapsed = start.elapsed();

                (result, elapsed)
            });

            let attempts = attempt_counter.load(Ordering::SeqCst) as usize;

            prop_assert!(result.is_ok());
            prop_assert_eq!(result.unwrap(), 42);
            prop_assert_eq!(attempts, k);
        }
    }
}