1use std::{future::Future, marker::PhantomData, time::Duration};
19
20use tokio_util::sync::CancellationToken;
21
22use crate::backoff::ExponentialBackoff;
23
/// Tuning knobs for [`RetryManager`]: attempt count, backoff shape, per-attempt timeout,
/// and an optional overall time budget.
#[derive(Clone, Debug)]
pub struct RetryConfig {
    /// Retries allowed after the initial attempt; an operation runs at most
    /// `max_retries + 1` times.
    pub max_retries: u32,
    /// Base backoff delay in milliseconds (forwarded to the exponential backoff).
    pub initial_delay_ms: u64,
    /// Largest backoff delay in milliseconds (forwarded to the exponential backoff).
    pub max_delay_ms: u64,
    /// Growth factor forwarded to the exponential backoff.
    pub backoff_factor: f64,
    /// Random jitter in milliseconds forwarded to the exponential backoff.
    pub jitter_ms: u64,
    /// Timeout for a single attempt in milliseconds; `None` lets an attempt run unbounded.
    pub operation_timeout_ms: Option<u64>,
    /// Forwarded to the exponential backoff; when `true` the first retry is expected to
    /// be issued without waiting.
    pub immediate_first: bool,
    /// Overall budget in milliseconds measured from the start of the retry loop;
    /// `None` means no overall limit.
    pub max_elapsed_ms: Option<u64>,
}
47
48impl Default for RetryConfig {
49 fn default() -> Self {
50 Self {
51 max_retries: 3,
52 initial_delay_ms: 1_000,
53 max_delay_ms: 10_000,
54 backoff_factor: 2.0,
55 jitter_ms: 100,
56 operation_timeout_ms: Some(30_000),
57 immediate_first: false,
58 max_elapsed_ms: None,
59 }
60 }
61}
62
63#[derive(Clone, Debug)]
67pub struct RetryManager<E> {
68 config: RetryConfig,
69 _phantom: PhantomData<E>,
70}
71
72impl<E> RetryManager<E>
73where
74 E: std::error::Error,
75{
76 pub const fn new(config: RetryConfig) -> Self {
78 Self {
79 config,
80 _phantom: PhantomData,
81 }
82 }
83
84 #[inline(always)]
86 fn budget_exceeded_msg(&self, attempt: u32) -> String {
87 format!(
88 "Retry budget exceeded ({}/{})",
89 attempt.saturating_add(1),
90 self.config.max_retries.saturating_add(1)
91 )
92 }
93
94 pub async fn execute_with_retry_inner<F, Fut, T>(
108 &self,
109 operation_name: &str,
110 mut operation: F,
111 should_retry: impl Fn(&E) -> bool,
112 create_error: impl Fn(String) -> E,
113 cancel: Option<&CancellationToken>,
114 ) -> Result<T, E>
115 where
116 F: FnMut() -> Fut,
117 Fut: Future<Output = Result<T, E>>,
118 {
119 let mut backoff = ExponentialBackoff::new(
120 Duration::from_millis(self.config.initial_delay_ms),
121 Duration::from_millis(self.config.max_delay_ms),
122 self.config.backoff_factor,
123 self.config.jitter_ms,
124 self.config.immediate_first,
125 )
126 .map_err(|e| create_error(format!("Invalid configuration: {e}")))?;
127
128 let mut attempt = 0;
129 let start_time = tokio::time::Instant::now();
130
131 loop {
132 if let Some(token) = cancel
133 && token.is_cancelled()
134 {
135 tracing::debug!(
136 operation = %operation_name,
137 attempts = attempt,
138 "Operation canceled"
139 );
140 return Err(create_error("canceled".to_string()));
141 }
142
143 if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
144 let elapsed = start_time.elapsed();
145 if elapsed.as_millis() >= u128::from(max_elapsed_ms) {
146 return Err(create_error(self.budget_exceeded_msg(attempt)));
147 }
148 }
149
150 let result = match (self.config.operation_timeout_ms, cancel) {
151 (Some(timeout_ms), Some(token)) => {
152 tokio::select! {
153 result = tokio::time::timeout(Duration::from_millis(timeout_ms), operation()) => result,
154 () = token.cancelled() => {
155 tracing::debug!(
156 operation = %operation_name,
157 "Operation canceled during execution"
158 );
159 return Err(create_error("canceled".to_string()));
160 }
161 }
162 }
163 (Some(timeout_ms), None) => {
164 tokio::time::timeout(Duration::from_millis(timeout_ms), operation()).await
165 }
166 (None, Some(token)) => {
167 tokio::select! {
168 result = operation() => Ok(result),
169 () = token.cancelled() => {
170 tracing::debug!(
171 operation = %operation_name,
172 "Operation canceled during execution"
173 );
174 return Err(create_error("canceled".to_string()));
175 }
176 }
177 }
178 (None, None) => Ok(operation().await),
179 };
180
181 match result {
182 Ok(Ok(success)) => {
183 if attempt > 0 {
184 tracing::trace!(
185 operation = %operation_name,
186 attempts = attempt + 1,
187 "Retry succeeded"
188 );
189 }
190 return Ok(success);
191 }
192 Ok(Err(e)) => {
193 if !should_retry(&e) {
194 tracing::trace!(
195 operation = %operation_name,
196 error = %e,
197 "Non-retryable error"
198 );
199 return Err(e);
200 }
201
202 if attempt >= self.config.max_retries {
203 tracing::trace!(
204 operation = %operation_name,
205 attempts = attempt + 1,
206 error = %e,
207 "Retries exhausted"
208 );
209 return Err(e);
210 }
211
212 let mut delay = backoff.next_duration();
213
214 if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
215 let elapsed = start_time.elapsed();
216 let remaining =
217 Duration::from_millis(max_elapsed_ms).saturating_sub(elapsed);
218
219 if remaining.is_zero() {
220 return Err(create_error(self.budget_exceeded_msg(attempt)));
221 }
222
223 delay = delay.min(remaining);
224 }
225
226 tracing::trace!(
227 operation = %operation_name,
228 attempt = attempt + 1,
229 delay_ms = delay.as_millis() as u64,
230 error = %e,
231 "Retrying after failure"
232 );
233
234 if delay.is_zero() {
236 tokio::task::yield_now().await;
237 attempt += 1;
238 continue;
239 }
240
241 if let Some(token) = cancel {
242 tokio::select! {
243 () = tokio::time::sleep(delay) => {},
244 () = token.cancelled() => {
245 tracing::debug!(
246 operation = %operation_name,
247 attempt = attempt + 1,
248 "Operation canceled during retry delay"
249 );
250 return Err(create_error("canceled".to_string()));
251 }
252 }
253 } else {
254 tokio::time::sleep(delay).await;
255 }
256 attempt += 1;
257 }
258 Err(_) => {
259 let e = create_error(format!(
260 "Timed out after {}ms",
261 self.config.operation_timeout_ms.unwrap_or(0)
262 ));
263
264 if !should_retry(&e) {
265 tracing::trace!(
266 operation = %operation_name,
267 error = %e,
268 "Non-retryable timeout"
269 );
270 return Err(e);
271 }
272
273 if attempt >= self.config.max_retries {
274 tracing::trace!(
275 operation = %operation_name,
276 attempts = attempt + 1,
277 error = %e,
278 "Retries exhausted after timeout"
279 );
280 return Err(e);
281 }
282
283 let mut delay = backoff.next_duration();
284
285 if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
286 let elapsed = start_time.elapsed();
287 let remaining =
288 Duration::from_millis(max_elapsed_ms).saturating_sub(elapsed);
289
290 if remaining.is_zero() {
291 return Err(create_error(self.budget_exceeded_msg(attempt)));
292 }
293
294 delay = delay.min(remaining);
295 }
296
297 tracing::trace!(
298 operation = %operation_name,
299 attempt = attempt + 1,
300 delay_ms = delay.as_millis() as u64,
301 error = %e,
302 "Retrying after timeout"
303 );
304
305 if delay.is_zero() {
307 tokio::task::yield_now().await;
308 attempt += 1;
309 continue;
310 }
311
312 if let Some(token) = cancel {
313 tokio::select! {
314 () = tokio::time::sleep(delay) => {},
315 () = token.cancelled() => {
316 tracing::debug!(
317 operation = %operation_name,
318 attempt = attempt + 1,
319 "Operation canceled during retry delay"
320 );
321 return Err(create_error("canceled".to_string()));
322 }
323 }
324 } else {
325 tokio::time::sleep(delay).await;
326 }
327 attempt += 1;
328 }
329 }
330 }
331 }
332
333 pub async fn execute_with_retry<F, Fut, T>(
340 &self,
341 operation_name: &str,
342 operation: F,
343 should_retry: impl Fn(&E) -> bool,
344 create_error: impl Fn(String) -> E,
345 ) -> Result<T, E>
346 where
347 F: FnMut() -> Fut,
348 Fut: Future<Output = Result<T, E>>,
349 {
350 self.execute_with_retry_inner(operation_name, operation, should_retry, create_error, None)
351 .await
352 }
353
354 pub async fn execute_with_retry_with_cancel<F, Fut, T>(
361 &self,
362 operation_name: &str,
363 operation: F,
364 should_retry: impl Fn(&E) -> bool,
365 create_error: impl Fn(String) -> E,
366 cancellation_token: &CancellationToken,
367 ) -> Result<T, E>
368 where
369 F: FnMut() -> Fut,
370 Fut: Future<Output = Result<T, E>>,
371 {
372 self.execute_with_retry_inner(
373 operation_name,
374 operation,
375 should_retry,
376 create_error,
377 Some(cancellation_token),
378 )
379 .await
380 }
381}
382
383pub fn create_default_retry_manager<E>() -> RetryManager<E>
385where
386 E: std::error::Error,
387{
388 RetryManager::new(RetryConfig::default())
389}
390
391pub const fn create_http_retry_manager<E>() -> RetryManager<E>
393where
394 E: std::error::Error,
395{
396 let config = RetryConfig {
397 max_retries: 3,
398 initial_delay_ms: 1_000,
399 max_delay_ms: 10_000,
400 backoff_factor: 2.0,
401 jitter_ms: 1_000,
402 operation_timeout_ms: Some(60_000), immediate_first: false,
404 max_elapsed_ms: Some(180_000), };
406 RetryManager::new(config)
407}
408
409pub const fn create_websocket_retry_manager<E>() -> RetryManager<E>
411where
412 E: std::error::Error,
413{
414 let config = RetryConfig {
415 max_retries: 5,
416 initial_delay_ms: 1_000,
417 max_delay_ms: 10_000,
418 backoff_factor: 2.0,
419 jitter_ms: 1_000,
420 operation_timeout_ms: Some(30_000), immediate_first: true,
422 max_elapsed_ms: Some(120_000), };
424 RetryManager::new(config)
425}
426
#[cfg(test)]
mod test_utils {
    /// Error type for the retry tests: one variant per way a failure is classified.
    #[derive(Debug, thiserror::Error)]
    pub enum TestError {
        #[error("Retryable error: {0}")]
        Retryable(String),
        #[error("Non-retryable error: {0}")]
        NonRetryable(String),
        #[error("Timeout error: {0}")]
        Timeout(String),
    }

    /// Retry predicate used by most tests: only `TestError::Retryable` is retried.
    pub fn should_retry_test_error(error: &TestError) -> bool {
        match error {
            TestError::Retryable(_) => true,
            TestError::NonRetryable(_) | TestError::Timeout(_) => false,
        }
    }

    /// Error factory used by most tests: wraps every message in `TestError::Timeout`.
    pub fn create_test_error(msg: String) -> TestError {
        TestError::Timeout(msg)
    }
}
447
#[cfg(test)]
mod tests {
    use std::sync::{
        Arc,
        atomic::{AtomicU32, Ordering},
    };

    use nautilus_core::MUTEX_POISONED;
    use rstest::rstest;

    use super::{test_utils::*, *};

    /// Maximum scheduler yields `yield_until` performs before giving up.
    const MAX_WAIT_ITERS: usize = 10_000;
    /// Maximum 1 ms clock advances `advance_until` performs before giving up.
    const MAX_ADVANCE_ITERS: usize = 10_000;

    /// Yields to the Tokio scheduler until `condition` returns `true`.
    ///
    /// # Panics
    ///
    /// Panics if the condition is still false after `MAX_WAIT_ITERS` yields.
    pub(crate) async fn yield_until<F>(mut condition: F)
    where
        F: FnMut() -> bool,
    {
        for _ in 0..MAX_WAIT_ITERS {
            if condition() {
                return;
            }
            tokio::task::yield_now().await;
        }

        panic!("yield_until timed out waiting for condition");
    }

    /// Advances the Tokio clock 1 ms at a time, yielding after each step, until
    /// `condition` returns `true`. Must run on a runtime with paused time.
    ///
    /// # Panics
    ///
    /// Panics if the condition is still false after `MAX_ADVANCE_ITERS` steps.
    pub(crate) async fn advance_until<F>(mut condition: F)
    where
        F: FnMut() -> bool,
    {
        for _ in 0..MAX_ADVANCE_ITERS {
            if condition() {
                return;
            }
            tokio::time::advance(Duration::from_millis(1)).await;
            tokio::task::yield_now().await;
        }

        panic!("advance_until timed out waiting for condition");
    }

    #[rstest]
    fn test_retry_config_default() {
        let config = RetryConfig::default();
        assert_eq!(config.max_retries, 3);
        assert_eq!(config.initial_delay_ms, 1_000);
        assert_eq!(config.max_delay_ms, 10_000);
        assert_eq!(config.backoff_factor, 2.0);
        assert_eq!(config.jitter_ms, 100);
        assert_eq!(config.operation_timeout_ms, Some(30_000));
        assert!(!config.immediate_first);
        assert_eq!(config.max_elapsed_ms, None);
    }

    #[tokio::test]
    async fn test_retry_manager_success_first_attempt() {
        let manager = RetryManager::new(RetryConfig::default());

        let result = manager
            .execute_with_retry(
                "test_operation",
                || async { Ok::<i32, TestError>(42) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert_eq!(result.unwrap(), 42);
    }

    #[tokio::test]
    async fn test_retry_manager_non_retryable_error() {
        let manager = RetryManager::new(RetryConfig::default());

        let result = manager
            .execute_with_retry(
                "test_operation",
                || async { Err::<i32, TestError>(TestError::NonRetryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
    }

    #[tokio::test]
    async fn test_retry_manager_retryable_error_exhausted() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let result = manager
            .execute_with_retry(
                "test_operation",
                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Retryable(_)));
    }

    #[tokio::test]
    async fn test_timeout_path() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: Some(50),
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let result = manager
            .execute_with_retry(
                "test_timeout",
                || async {
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
    }

    #[tokio::test]
    async fn test_max_elapsed_time_budget() {
        let config = RetryConfig {
            max_retries: 10,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: Some(200),
        };
        let manager = RetryManager::new(config);

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_budget",
                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        let elapsed = start.elapsed();
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
        assert!(elapsed.as_millis() >= 150);
        assert!(elapsed.as_millis() < 1000);
    }

    #[tokio::test]
    async fn test_budget_exceeded_message_format() {
        let config = RetryConfig {
            max_retries: 5,
            initial_delay_ms: 10,
            max_delay_ms: 20,
            backoff_factor: 1.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: Some(35),
        };
        let manager = RetryManager::new(config);

        let result = manager
            .execute_with_retry(
                "test_budget_msg",
                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        let error_msg = result.unwrap_err().to_string();

        assert!(error_msg.contains("Retry budget exceeded"));
        assert!(error_msg.contains("/6)"));

        // Fail loudly (rather than silently skipping the checks below) if the message
        // does not have the expected `(current/total)` shape.
        let nums = error_msg
            .strip_prefix("Timeout error: Retry budget exceeded (")
            .and_then(|rest| rest.strip_suffix(")"))
            .unwrap_or_else(|| panic!("Unexpected budget message format: {error_msg}"));

        let parts: Vec<&str> = nums.split('/').collect();
        assert_eq!(parts.len(), 2);
        let current: u32 = parts[0].parse().unwrap();
        let total: u32 = parts[1].parse().unwrap();

        assert_eq!(total, 6, "Total should be max_retries + 1");
        assert!(current <= total, "Current attempt should not exceed total");
        assert!(current >= 1, "Current attempt should be at least 1");
    }

    #[tokio::test(start_paused = true)]
    async fn test_budget_exceeded_edge_cases() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 1.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: Some(100),
        };
        let manager = RetryManager::new(config);

        let attempt_count = Arc::new(AtomicU32::new(0));
        let count_clone = attempt_count.clone();

        let handle = tokio::spawn(async move {
            manager
                .execute_with_retry(
                    "test_first_attempt",
                    move || {
                        let count = count_clone.clone();
                        async move {
                            count.fetch_add(1, Ordering::SeqCst);
                            Err::<i32, TestError>(TestError::Retryable("test".to_string()))
                        }
                    },
                    should_retry_test_error,
                    create_test_error,
                )
                .await
        });

        // Let the first attempt fail; the manager is now sleeping its 50 ms delay.
        yield_until(|| attempt_count.load(Ordering::SeqCst) >= 1).await;

        // Jump past the whole 100 ms budget in one step, so the next loop iteration finds
        // the budget spent after exactly one attempt.
        tokio::time::advance(Duration::from_millis(101)).await;
        tokio::task::yield_now().await;

        let result = handle.await.unwrap();
        assert!(result.is_err());
        let error_msg = result.unwrap_err().to_string();

        assert!(
            error_msg.contains("(2/3)"),
            "Expected (2/3) but got: {error_msg}"
        );
    }

    #[tokio::test]
    async fn test_budget_exceeded_no_overflow() {
        let config = RetryConfig {
            max_retries: u32::MAX,
            initial_delay_ms: 10,
            max_delay_ms: 20,
            backoff_factor: 1.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: Some(1),
        };
        let manager = RetryManager::new(config);

        let result = manager
            .execute_with_retry(
                "test_overflow",
                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        let error_msg = result.unwrap_err().to_string();

        assert!(error_msg.contains("Retry budget exceeded"));
        // `max_retries + 1` must saturate at `u32::MAX` instead of wrapping to 0.
        assert!(error_msg.contains(&format!("/{}", u32::MAX)));
    }

    #[rstest]
    fn test_http_retry_manager_config() {
        let manager = create_http_retry_manager::<TestError>();
        assert_eq!(manager.config.max_retries, 3);
        assert!(!manager.config.immediate_first);
        assert_eq!(manager.config.max_elapsed_ms, Some(180_000));
    }

    #[rstest]
    fn test_websocket_retry_manager_config() {
        let manager = create_websocket_retry_manager::<TestError>();
        assert_eq!(manager.config.max_retries, 5);
        assert!(manager.config.immediate_first);
        assert_eq!(manager.config.max_elapsed_ms, Some(120_000));
    }

    #[tokio::test]
    async fn test_timeout_respects_retry_predicate() {
        let config = RetryConfig {
            max_retries: 3,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: Some(50),
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        // Timeouts are explicitly non-retryable here.
        let should_not_retry_timeouts = |error: &TestError| !matches!(error, TestError::Timeout(_));

        let result = manager
            .execute_with_retry(
                "test_timeout_non_retryable",
                || async {
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    Ok::<i32, TestError>(42)
                },
                should_not_retry_timeouts,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
    }

    #[tokio::test]
    async fn test_timeout_retries_when_predicate_allows() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: Some(50),
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        // Timeouts are explicitly retryable here.
        let should_retry_timeouts = |error: &TestError| matches!(error, TestError::Timeout(_));

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_timeout_retryable",
                || async {
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_timeouts,
                create_test_error,
            )
            .await;

        let elapsed = start.elapsed();

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
        // Several 50 ms timeouts plus backoff delays must have elapsed.
        assert!(elapsed.as_millis() > 80);
    }

    #[tokio::test]
    async fn test_successful_retry_after_failures() {
        let config = RetryConfig {
            max_retries: 3,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let result = manager
            .execute_with_retry(
                "test_eventual_success",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        let attempts = counter.fetch_add(1, Ordering::SeqCst);
                        if attempts < 2 {
                            Err(TestError::Retryable("temporary failure".to_string()))
                        } else {
                            Ok(42)
                        }
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert_eq!(result.unwrap(), 42);
        assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
    }

    #[tokio::test(start_paused = true)]
    async fn test_immediate_first_retry() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 100,
            max_delay_ms: 200,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: true,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
        let start = tokio::time::Instant::now();

        let handle = tokio::spawn({
            let times_clone = attempt_times.clone();
            async move {
                let _ = manager
                    .execute_with_retry(
                        "test_immediate",
                        move || {
                            let times = times_clone.clone();
                            async move {
                                times.lock().expect(MUTEX_POISONED).push(start.elapsed());
                                Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                            }
                        },
                        should_retry_test_error,
                        create_test_error,
                    )
                    .await;
            }
        });

        // The first retry is immediate, so two attempts happen without the clock moving.
        yield_until(|| attempt_times.lock().expect(MUTEX_POISONED).len() >= 2).await;

        // Advancing by the 100 ms initial delay releases the third (final) attempt.
        tokio::time::advance(Duration::from_millis(100)).await;
        tokio::task::yield_now().await;

        yield_until(|| attempt_times.lock().expect(MUTEX_POISONED).len() >= 3).await;

        handle.await.unwrap();

        let times = attempt_times.lock().expect(MUTEX_POISONED);
        assert_eq!(times.len(), 3);
        assert!(times[1] <= Duration::from_millis(1));
        assert!(times[2] >= Duration::from_millis(100));
        assert!(times[2] <= Duration::from_millis(110));
    }

    #[tokio::test]
    async fn test_operation_without_timeout() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_no_timeout",
                || async {
                    tokio::time::sleep(Duration::from_millis(50)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        let elapsed = start.elapsed();
        assert_eq!(result.unwrap(), 42);
        // The 50 ms operation must run to completion without being cut off.
        assert!(elapsed.as_millis() >= 30);
        assert!(elapsed.as_millis() < 200);
    }

    #[tokio::test]
    async fn test_zero_retries() {
        let config = RetryConfig {
            max_retries: 0,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let result = manager
            .execute_with_retry(
                "test_no_retries",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        // Only the initial attempt runs.
        assert_eq!(attempt_counter.load(Ordering::SeqCst), 1);
    }

    #[tokio::test(start_paused = true)]
    async fn test_jitter_applied() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 2.0,
            jitter_ms: 50,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let delays = Arc::new(std::sync::Mutex::new(Vec::new()));
        let last_time = Arc::new(std::sync::Mutex::new(tokio::time::Instant::now()));
        let last_time_clone = last_time.clone();

        let handle = tokio::spawn({
            let delays_clone = delays.clone();
            async move {
                let _ = manager
                    .execute_with_retry(
                        "test_jitter",
                        move || {
                            let delays = delays_clone.clone();
                            let last_time = last_time_clone.clone();
                            async move {
                                // Record the gap since the previous attempt (or since setup).
                                let now = tokio::time::Instant::now();
                                let delay = {
                                    let mut last = last_time.lock().expect(MUTEX_POISONED);
                                    let d = now.duration_since(*last);
                                    *last = now;
                                    d
                                };
                                delays.lock().expect(MUTEX_POISONED).push(delay);
                                Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                            }
                        },
                        should_retry_test_error,
                        create_test_error,
                    )
                    .await;
            }
        });

        yield_until(|| !delays.lock().expect(MUTEX_POISONED).is_empty()).await;
        advance_until(|| delays.lock().expect(MUTEX_POISONED).len() >= 2).await;
        advance_until(|| delays.lock().expect(MUTEX_POISONED).len() >= 3).await;

        handle.await.unwrap();

        let delays = delays.lock().expect(MUTEX_POISONED);
        // The first entry measures setup → first attempt, so only retry gaps are checked.
        for delay in delays.iter().skip(1) {
            // At least the 50 ms base delay...
            assert!(delay.as_millis() >= 50);
            // ...and at most the 100 ms capped delay plus up to 50 ms of jitter (+1 ms step).
            assert!(delay.as_millis() <= 151);
        }
    }

    #[tokio::test]
    async fn test_max_elapsed_stops_early() {
        let config = RetryConfig {
            max_retries: 100,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 1.5,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: Some(150),
        };
        let manager = RetryManager::new(config);

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_elapsed_limit",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        let elapsed = start.elapsed();
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));

        // The 150 ms budget, not the 100-retry limit, ends the loop.
        let attempts = attempt_counter.load(Ordering::SeqCst);
        assert!(attempts < 10);
        assert!(elapsed.as_millis() >= 100);
    }

    #[tokio::test]
    async fn test_mixed_errors_retry_behavior() {
        let config = RetryConfig {
            max_retries: 5,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let result = manager
            .execute_with_retry(
                "test_mixed_errors",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        let attempts = counter.fetch_add(1, Ordering::SeqCst);
                        match attempts {
                            0 => Err(TestError::Retryable("retry 1".to_string())),
                            1 => Err(TestError::Retryable("retry 2".to_string())),
                            2 => Err(TestError::NonRetryable("stop here".to_string())),
                            _ => Ok(42),
                        }
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
        // The non-retryable third failure stops the loop before any success.
        assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
    }

    #[tokio::test]
    async fn test_cancellation_during_retry_delay() {
        use tokio_util::sync::CancellationToken;

        let config = RetryConfig {
            max_retries: 10,
            initial_delay_ms: 500,
            max_delay_ms: 1000,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let token = CancellationToken::new();
        let token_clone = token.clone();

        // Cancel while the manager is inside its first 500 ms retry delay.
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            token_clone.cancel();
        });

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry_with_cancel(
                "test_cancellation",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
                &token,
            )
            .await;

        let elapsed = start.elapsed();

        assert!(result.is_err());
        let error_msg = format!("{}", result.unwrap_err());
        assert!(error_msg.contains("canceled"));

        // Cancellation must cut the delay short instead of waiting it out.
        assert!(elapsed.as_millis() < 600);

        let attempts = attempt_counter.load(Ordering::SeqCst);
        assert!(attempts >= 1);
    }

    #[tokio::test]
    async fn test_cancellation_during_operation_execution() {
        use tokio_util::sync::CancellationToken;

        let config = RetryConfig {
            max_retries: 5,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let token = CancellationToken::new();
        let token_clone = token.clone();

        // Cancel while the 200 ms operation is still running.
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            token_clone.cancel();
        });

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry_with_cancel(
                "test_cancellation_during_op",
                || async {
                    tokio::time::sleep(Duration::from_millis(200)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_test_error,
                create_test_error,
                &token,
            )
            .await;

        let elapsed = start.elapsed();

        assert!(result.is_err());
        let error_msg = format!("{}", result.unwrap_err());
        assert!(error_msg.contains("canceled"));

        // The in-flight operation is abandoned rather than awaited to completion.
        assert!(elapsed.as_millis() < 250);
    }

    #[tokio::test]
    async fn test_cancellation_error_message() {
        use tokio_util::sync::CancellationToken;

        let config = RetryConfig::default();
        let manager = RetryManager::new(config);

        // An already-canceled token must short-circuit before the first attempt.
        let token = CancellationToken::new();
        token.cancel();

        let result = manager
            .execute_with_retry_with_cancel(
                "test_operation",
                || async { Ok::<i32, TestError>(42) },
                should_retry_test_error,
                create_test_error,
                &token,
            )
            .await;

        assert!(result.is_err());
        let error_msg = format!("{}", result.unwrap_err());
        assert!(error_msg.contains("canceled"));
    }
}
1297
#[cfg(test)]
/// Property-based tests checking `RetryManager` invariants (attempt bounds, per-operation
/// timeouts, elapsed budget, jitter, `immediate_first`, non-retryable errors, cancellation)
/// across generated configurations.
mod proptest_tests {
    use std::sync::{
        Arc,
        atomic::{AtomicU32, Ordering},
    };

    use nautilus_core::MUTEX_POISONED;
    use proptest::prelude::*;
    use rstest::rstest;

    use super::{
        test_utils::*,
        tests::{advance_until, yield_until},
        *,
    };

    /// Builds a single-threaded runtime whose clock starts paused.
    ///
    /// While paused, Tokio auto-advances the clock to the next pending timer whenever the
    /// runtime is idle. Backoff sleeps and timeouts therefore resolve deterministically,
    /// without consuming wall-clock time on every generated case.
    fn paused_runtime() -> tokio::runtime::Runtime {
        tokio::runtime::Builder::new_current_thread()
            .enable_time()
            .start_paused(true)
            .build()
            .expect("failed to build paused current-thread runtime")
    }

    proptest! {
        #[rstest]
        fn test_retry_config_valid_ranges(
            max_retries in 0u32..100,
            initial_delay_ms in 1u64..10_000,
            max_delay_ms in 1u64..60_000,
            backoff_factor in 1.0f64..10.0,
            jitter_ms in 0u64..1_000,
            operation_timeout_ms in prop::option::of(1u64..120_000),
            immediate_first in any::<bool>(),
            max_elapsed_ms in prop::option::of(1u64..300_000)
        ) {
            // Keep the cap at or above the initial delay so the generated config is coherent.
            let max_delay_ms = max_delay_ms.max(initial_delay_ms);

            let config = RetryConfig {
                max_retries,
                initial_delay_ms,
                max_delay_ms,
                backoff_factor,
                jitter_ms,
                operation_timeout_ms,
                immediate_first,
                max_elapsed_ms,
            };

            let _manager = RetryManager::<std::io::Error>::new(config);
        }

        #[rstest]
        fn test_retry_attempts_bounded(
            max_retries in 0u32..5,
            initial_delay_ms in 1u64..10,
            backoff_factor in 1.0f64..2.0,
        ) {
            // Paused clock: the backoff sleeps between attempts cost no real time.
            let rt = paused_runtime();

            let config = RetryConfig {
                max_retries,
                initial_delay_ms,
                max_delay_ms: initial_delay_ms * 2,
                backoff_factor,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config);
            let attempt_counter = Arc::new(AtomicU32::new(0));
            let counter_clone = attempt_counter.clone();

            let result = rt.block_on(manager.execute_with_retry(
                "prop_test",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                |e: &TestError| matches!(e, TestError::Retryable(_)),
                TestError::Timeout,
            ));

            // An always-failing retryable operation runs once, plus once per allowed retry.
            prop_assert!(result.is_err());
            prop_assert_eq!(attempt_counter.load(Ordering::SeqCst), max_retries + 1);
        }

        #[rstest]
        fn test_timeout_always_respected(
            timeout_ms in 10u64..50,
            operation_delay_ms in 60u64..100,
        ) {
            let rt = paused_runtime();

            // With no retries, the single attempt must be cut off by the per-operation timeout,
            // which is always shorter than the operation's own sleep.
            let config = RetryConfig {
                max_retries: 0,
                initial_delay_ms: 10,
                max_delay_ms: 100,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: Some(timeout_ms),
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config);

            // The retry future is lazy. Advancing the clock before awaiting it would happen
            // before the attempt even starts, so let the paused clock auto-advance instead.
            let result = rt.block_on(manager.execute_with_retry(
                "timeout_test",
                move || async move {
                    tokio::time::sleep(Duration::from_millis(operation_delay_ms)).await;
                    Ok::<i32, TestError>(42)
                },
                |_: &TestError| true,
                TestError::Timeout,
            ));

            prop_assert!(
                matches!(result, Err(TestError::Timeout(_))),
                "expected a timeout error, got {:?}",
                result
            );
        }

        #[rstest]
        fn test_max_elapsed_always_respected(
            max_elapsed_ms in 20u64..50,
            delay_per_retry in 15u64..30,
            max_retries in 10u32..20,
        ) {
            let rt = paused_runtime();

            // Generous retry count: the elapsed-time budget, not `max_retries`, should end the loop.
            let config = RetryConfig {
                max_retries,
                initial_delay_ms: delay_per_retry,
                max_delay_ms: delay_per_retry * 2,
                backoff_factor: 1.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: Some(max_elapsed_ms),
            };

            let manager = RetryManager::new(config);
            let attempt_counter = Arc::new(AtomicU32::new(0));
            let counter_clone = attempt_counter.clone();

            let result = rt.block_on(manager.execute_with_retry(
                "elapsed_test",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                |e: &TestError| matches!(e, TestError::Retryable(_)),
                TestError::Timeout,
            ));

            let attempts = attempt_counter.load(Ordering::SeqCst);

            prop_assert!(
                matches!(result, Err(TestError::Timeout(_))),
                "expected a budget/timeout error, got {:?}",
                result
            );
            prop_assert!(attempts <= max_retries + 1);
        }

        #[rstest]
        fn test_jitter_bounds(
            jitter_ms in 0u64..20,
            base_delay_ms in 10u64..30,
        ) {
            let rt = paused_runtime();

            // Backoff factor 1.0: each gap between attempts is expected to lie within
            // [base, base + jitter] (plus 1ms of slack).
            let config = RetryConfig {
                max_retries: 2,
                initial_delay_ms: base_delay_ms,
                max_delay_ms: base_delay_ms * 2,
                backoff_factor: 1.0,
                jitter_ms,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config);
            let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
            let attempt_times_for_block = attempt_times.clone();

            rt.block_on(async move {
                let attempt_times_for_wait = attempt_times_for_block.clone();
                let handle = tokio::spawn(async move {
                    let start_time = tokio::time::Instant::now();
                    let _ = manager
                        .execute_with_retry(
                            "jitter_test",
                            move || {
                                let attempt_times_inner = attempt_times_for_block.clone();
                                async move {
                                    attempt_times_inner
                                        .lock()
                                        .expect(MUTEX_POISONED)
                                        .push(start_time.elapsed());
                                    Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                                }
                            },
                            |e: &TestError| matches!(e, TestError::Retryable(_)),
                            TestError::Timeout,
                        )
                        .await;
                });

                // Step the paused clock just far enough for each of the three attempts to run.
                yield_until(|| !attempt_times_for_wait.lock().expect(MUTEX_POISONED).is_empty()).await;
                advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 2).await;
                advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 3).await;

                handle.await.expect("retry task panicked");
            });

            let times = attempt_times.lock().expect(MUTEX_POISONED);

            prop_assert!(times.len() >= 2);

            // The initial attempt is not delayed.
            prop_assert!(times[0].as_millis() < 5);

            // Every consecutive pair of attempts is separated by one backoff delay.
            for (idx, pair) in times.windows(2).enumerate() {
                let retry = idx + 1;
                let gap = pair[1] - pair[0];

                prop_assert!(
                    gap.as_millis() >= u128::from(base_delay_ms),
                    "Retry {} delay {}ms is less than base {}ms",
                    retry, gap.as_millis(), base_delay_ms
                );

                prop_assert!(
                    gap.as_millis() <= u128::from(base_delay_ms + jitter_ms + 1),
                    "Retry {} delay {}ms exceeds base {} + jitter {}",
                    retry, gap.as_millis(), base_delay_ms, jitter_ms
                );
            }
        }

        #[rstest]
        fn test_immediate_first_property(
            immediate_first in any::<bool>(),
            initial_delay_ms in 10u64..30,
        ) {
            let rt = paused_runtime();

            let config = RetryConfig {
                max_retries: 2,
                initial_delay_ms,
                max_delay_ms: initial_delay_ms * 2,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config);
            let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
            let attempt_times_for_block = attempt_times.clone();

            rt.block_on(async move {
                let attempt_times_for_wait = attempt_times_for_block.clone();
                let handle = tokio::spawn(async move {
                    let start = tokio::time::Instant::now();
                    let _ = manager
                        .execute_with_retry(
                            "immediate_test",
                            move || {
                                let attempt_times_inner = attempt_times_for_block.clone();
                                async move {
                                    let elapsed = start.elapsed();
                                    attempt_times_inner.lock().expect(MUTEX_POISONED).push(elapsed);
                                    Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                                }
                            },
                            |e: &TestError| matches!(e, TestError::Retryable(_)),
                            TestError::Timeout,
                        )
                        .await;
                });

                yield_until(|| !attempt_times_for_wait.lock().expect(MUTEX_POISONED).is_empty()).await;
                advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 2).await;
                advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 3).await;

                handle.await.expect("retry task panicked");
            });

            let times = attempt_times.lock().expect(MUTEX_POISONED);
            prop_assert!(times.len() >= 2);

            // `times[1]` is the first retry. Its timing depends on `immediate_first`.
            if immediate_first {
                prop_assert!(
                    times[1].as_millis() < 20,
                    "With immediate_first=true, first retry took {}ms",
                    times[1].as_millis()
                );
            } else {
                prop_assert!(
                    times[1].as_millis() >= u128::from(initial_delay_ms - 1),
                    "With immediate_first=false, first retry was too fast: {}ms",
                    times[1].as_millis()
                );
            }
        }

        #[rstest]
        fn test_non_retryable_stops_immediately(
            attempt_before_non_retryable in 0usize..3,
            max_retries in 3u32..5,
        ) {
            // Paused clock: the 10ms+ backoffs before the non-retryable error cost no real time.
            let rt = paused_runtime();

            let config = RetryConfig {
                max_retries,
                initial_delay_ms: 10,
                max_delay_ms: 100,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config);
            let attempt_counter = Arc::new(AtomicU32::new(0));
            let counter_clone = attempt_counter.clone();

            let result: Result<i32, TestError> = rt.block_on(manager.execute_with_retry(
                "non_retryable_test",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        // Zero-based index of this attempt.
                        let attempts = counter.fetch_add(1, Ordering::SeqCst) as usize;
                        if attempts == attempt_before_non_retryable {
                            Err(TestError::NonRetryable("stop".to_string()))
                        } else {
                            Err(TestError::Retryable("retry".to_string()))
                        }
                    }
                },
                |e: &TestError| matches!(e, TestError::Retryable(_)),
                TestError::Timeout,
            ));

            let attempts = attempt_counter.load(Ordering::SeqCst) as usize;

            prop_assert!(
                matches!(result, Err(TestError::NonRetryable(_))),
                "expected the non-retryable error to surface, got {:?}",
                result
            );
            prop_assert_eq!(attempts, attempt_before_non_retryable + 1);
        }

        #[rstest]
        fn test_cancellation_stops_immediately(
            cancel_after_ms in 10u64..100,
            initial_delay_ms in 200u64..500,
        ) {
            use tokio_util::sync::CancellationToken;

            let rt = paused_runtime();

            // The first backoff (>= 200ms) is always longer than the cancellation delay (< 100ms).
            let config = RetryConfig {
                max_retries: 10,
                initial_delay_ms,
                max_delay_ms: initial_delay_ms * 2,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config);
            let token = CancellationToken::new();
            let token_clone = token.clone();

            let (result, elapsed) = rt.block_on(async {
                let start = tokio::time::Instant::now();

                tokio::spawn(async move {
                    tokio::time::sleep(Duration::from_millis(cancel_after_ms)).await;
                    token_clone.cancel();
                });

                // No manual clock advance: the retry future is lazy, and the paused clock
                // auto-advances to whichever timer (cancel or backoff) fires first.
                let result: Result<i32, TestError> = manager
                    .execute_with_retry_with_cancel(
                        "cancellation_test",
                        || async {
                            Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                        },
                        |e: &TestError| matches!(e, TestError::Retryable(_)),
                        create_test_error,
                        &token,
                    )
                    .await;

                (result, start.elapsed())
            });

            prop_assert!(result.is_err());
            let error_msg = result.unwrap_err().to_string();
            prop_assert!(
                error_msg.contains("canceled"),
                "expected a cancellation error, got: {}",
                error_msg
            );

            // "Immediately": cancellation must not wait out the first backoff delay.
            prop_assert!(
                elapsed < Duration::from_millis(initial_delay_ms),
                "cancellation took {:?}, not before the first {}ms backoff",
                elapsed, initial_delay_ms
            );
        }

        #[rstest]
        fn test_budget_clamp_prevents_overshoot(
            max_elapsed_ms in 10u64..30,
            delay_per_retry in 20u64..50,
        ) {
            let rt = paused_runtime();

            // Each backoff (>= 20ms) is comparable to or longer than the whole budget (< 30ms),
            // so an unclamped sleep could overshoot it.
            let config = RetryConfig {
                max_retries: 5,
                initial_delay_ms: delay_per_retry,
                max_delay_ms: delay_per_retry * 2,
                backoff_factor: 1.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: Some(max_elapsed_ms),
            };

            let manager = RetryManager::new(config);

            let (result, elapsed) = rt.block_on(async {
                let start = tokio::time::Instant::now();
                let result = manager
                    .execute_with_retry(
                        "budget_clamp_test",
                        || async {
                            Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                        },
                        |e: &TestError| matches!(e, TestError::Retryable(_)),
                        create_test_error,
                    )
                    .await;
                (result, start.elapsed())
            });

            // The operation never succeeds, so the manager must give up with an error.
            prop_assert!(result.is_err());

            // The operation is instantaneous, so on the paused clock the elapsed time is just the
            // sum of the backoff sleeps. That sum must not exceed the configured budget.
            prop_assert!(
                elapsed <= Duration::from_millis(max_elapsed_ms),
                "elapsed {:?} overshot the {}ms budget",
                elapsed, max_elapsed_ms
            );
        }

        #[rstest]
        fn test_success_on_kth_attempt(
            k in 1usize..5,
            initial_delay_ms in 5u64..20,
        ) {
            let rt = paused_runtime();

            // Plenty of retries so that success on attempt `k` (at most the 4th) is reachable.
            let config = RetryConfig {
                max_retries: 10,
                initial_delay_ms,
                max_delay_ms: initial_delay_ms * 4,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config);
            let attempt_counter = Arc::new(AtomicU32::new(0));
            let counter_clone = attempt_counter.clone();

            // The paused clock auto-advances through each backoff sleep; no manual stepping needed.
            let result = rt.block_on(manager.execute_with_retry(
                "kth_attempt_test",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        // `attempt` is zero-based while `k` is one-based.
                        let attempt = counter.fetch_add(1, Ordering::SeqCst) as usize;
                        if attempt + 1 == k {
                            Ok(42)
                        } else {
                            Err(TestError::Retryable("retry".to_string()))
                        }
                    }
                },
                |e: &TestError| matches!(e, TestError::Retryable(_)),
                create_test_error,
            ));

            let attempts = attempt_counter.load(Ordering::SeqCst) as usize;

            prop_assert!(result.is_ok());
            prop_assert_eq!(result.unwrap(), 42);
            prop_assert_eq!(attempts, k);
        }
    }
}