1use std::{future::Future, marker::PhantomData, time::Duration};
19
20use tokio_util::sync::CancellationToken;
21
22use crate::backoff::ExponentialBackoff;
23
/// Tunable parameters consumed by [`RetryManager`] when retrying an operation.
///
/// All durations are expressed in milliseconds.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Number of retries allowed after the initial attempt (`0` means a single attempt).
    pub max_retries: u32,
    /// Initial backoff delay in milliseconds, forwarded to `ExponentialBackoff`.
    pub initial_delay_ms: u64,
    /// Maximum backoff delay in milliseconds, forwarded to `ExponentialBackoff`.
    pub max_delay_ms: u64,
    /// Backoff growth factor, forwarded to `ExponentialBackoff`.
    pub backoff_factor: f64,
    /// Jitter in milliseconds, forwarded to `ExponentialBackoff`
    /// (presumably an upper bound on the random delay added per retry; confirm there).
    pub jitter_ms: u64,
    /// Timeout applied to each individual attempt; `None` lets an attempt run unbounded.
    pub operation_timeout_ms: Option<u64>,
    /// Forwarded to `ExponentialBackoff`; the tests in this file expect the first retry to
    /// happen without delay when this is `true`.
    pub immediate_first: bool,
    /// Total time budget across all attempts and retry delays; `None` disables the budget.
    pub max_elapsed_ms: Option<u64>,
}

impl Default for RetryConfig {
    /// General-purpose defaults: three retries, 1 s initial / 10 s maximum delay doubling each
    /// time, 100 ms jitter, a 30 s per-attempt timeout and no overall time budget.
    fn default() -> Self {
        const DEFAULT_MAX_RETRIES: u32 = 3;
        const DEFAULT_INITIAL_DELAY_MS: u64 = 1_000;
        const DEFAULT_MAX_DELAY_MS: u64 = 10_000;
        const DEFAULT_BACKOFF_FACTOR: f64 = 2.0;
        const DEFAULT_JITTER_MS: u64 = 100;
        const DEFAULT_OPERATION_TIMEOUT_MS: u64 = 30_000;

        RetryConfig {
            max_retries: DEFAULT_MAX_RETRIES,
            initial_delay_ms: DEFAULT_INITIAL_DELAY_MS,
            max_delay_ms: DEFAULT_MAX_DELAY_MS,
            backoff_factor: DEFAULT_BACKOFF_FACTOR,
            jitter_ms: DEFAULT_JITTER_MS,
            operation_timeout_ms: Some(DEFAULT_OPERATION_TIMEOUT_MS),
            immediate_first: false,
            max_elapsed_ms: None,
        }
    }
}
62
63#[derive(Clone, Debug)]
67pub struct RetryManager<E> {
68 config: RetryConfig,
69 _phantom: PhantomData<E>,
70}
71
72impl<E> RetryManager<E>
73where
74 E: std::error::Error,
75{
76 pub const fn new(config: RetryConfig) -> Self {
78 Self {
79 config,
80 _phantom: PhantomData,
81 }
82 }
83
84 #[inline(always)]
86 fn budget_exceeded_msg(&self, attempt: u32) -> String {
87 format!(
88 "Retry budget exceeded ({}/{})",
89 attempt.saturating_add(1),
90 self.config.max_retries.saturating_add(1)
91 )
92 }
93
94 pub async fn execute_with_retry_inner<F, Fut, T>(
108 &self,
109 operation_name: &str,
110 mut operation: F,
111 should_retry: impl Fn(&E) -> bool,
112 create_error: impl Fn(String) -> E,
113 cancel: Option<&CancellationToken>,
114 ) -> Result<T, E>
115 where
116 F: FnMut() -> Fut,
117 Fut: Future<Output = Result<T, E>>,
118 {
119 let mut backoff = ExponentialBackoff::new(
120 Duration::from_millis(self.config.initial_delay_ms),
121 Duration::from_millis(self.config.max_delay_ms),
122 self.config.backoff_factor,
123 self.config.jitter_ms,
124 self.config.immediate_first,
125 )
126 .map_err(|e| create_error(format!("Invalid configuration: {e}")))?;
127
128 let mut attempt = 0;
129 let start_time = tokio::time::Instant::now();
130
131 loop {
132 if let Some(token) = cancel
133 && token.is_cancelled()
134 {
135 log::debug!("Operation '{operation_name}' canceled after {attempt} attempts");
136 return Err(create_error("canceled".to_string()));
137 }
138
139 if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
140 let elapsed = start_time.elapsed();
141 if elapsed.as_millis() >= u128::from(max_elapsed_ms) {
142 return Err(create_error(self.budget_exceeded_msg(attempt)));
143 }
144 }
145
146 let result = match (self.config.operation_timeout_ms, cancel) {
147 (Some(timeout_ms), Some(token)) => {
148 tokio::select! {
149 result = tokio::time::timeout(Duration::from_millis(timeout_ms), operation()) => result,
150 () = token.cancelled() => {
151 log::debug!("Operation '{operation_name}' canceled during execution");
152 return Err(create_error("canceled".to_string()));
153 }
154 }
155 }
156 (Some(timeout_ms), None) => {
157 tokio::time::timeout(Duration::from_millis(timeout_ms), operation()).await
158 }
159 (None, Some(token)) => {
160 tokio::select! {
161 result = operation() => Ok(result),
162 () = token.cancelled() => {
163 log::debug!("Operation '{operation_name}' canceled during execution");
164 return Err(create_error("canceled".to_string()));
165 }
166 }
167 }
168 (None, None) => Ok(operation().await),
169 };
170
171 match result {
172 Ok(Ok(success)) => {
173 if attempt > 0 {
174 log::trace!(
175 "Operation '{operation_name}' succeeded after {} attempts",
176 attempt + 1
177 );
178 }
179 return Ok(success);
180 }
181 Ok(Err(e)) => {
182 if !should_retry(&e) {
183 log::trace!("Operation '{operation_name}' non-retryable error: {e}");
184 return Err(e);
185 }
186
187 if attempt >= self.config.max_retries {
188 log::trace!(
189 "Operation '{operation_name}' retries exhausted after {} attempts: {e}",
190 attempt + 1
191 );
192 return Err(e);
193 }
194
195 let mut delay = backoff.next_duration();
196
197 if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
198 let elapsed = start_time.elapsed();
199 let remaining =
200 Duration::from_millis(max_elapsed_ms).saturating_sub(elapsed);
201
202 if remaining.is_zero() {
203 return Err(create_error(self.budget_exceeded_msg(attempt)));
204 }
205
206 delay = delay.min(remaining);
207 }
208
209 log::trace!(
210 "Operation '{operation_name}' attempt {} failed, retrying in {}ms: {e}",
211 attempt + 1,
212 delay.as_millis()
213 );
214
215 if delay.is_zero() {
217 tokio::task::yield_now().await;
218 attempt += 1;
219 continue;
220 }
221
222 if let Some(token) = cancel {
223 tokio::select! {
224 () = tokio::time::sleep(delay) => {},
225 () = token.cancelled() => {
226 log::debug!("Operation '{operation_name}' canceled during retry delay (attempt {})", attempt + 1);
227 return Err(create_error("canceled".to_string()));
228 }
229 }
230 } else {
231 tokio::time::sleep(delay).await;
232 }
233 attempt += 1;
234 }
235 Err(_) => {
236 let e = create_error(format!(
237 "Timed out after {}ms",
238 self.config.operation_timeout_ms.unwrap_or(0)
239 ));
240
241 if !should_retry(&e) {
242 log::trace!("Operation '{operation_name}' non-retryable timeout: {e}");
243 return Err(e);
244 }
245
246 if attempt >= self.config.max_retries {
247 log::trace!(
248 "Operation '{operation_name}' retries exhausted after timeout ({} attempts): {e}",
249 attempt + 1
250 );
251 return Err(e);
252 }
253
254 let mut delay = backoff.next_duration();
255
256 if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
257 let elapsed = start_time.elapsed();
258 let remaining =
259 Duration::from_millis(max_elapsed_ms).saturating_sub(elapsed);
260
261 if remaining.is_zero() {
262 return Err(create_error(self.budget_exceeded_msg(attempt)));
263 }
264
265 delay = delay.min(remaining);
266 }
267
268 log::trace!(
269 "Operation '{operation_name}' attempt {} timed out, retrying in {}ms: {e}",
270 attempt + 1,
271 delay.as_millis()
272 );
273
274 if delay.is_zero() {
276 tokio::task::yield_now().await;
277 attempt += 1;
278 continue;
279 }
280
281 if let Some(token) = cancel {
282 tokio::select! {
283 () = tokio::time::sleep(delay) => {},
284 () = token.cancelled() => {
285 log::debug!("Operation '{operation_name}' canceled during retry delay (attempt {})", attempt + 1);
286 return Err(create_error("canceled".to_string()));
287 }
288 }
289 } else {
290 tokio::time::sleep(delay).await;
291 }
292 attempt += 1;
293 }
294 }
295 }
296 }
297
298 pub async fn execute_with_retry<F, Fut, T>(
305 &self,
306 operation_name: &str,
307 operation: F,
308 should_retry: impl Fn(&E) -> bool,
309 create_error: impl Fn(String) -> E,
310 ) -> Result<T, E>
311 where
312 F: FnMut() -> Fut,
313 Fut: Future<Output = Result<T, E>>,
314 {
315 self.execute_with_retry_inner(operation_name, operation, should_retry, create_error, None)
316 .await
317 }
318
319 pub async fn execute_with_retry_with_cancel<F, Fut, T>(
326 &self,
327 operation_name: &str,
328 operation: F,
329 should_retry: impl Fn(&E) -> bool,
330 create_error: impl Fn(String) -> E,
331 cancellation_token: &CancellationToken,
332 ) -> Result<T, E>
333 where
334 F: FnMut() -> Fut,
335 Fut: Future<Output = Result<T, E>>,
336 {
337 self.execute_with_retry_inner(
338 operation_name,
339 operation,
340 should_retry,
341 create_error,
342 Some(cancellation_token),
343 )
344 .await
345 }
346}
347
348pub fn create_default_retry_manager<E>() -> RetryManager<E>
350where
351 E: std::error::Error,
352{
353 RetryManager::new(RetryConfig::default())
354}
355
356pub const fn create_http_retry_manager<E>() -> RetryManager<E>
358where
359 E: std::error::Error,
360{
361 let config = RetryConfig {
362 max_retries: 3,
363 initial_delay_ms: 1_000,
364 max_delay_ms: 10_000,
365 backoff_factor: 2.0,
366 jitter_ms: 1_000,
367 operation_timeout_ms: Some(60_000), immediate_first: false,
369 max_elapsed_ms: Some(180_000), };
371 RetryManager::new(config)
372}
373
374pub const fn create_websocket_retry_manager<E>() -> RetryManager<E>
376where
377 E: std::error::Error,
378{
379 let config = RetryConfig {
380 max_retries: 5,
381 initial_delay_ms: 1_000,
382 max_delay_ms: 10_000,
383 backoff_factor: 2.0,
384 jitter_ms: 1_000,
385 operation_timeout_ms: Some(30_000), immediate_first: true,
387 max_elapsed_ms: Some(120_000), };
389 RetryManager::new(config)
390}
391
#[cfg(test)]
mod test_utils {
    /// Error type shared by the retry tests; each variant wraps a free-form message.
    #[derive(Debug, thiserror::Error)]
    pub enum TestError {
        /// Treated as retryable by [`should_retry_test_error`].
        #[error("Retryable error: {0}")]
        Retryable(String),
        /// Never retried by [`should_retry_test_error`].
        #[error("Non-retryable error: {0}")]
        NonRetryable(String),
        /// Produced by [`create_test_error`] for manager-generated failures.
        #[error("Timeout error: {0}")]
        Timeout(String),
    }

    /// Retry predicate for tests: only `TestError::Retryable` is retried.
    pub fn should_retry_test_error(error: &TestError) -> bool {
        match error {
            TestError::Retryable(_) => true,
            TestError::NonRetryable(_) | TestError::Timeout(_) => false,
        }
    }

    /// Error factory for tests: wraps every manager-generated message in `TestError::Timeout`.
    pub fn create_test_error(msg: String) -> TestError {
        TestError::Timeout(msg)
    }
}
412
413#[cfg(test)]
414mod tests {
415 use std::sync::{
416 Arc,
417 atomic::{AtomicU32, Ordering},
418 };
419
420 use nautilus_core::MUTEX_POISONED;
421 use rstest::rstest;
422
423 use super::{test_utils::*, *};
424
425 const MAX_WAIT_ITERS: usize = 10_000;
426 const MAX_ADVANCE_ITERS: usize = 10_000;
427
428 pub(crate) async fn yield_until<F>(mut condition: F)
429 where
430 F: FnMut() -> bool,
431 {
432 for _ in 0..MAX_WAIT_ITERS {
433 if condition() {
434 return;
435 }
436 tokio::task::yield_now().await;
437 }
438
439 panic!("yield_until timed out waiting for condition");
440 }
441
442 pub(crate) async fn advance_until<F>(mut condition: F)
443 where
444 F: FnMut() -> bool,
445 {
446 for _ in 0..MAX_ADVANCE_ITERS {
447 if condition() {
448 return;
449 }
450 tokio::time::advance(Duration::from_millis(1)).await;
451 tokio::task::yield_now().await;
452 }
453
454 panic!("advance_until timed out waiting for condition");
455 }
456
457 #[rstest]
458 fn test_retry_config_default() {
459 let config = RetryConfig::default();
460 assert_eq!(config.max_retries, 3);
461 assert_eq!(config.initial_delay_ms, 1_000);
462 assert_eq!(config.max_delay_ms, 10_000);
463 assert_eq!(config.backoff_factor, 2.0);
464 assert_eq!(config.jitter_ms, 100);
465 assert_eq!(config.operation_timeout_ms, Some(30_000));
466 assert!(!config.immediate_first);
467 assert_eq!(config.max_elapsed_ms, None);
468 }
469
470 #[tokio::test]
471 async fn test_retry_manager_success_first_attempt() {
472 let manager = RetryManager::new(RetryConfig::default());
473
474 let result = manager
475 .execute_with_retry(
476 "test_operation",
477 || async { Ok::<i32, TestError>(42) },
478 should_retry_test_error,
479 create_test_error,
480 )
481 .await;
482
483 assert_eq!(result.unwrap(), 42);
484 }
485
486 #[tokio::test]
487 async fn test_retry_manager_non_retryable_error() {
488 let manager = RetryManager::new(RetryConfig::default());
489
490 let result = manager
491 .execute_with_retry(
492 "test_operation",
493 || async { Err::<i32, TestError>(TestError::NonRetryable("test".to_string())) },
494 should_retry_test_error,
495 create_test_error,
496 )
497 .await;
498
499 assert!(result.is_err());
500 assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
501 }
502
503 #[tokio::test]
504 async fn test_retry_manager_retryable_error_exhausted() {
505 let config = RetryConfig {
506 max_retries: 2,
507 initial_delay_ms: 10,
508 max_delay_ms: 50,
509 backoff_factor: 2.0,
510 jitter_ms: 0,
511 operation_timeout_ms: None,
512 immediate_first: false,
513 max_elapsed_ms: None,
514 };
515 let manager = RetryManager::new(config);
516
517 let result = manager
518 .execute_with_retry(
519 "test_operation",
520 || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
521 should_retry_test_error,
522 create_test_error,
523 )
524 .await;
525
526 assert!(result.is_err());
527 assert!(matches!(result.unwrap_err(), TestError::Retryable(_)));
528 }
529
530 #[tokio::test]
531 async fn test_timeout_path() {
532 let config = RetryConfig {
533 max_retries: 2,
534 initial_delay_ms: 10,
535 max_delay_ms: 50,
536 backoff_factor: 2.0,
537 jitter_ms: 0,
538 operation_timeout_ms: Some(50),
539 immediate_first: false,
540 max_elapsed_ms: None,
541 };
542 let manager = RetryManager::new(config);
543
544 let result = manager
545 .execute_with_retry(
546 "test_timeout",
547 || async {
548 tokio::time::sleep(Duration::from_millis(100)).await;
549 Ok::<i32, TestError>(42)
550 },
551 should_retry_test_error,
552 create_test_error,
553 )
554 .await;
555
556 assert!(result.is_err());
557 assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
558 }
559
560 #[tokio::test]
561 async fn test_max_elapsed_time_budget() {
562 let config = RetryConfig {
563 max_retries: 10,
564 initial_delay_ms: 50,
565 max_delay_ms: 100,
566 backoff_factor: 2.0,
567 jitter_ms: 0,
568 operation_timeout_ms: None,
569 immediate_first: false,
570 max_elapsed_ms: Some(200),
571 };
572 let manager = RetryManager::new(config);
573
574 let start = tokio::time::Instant::now();
575 let result = manager
576 .execute_with_retry(
577 "test_budget",
578 || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
579 should_retry_test_error,
580 create_test_error,
581 )
582 .await;
583
584 let elapsed = start.elapsed();
585 assert!(result.is_err());
586 assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
587 assert!(elapsed.as_millis() >= 150);
588 assert!(elapsed.as_millis() < 1000);
589 }
590
591 #[tokio::test]
592 async fn test_budget_exceeded_message_format() {
593 let config = RetryConfig {
594 max_retries: 5,
595 initial_delay_ms: 10,
596 max_delay_ms: 20,
597 backoff_factor: 1.0,
598 jitter_ms: 0,
599 operation_timeout_ms: None,
600 immediate_first: false,
601 max_elapsed_ms: Some(35),
602 };
603 let manager = RetryManager::new(config);
604
605 let result = manager
606 .execute_with_retry(
607 "test_budget_msg",
608 || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
609 should_retry_test_error,
610 create_test_error,
611 )
612 .await;
613
614 assert!(result.is_err());
615 let error_msg = result.unwrap_err().to_string();
616
617 assert!(error_msg.contains("Retry budget exceeded"));
618 assert!(error_msg.contains("/6)"));
619
620 if let Some(captures) = error_msg.strip_prefix("Timeout error: Retry budget exceeded (")
621 && let Some(nums) = captures.strip_suffix(")")
622 {
623 let parts: Vec<&str> = nums.split('/').collect();
624 assert_eq!(parts.len(), 2);
625 let current: u32 = parts[0].parse().unwrap();
626 let total: u32 = parts[1].parse().unwrap();
627
628 assert_eq!(total, 6, "Total should be max_retries + 1");
629 assert!(current <= total, "Current attempt should not exceed total");
630 assert!(current >= 1, "Current attempt should be at least 1");
631 }
632 }
633
634 #[tokio::test(start_paused = true)]
635 async fn test_budget_exceeded_edge_cases() {
636 let config = RetryConfig {
637 max_retries: 2,
638 initial_delay_ms: 50,
639 max_delay_ms: 100,
640 backoff_factor: 1.0,
641 jitter_ms: 0,
642 operation_timeout_ms: None,
643 immediate_first: false,
644 max_elapsed_ms: Some(100),
645 };
646 let manager = RetryManager::new(config);
647
648 let attempt_count = Arc::new(AtomicU32::new(0));
649 let count_clone = attempt_count.clone();
650
651 let handle = tokio::spawn(async move {
652 manager
653 .execute_with_retry(
654 "test_first_attempt",
655 move || {
656 let count = count_clone.clone();
657 async move {
658 count.fetch_add(1, Ordering::SeqCst);
659 Err::<i32, TestError>(TestError::Retryable("test".to_string()))
660 }
661 },
662 should_retry_test_error,
663 create_test_error,
664 )
665 .await
666 });
667
668 yield_until(|| attempt_count.load(Ordering::SeqCst) >= 1).await;
670
671 tokio::time::advance(Duration::from_millis(101)).await;
673 tokio::task::yield_now().await;
674
675 let result = handle.await.unwrap();
676 assert!(result.is_err());
677 let error_msg = result.unwrap_err().to_string();
678
679 assert!(
681 error_msg.contains("(2/3)"),
682 "Expected (2/3) but got: {error_msg}"
683 );
684 }
685
686 #[tokio::test]
687 async fn test_budget_exceeded_no_overflow() {
688 let config = RetryConfig {
689 max_retries: u32::MAX,
690 initial_delay_ms: 10,
691 max_delay_ms: 20,
692 backoff_factor: 1.0,
693 jitter_ms: 0,
694 operation_timeout_ms: None,
695 immediate_first: false,
696 max_elapsed_ms: Some(1),
697 };
698 let manager = RetryManager::new(config);
699
700 let result = manager
701 .execute_with_retry(
702 "test_overflow",
703 || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
704 should_retry_test_error,
705 create_test_error,
706 )
707 .await;
708
709 assert!(result.is_err());
710 let error_msg = result.unwrap_err().to_string();
711
712 assert!(error_msg.contains("Retry budget exceeded"));
714 assert!(error_msg.contains(&format!("/{}", u32::MAX)));
715 }
716
717 #[rstest]
718 fn test_http_retry_manager_config() {
719 let manager = create_http_retry_manager::<TestError>();
720 assert_eq!(manager.config.max_retries, 3);
721 assert!(!manager.config.immediate_first);
722 assert_eq!(manager.config.max_elapsed_ms, Some(180_000));
723 }
724
725 #[rstest]
726 fn test_websocket_retry_manager_config() {
727 let manager = create_websocket_retry_manager::<TestError>();
728 assert_eq!(manager.config.max_retries, 5);
729 assert!(manager.config.immediate_first);
730 assert_eq!(manager.config.max_elapsed_ms, Some(120_000));
731 }
732
733 #[tokio::test]
734 async fn test_timeout_respects_retry_predicate() {
735 let config = RetryConfig {
736 max_retries: 3,
737 initial_delay_ms: 10,
738 max_delay_ms: 50,
739 backoff_factor: 2.0,
740 jitter_ms: 0,
741 operation_timeout_ms: Some(50),
742 immediate_first: false,
743 max_elapsed_ms: None,
744 };
745 let manager = RetryManager::new(config);
746
747 let should_not_retry_timeouts = |error: &TestError| !matches!(error, TestError::Timeout(_));
749
750 let result = manager
751 .execute_with_retry(
752 "test_timeout_non_retryable",
753 || async {
754 tokio::time::sleep(Duration::from_millis(100)).await;
755 Ok::<i32, TestError>(42)
756 },
757 should_not_retry_timeouts,
758 create_test_error,
759 )
760 .await;
761
762 assert!(result.is_err());
764 assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
765 }
766
767 #[tokio::test]
768 async fn test_timeout_retries_when_predicate_allows() {
769 let config = RetryConfig {
770 max_retries: 2,
771 initial_delay_ms: 10,
772 max_delay_ms: 50,
773 backoff_factor: 2.0,
774 jitter_ms: 0,
775 operation_timeout_ms: Some(50),
776 immediate_first: false,
777 max_elapsed_ms: None,
778 };
779 let manager = RetryManager::new(config);
780
781 let should_retry_timeouts = |error: &TestError| matches!(error, TestError::Timeout(_));
783
784 let start = tokio::time::Instant::now();
785 let result = manager
786 .execute_with_retry(
787 "test_timeout_retryable",
788 || async {
789 tokio::time::sleep(Duration::from_millis(100)).await;
790 Ok::<i32, TestError>(42)
791 },
792 should_retry_timeouts,
793 create_test_error,
794 )
795 .await;
796
797 let elapsed = start.elapsed();
798
799 assert!(result.is_err());
801 assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
802 assert!(elapsed.as_millis() > 80); }
805
806 #[tokio::test]
807 async fn test_successful_retry_after_failures() {
808 let config = RetryConfig {
809 max_retries: 3,
810 initial_delay_ms: 10,
811 max_delay_ms: 50,
812 backoff_factor: 2.0,
813 jitter_ms: 0,
814 operation_timeout_ms: None,
815 immediate_first: false,
816 max_elapsed_ms: None,
817 };
818 let manager = RetryManager::new(config);
819
820 let attempt_counter = Arc::new(AtomicU32::new(0));
821 let counter_clone = attempt_counter.clone();
822
823 let result = manager
824 .execute_with_retry(
825 "test_eventual_success",
826 move || {
827 let counter = counter_clone.clone();
828 async move {
829 let attempts = counter.fetch_add(1, Ordering::SeqCst);
830 if attempts < 2 {
831 Err(TestError::Retryable("temporary failure".to_string()))
832 } else {
833 Ok(42)
834 }
835 }
836 },
837 should_retry_test_error,
838 create_test_error,
839 )
840 .await;
841
842 assert_eq!(result.unwrap(), 42);
843 assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
844 }
845
846 #[tokio::test(start_paused = true)]
847 async fn test_immediate_first_retry() {
848 let config = RetryConfig {
849 max_retries: 2,
850 initial_delay_ms: 100,
851 max_delay_ms: 200,
852 backoff_factor: 2.0,
853 jitter_ms: 0,
854 operation_timeout_ms: None,
855 immediate_first: true,
856 max_elapsed_ms: None,
857 };
858 let manager = RetryManager::new(config);
859
860 let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
861 let times_clone = attempt_times.clone();
862 let start = tokio::time::Instant::now();
863
864 let handle = tokio::spawn({
865 let times_clone = times_clone.clone();
866 async move {
867 let _ = manager
868 .execute_with_retry(
869 "test_immediate",
870 move || {
871 let times = times_clone.clone();
872 async move {
873 times.lock().expect(MUTEX_POISONED).push(start.elapsed());
874 Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
875 }
876 },
877 should_retry_test_error,
878 create_test_error,
879 )
880 .await;
881 }
882 });
883
884 yield_until(|| attempt_times.lock().expect(MUTEX_POISONED).len() >= 2).await;
886
887 tokio::time::advance(Duration::from_millis(100)).await;
889 tokio::task::yield_now().await;
890
891 yield_until(|| attempt_times.lock().expect(MUTEX_POISONED).len() >= 3).await;
893
894 handle.await.unwrap();
895
896 let times = attempt_times.lock().expect(MUTEX_POISONED);
897 assert_eq!(times.len(), 3); assert!(times[1] <= Duration::from_millis(1));
901 assert!(times[2] >= Duration::from_millis(100));
903 assert!(times[2] <= Duration::from_millis(110));
904 }
905
906 #[tokio::test]
907 async fn test_operation_without_timeout() {
908 let config = RetryConfig {
909 max_retries: 2,
910 initial_delay_ms: 10,
911 max_delay_ms: 50,
912 backoff_factor: 2.0,
913 jitter_ms: 0,
914 operation_timeout_ms: None, immediate_first: false,
916 max_elapsed_ms: None,
917 };
918 let manager = RetryManager::new(config);
919
920 let start = tokio::time::Instant::now();
921 let result = manager
922 .execute_with_retry(
923 "test_no_timeout",
924 || async {
925 tokio::time::sleep(Duration::from_millis(50)).await;
926 Ok::<i32, TestError>(42)
927 },
928 should_retry_test_error,
929 create_test_error,
930 )
931 .await;
932
933 let elapsed = start.elapsed();
934 assert_eq!(result.unwrap(), 42);
935 assert!(elapsed.as_millis() >= 30);
937 assert!(elapsed.as_millis() < 200);
938 }
939
940 #[tokio::test]
941 async fn test_zero_retries() {
942 let config = RetryConfig {
943 max_retries: 0,
944 initial_delay_ms: 10,
945 max_delay_ms: 50,
946 backoff_factor: 2.0,
947 jitter_ms: 0,
948 operation_timeout_ms: None,
949 immediate_first: false,
950 max_elapsed_ms: None,
951 };
952 let manager = RetryManager::new(config);
953
954 let attempt_counter = Arc::new(AtomicU32::new(0));
955 let counter_clone = attempt_counter.clone();
956
957 let result = manager
958 .execute_with_retry(
959 "test_no_retries",
960 move || {
961 let counter = counter_clone.clone();
962 async move {
963 counter.fetch_add(1, Ordering::SeqCst);
964 Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
965 }
966 },
967 should_retry_test_error,
968 create_test_error,
969 )
970 .await;
971
972 assert!(result.is_err());
973 assert_eq!(attempt_counter.load(Ordering::SeqCst), 1);
975 }
976
977 #[tokio::test(start_paused = true)]
978 async fn test_jitter_applied() {
979 let config = RetryConfig {
980 max_retries: 2,
981 initial_delay_ms: 50,
982 max_delay_ms: 100,
983 backoff_factor: 2.0,
984 jitter_ms: 50, operation_timeout_ms: None,
986 immediate_first: false,
987 max_elapsed_ms: None,
988 };
989 let manager = RetryManager::new(config);
990
991 let delays = Arc::new(std::sync::Mutex::new(Vec::new()));
992 let delays_clone = delays.clone();
993 let last_time = Arc::new(std::sync::Mutex::new(tokio::time::Instant::now()));
994 let last_time_clone = last_time.clone();
995
996 let handle = tokio::spawn({
997 let delays_clone = delays_clone.clone();
998 async move {
999 let _ = manager
1000 .execute_with_retry(
1001 "test_jitter",
1002 move || {
1003 let delays = delays_clone.clone();
1004 let last_time = last_time_clone.clone();
1005 async move {
1006 let now = tokio::time::Instant::now();
1007 let delay = {
1008 let mut last = last_time.lock().expect(MUTEX_POISONED);
1009 let d = now.duration_since(*last);
1010 *last = now;
1011 d
1012 };
1013 delays.lock().expect(MUTEX_POISONED).push(delay);
1014 Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1015 }
1016 },
1017 should_retry_test_error,
1018 create_test_error,
1019 )
1020 .await;
1021 }
1022 });
1023
1024 yield_until(|| !delays.lock().expect(MUTEX_POISONED).is_empty()).await;
1025 advance_until(|| delays.lock().expect(MUTEX_POISONED).len() >= 2).await;
1026 advance_until(|| delays.lock().expect(MUTEX_POISONED).len() >= 3).await;
1027
1028 handle.await.unwrap();
1029
1030 let delays = delays.lock().expect(MUTEX_POISONED);
1031 for delay in delays.iter().skip(1) {
1033 assert!(delay.as_millis() >= 50);
1035 assert!(delay.as_millis() <= 151);
1037 }
1038 }
1039
1040 #[tokio::test]
1041 async fn test_max_elapsed_stops_early() {
1042 let config = RetryConfig {
1043 max_retries: 100, initial_delay_ms: 50,
1045 max_delay_ms: 100,
1046 backoff_factor: 1.5,
1047 jitter_ms: 0,
1048 operation_timeout_ms: None,
1049 immediate_first: false,
1050 max_elapsed_ms: Some(150), };
1052 let manager = RetryManager::new(config);
1053
1054 let attempt_counter = Arc::new(AtomicU32::new(0));
1055 let counter_clone = attempt_counter.clone();
1056
1057 let start = tokio::time::Instant::now();
1058 let result = manager
1059 .execute_with_retry(
1060 "test_elapsed_limit",
1061 move || {
1062 let counter = counter_clone.clone();
1063 async move {
1064 counter.fetch_add(1, Ordering::SeqCst);
1065 Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1066 }
1067 },
1068 should_retry_test_error,
1069 create_test_error,
1070 )
1071 .await;
1072
1073 let elapsed = start.elapsed();
1074 assert!(result.is_err());
1075 assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
1076
1077 let attempts = attempt_counter.load(Ordering::SeqCst);
1079 assert!(attempts < 10); assert!(elapsed.as_millis() >= 100);
1081 }
1082
1083 #[tokio::test]
1084 async fn test_mixed_errors_retry_behavior() {
1085 let config = RetryConfig {
1086 max_retries: 5,
1087 initial_delay_ms: 10,
1088 max_delay_ms: 50,
1089 backoff_factor: 2.0,
1090 jitter_ms: 0,
1091 operation_timeout_ms: None,
1092 immediate_first: false,
1093 max_elapsed_ms: None,
1094 };
1095 let manager = RetryManager::new(config);
1096
1097 let attempt_counter = Arc::new(AtomicU32::new(0));
1098 let counter_clone = attempt_counter.clone();
1099
1100 let result = manager
1101 .execute_with_retry(
1102 "test_mixed_errors",
1103 move || {
1104 let counter = counter_clone.clone();
1105 async move {
1106 let attempts = counter.fetch_add(1, Ordering::SeqCst);
1107 match attempts {
1108 0 => Err(TestError::Retryable("retry 1".to_string())),
1109 1 => Err(TestError::Retryable("retry 2".to_string())),
1110 2 => Err(TestError::NonRetryable("stop here".to_string())),
1111 _ => Ok(42),
1112 }
1113 }
1114 },
1115 should_retry_test_error,
1116 create_test_error,
1117 )
1118 .await;
1119
1120 assert!(result.is_err());
1121 assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
1122 assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
1124 }
1125
1126 #[tokio::test]
1127 async fn test_cancellation_during_retry_delay() {
1128 use tokio_util::sync::CancellationToken;
1129
1130 let config = RetryConfig {
1131 max_retries: 10,
1132 initial_delay_ms: 500, max_delay_ms: 1000,
1134 backoff_factor: 2.0,
1135 jitter_ms: 0,
1136 operation_timeout_ms: None,
1137 immediate_first: false,
1138 max_elapsed_ms: None,
1139 };
1140 let manager = RetryManager::new(config);
1141
1142 let token = CancellationToken::new();
1143 let token_clone = token.clone();
1144
1145 tokio::spawn(async move {
1147 tokio::time::sleep(Duration::from_millis(100)).await;
1148 token_clone.cancel();
1149 });
1150
1151 let attempt_counter = Arc::new(AtomicU32::new(0));
1152 let counter_clone = attempt_counter.clone();
1153
1154 let start = tokio::time::Instant::now();
1155 let result = manager
1156 .execute_with_retry_with_cancel(
1157 "test_cancellation",
1158 move || {
1159 let counter = counter_clone.clone();
1160 async move {
1161 counter.fetch_add(1, Ordering::SeqCst);
1162 Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1163 }
1164 },
1165 should_retry_test_error,
1166 create_test_error,
1167 &token,
1168 )
1169 .await;
1170
1171 let elapsed = start.elapsed();
1172
1173 assert!(result.is_err());
1175 let error_msg = format!("{}", result.unwrap_err());
1176 assert!(error_msg.contains("canceled"));
1177
1178 assert!(elapsed.as_millis() < 600);
1180
1181 let attempts = attempt_counter.load(Ordering::SeqCst);
1183 assert!(attempts >= 1);
1184 }
1185
1186 #[tokio::test]
1187 async fn test_cancellation_during_operation_execution() {
1188 use tokio_util::sync::CancellationToken;
1189
1190 let config = RetryConfig {
1191 max_retries: 5,
1192 initial_delay_ms: 50,
1193 max_delay_ms: 100,
1194 backoff_factor: 2.0,
1195 jitter_ms: 0,
1196 operation_timeout_ms: None,
1197 immediate_first: false,
1198 max_elapsed_ms: None,
1199 };
1200 let manager = RetryManager::new(config);
1201
1202 let token = CancellationToken::new();
1203 let token_clone = token.clone();
1204
1205 tokio::spawn(async move {
1207 tokio::time::sleep(Duration::from_millis(50)).await;
1208 token_clone.cancel();
1209 });
1210
1211 let start = tokio::time::Instant::now();
1212 let result = manager
1213 .execute_with_retry_with_cancel(
1214 "test_cancellation_during_op",
1215 || async {
1216 tokio::time::sleep(Duration::from_millis(200)).await;
1218 Ok::<i32, TestError>(42)
1219 },
1220 should_retry_test_error,
1221 create_test_error,
1222 &token,
1223 )
1224 .await;
1225
1226 let elapsed = start.elapsed();
1227
1228 assert!(result.is_err());
1230 let error_msg = format!("{}", result.unwrap_err());
1231 assert!(error_msg.contains("canceled"));
1232
1233 assert!(elapsed.as_millis() < 250);
1235 }
1236
1237 #[tokio::test]
1238 async fn test_cancellation_error_message() {
1239 use tokio_util::sync::CancellationToken;
1240
1241 let config = RetryConfig::default();
1242 let manager = RetryManager::new(config);
1243
1244 let token = CancellationToken::new();
1245 token.cancel(); let result = manager
1248 .execute_with_retry_with_cancel(
1249 "test_operation",
1250 || async { Ok::<i32, TestError>(42) },
1251 should_retry_test_error,
1252 create_test_error,
1253 &token,
1254 )
1255 .await;
1256
1257 assert!(result.is_err());
1258 let error_msg = format!("{}", result.unwrap_err());
1259 assert!(error_msg.contains("canceled"));
1260 }
1261}
1262
1263#[cfg(test)]
1264mod proptest_tests {
1265 use std::sync::{
1266 Arc,
1267 atomic::{AtomicU32, Ordering},
1268 };
1269
1270 use nautilus_core::MUTEX_POISONED;
1271 use proptest::prelude::*;
1272 use rstest::rstest;
1274
1275 use super::{
1276 test_utils::*,
1277 tests::{advance_until, yield_until},
1278 *,
1279 };
1280
1281 proptest! {
1282 #[rstest]
1283 fn test_retry_config_valid_ranges(
1284 max_retries in 0u32..100,
1285 initial_delay_ms in 1u64..10_000,
1286 max_delay_ms in 1u64..60_000,
1287 backoff_factor in 1.0f64..10.0,
1288 jitter_ms in 0u64..1_000,
1289 operation_timeout_ms in prop::option::of(1u64..120_000),
1290 immediate_first in any::<bool>(),
1291 max_elapsed_ms in prop::option::of(1u64..300_000)
1292 ) {
1293 let max_delay_ms = max_delay_ms.max(initial_delay_ms);
1295
1296 let config = RetryConfig {
1297 max_retries,
1298 initial_delay_ms,
1299 max_delay_ms,
1300 backoff_factor,
1301 jitter_ms,
1302 operation_timeout_ms,
1303 immediate_first,
1304 max_elapsed_ms,
1305 };
1306
1307 let _manager = RetryManager::<std::io::Error>::new(config);
1309 }
1310
1311 #[rstest]
1312 fn test_retry_attempts_bounded(
1313 max_retries in 0u32..5,
1314 initial_delay_ms in 1u64..10,
1315 backoff_factor in 1.0f64..2.0,
1316 ) {
1317 let rt = tokio::runtime::Builder::new_current_thread()
1318 .enable_time()
1319 .build()
1320 .unwrap();
1321
1322 let config = RetryConfig {
1323 max_retries,
1324 initial_delay_ms,
1325 max_delay_ms: initial_delay_ms * 2,
1326 backoff_factor,
1327 jitter_ms: 0,
1328 operation_timeout_ms: None,
1329 immediate_first: false,
1330 max_elapsed_ms: None,
1331 };
1332
1333 let manager = RetryManager::new(config);
1334 let attempt_counter = Arc::new(AtomicU32::new(0));
1335 let counter_clone = attempt_counter.clone();
1336
1337 let _result = rt.block_on(manager.execute_with_retry(
1338 "prop_test",
1339 move || {
1340 let counter = counter_clone.clone();
1341 async move {
1342 counter.fetch_add(1, Ordering::SeqCst);
1343 Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1344 }
1345 },
1346 |e: &TestError| matches!(e, TestError::Retryable(_)),
1347 TestError::Timeout,
1348 ));
1349
1350 let attempts = attempt_counter.load(Ordering::SeqCst);
1351 prop_assert_eq!(attempts, max_retries + 1);
1353 }
1354
1355 #[rstest]
1356 fn test_timeout_always_respected(
1357 timeout_ms in 10u64..50,
1358 operation_delay_ms in 60u64..100,
1359 ) {
1360 let rt = tokio::runtime::Builder::new_current_thread()
1361 .enable_time()
1362 .start_paused(true)
1363 .build()
1364 .unwrap();
1365
1366 let config = RetryConfig {
1367 max_retries: 0, initial_delay_ms: 10,
1369 max_delay_ms: 100,
1370 backoff_factor: 2.0,
1371 jitter_ms: 0,
1372 operation_timeout_ms: Some(timeout_ms),
1373 immediate_first: false,
1374 max_elapsed_ms: None,
1375 };
1376
1377 let manager = RetryManager::new(config);
1378
1379 let result = rt.block_on(async {
1380 let operation_future = manager.execute_with_retry(
1381 "timeout_test",
1382 move || async move {
1383 tokio::time::sleep(Duration::from_millis(operation_delay_ms)).await;
1384 Ok::<i32, TestError>(42)
1385 },
1386 |_: &TestError| true,
1387 TestError::Timeout,
1388 );
1389
1390 tokio::time::advance(Duration::from_millis(timeout_ms + 10)).await;
1392 operation_future.await
1393 });
1394
1395 prop_assert!(result.is_err());
1397 prop_assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
1398 }
1399
1400 #[rstest]
1401 fn test_max_elapsed_always_respected(
1402 max_elapsed_ms in 20u64..50,
1403 delay_per_retry in 15u64..30,
1404 max_retries in 10u32..20,
1405 ) {
1406 let rt = tokio::runtime::Builder::new_current_thread()
1407 .enable_time()
1408 .start_paused(true)
1409 .build()
1410 .unwrap();
1411
1412 let config = RetryConfig {
1414 max_retries,
1415 initial_delay_ms: delay_per_retry,
1416 max_delay_ms: delay_per_retry * 2,
1417 backoff_factor: 1.0, jitter_ms: 0,
1419 operation_timeout_ms: None,
1420 immediate_first: false,
1421 max_elapsed_ms: Some(max_elapsed_ms),
1422 };
1423
1424 let manager = RetryManager::new(config);
1425 let attempt_counter = Arc::new(AtomicU32::new(0));
1426 let counter_clone = attempt_counter.clone();
1427
1428 let result = rt.block_on(async {
1429 let operation_future = manager.execute_with_retry(
1430 "elapsed_test",
1431 move || {
1432 let counter = counter_clone.clone();
1433 async move {
1434 counter.fetch_add(1, Ordering::SeqCst);
1435 Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1436 }
1437 },
1438 |e: &TestError| matches!(e, TestError::Retryable(_)),
1439 TestError::Timeout,
1440 );
1441
1442 tokio::time::advance(Duration::from_millis(max_elapsed_ms + delay_per_retry)).await;
1444 operation_future.await
1445 });
1446
1447 let attempts = attempt_counter.load(Ordering::SeqCst);
1448
1449 prop_assert!(result.is_err());
1451 prop_assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
1452
1453 prop_assert!(attempts <= max_retries + 1);
1455 }
1456
1457 #[rstest]
1458 fn test_jitter_bounds(
1459 jitter_ms in 0u64..20,
1460 base_delay_ms in 10u64..30,
1461 ) {
1462 let rt = tokio::runtime::Builder::new_current_thread()
1463 .enable_time()
1464 .start_paused(true)
1465 .build()
1466 .unwrap();
1467
1468 let config = RetryConfig {
1469 max_retries: 2,
1470 initial_delay_ms: base_delay_ms,
1471 max_delay_ms: base_delay_ms * 2,
1472 backoff_factor: 1.0, jitter_ms,
1474 operation_timeout_ms: None,
1475 immediate_first: false,
1476 max_elapsed_ms: None,
1477 };
1478
1479 let manager = RetryManager::new(config);
1480 let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
1481 let attempt_times_for_block = attempt_times.clone();
1482
1483 rt.block_on(async move {
1484 let attempt_times_for_wait = attempt_times_for_block.clone();
1485 let handle = tokio::spawn({
1486 let attempt_times_for_task = attempt_times_for_block.clone();
1487 let manager = manager;
1488 async move {
1489 let start_time = tokio::time::Instant::now();
1490 let _ = manager
1491 .execute_with_retry(
1492 "jitter_test",
1493 move || {
1494 let attempt_times_inner = attempt_times_for_task.clone();
1495 async move {
1496 attempt_times_inner
1497 .lock()
1498 .unwrap()
1499 .push(start_time.elapsed());
1500 Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1501 }
1502 },
1503 |e: &TestError| matches!(e, TestError::Retryable(_)),
1504 TestError::Timeout,
1505 )
1506 .await;
1507 }
1508 });
1509
1510 yield_until(|| !attempt_times_for_wait.lock().expect(MUTEX_POISONED).is_empty()).await;
1511 advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 2).await;
1512 advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 3).await;
1513
1514 handle.await.unwrap();
1515 });
1516
1517 let times = attempt_times.lock().expect(MUTEX_POISONED);
1518
1519 prop_assert!(times.len() >= 2);
1521
1522 prop_assert!(times[0].as_millis() < 5);
1524
1525 for i in 1..times.len() {
1527 let delay_from_previous = if i == 1 {
1528 times[i] - times[0]
1529 } else {
1530 times[i] - times[i - 1]
1531 };
1532
1533 prop_assert!(
1535 delay_from_previous.as_millis() >= base_delay_ms as u128,
1536 "Retry {} delay {}ms is less than base {}ms",
1537 i, delay_from_previous.as_millis(), base_delay_ms
1538 );
1539
1540 prop_assert!(
1542 delay_from_previous.as_millis() <= (base_delay_ms + jitter_ms + 1) as u128,
1543 "Retry {} delay {}ms exceeds base {} + jitter {}",
1544 i, delay_from_previous.as_millis(), base_delay_ms, jitter_ms
1545 );
1546 }
1547 }
1548
1549 #[rstest]
1550 fn test_immediate_first_property(
1551 immediate_first in any::<bool>(),
1552 initial_delay_ms in 10u64..30,
1553 ) {
1554 let rt = tokio::runtime::Builder::new_current_thread()
1555 .enable_time()
1556 .start_paused(true)
1557 .build()
1558 .unwrap();
1559
1560 let config = RetryConfig {
1561 max_retries: 2,
1562 initial_delay_ms,
1563 max_delay_ms: initial_delay_ms * 2,
1564 backoff_factor: 2.0,
1565 jitter_ms: 0,
1566 operation_timeout_ms: None,
1567 immediate_first,
1568 max_elapsed_ms: None,
1569 };
1570
1571 let manager = RetryManager::new(config);
1572 let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
1573 let attempt_times_for_block = attempt_times.clone();
1574
1575 rt.block_on(async move {
1576 let attempt_times_for_wait = attempt_times_for_block.clone();
1577 let handle = tokio::spawn({
1578 let attempt_times_for_task = attempt_times_for_block.clone();
1579 let manager = manager;
1580 async move {
1581 let start = tokio::time::Instant::now();
1582 let _ = manager
1583 .execute_with_retry(
1584 "immediate_test",
1585 move || {
1586 let attempt_times_inner = attempt_times_for_task.clone();
1587 async move {
1588 let elapsed = start.elapsed();
1589 attempt_times_inner.lock().expect(MUTEX_POISONED).push(elapsed);
1590 Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1591 }
1592 },
1593 |e: &TestError| matches!(e, TestError::Retryable(_)),
1594 TestError::Timeout,
1595 )
1596 .await;
1597 }
1598 });
1599
1600 yield_until(|| !attempt_times_for_wait.lock().expect(MUTEX_POISONED).is_empty()).await;
1601 advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 2).await;
1602 advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 3).await;
1603
1604 handle.await.unwrap();
1605 });
1606
1607 let times = attempt_times.lock().expect(MUTEX_POISONED);
1608 prop_assert!(times.len() >= 2);
1609
1610 if immediate_first {
1611 prop_assert!(times[1].as_millis() < 20,
1613 "With immediate_first=true, first retry took {}ms",
1614 times[1].as_millis());
1615 } else {
1616 prop_assert!(times[1].as_millis() >= (initial_delay_ms - 1) as u128,
1618 "With immediate_first=false, first retry was too fast: {}ms",
1619 times[1].as_millis());
1620 }
1621 }
1622
1623 #[rstest]
1624 fn test_non_retryable_stops_immediately(
1625 attempt_before_non_retryable in 0usize..3,
1626 max_retries in 3u32..5,
1627 ) {
1628 let rt = tokio::runtime::Builder::new_current_thread()
1629 .enable_time()
1630 .build()
1631 .unwrap();
1632
1633 let config = RetryConfig {
1634 max_retries,
1635 initial_delay_ms: 10,
1636 max_delay_ms: 100,
1637 backoff_factor: 2.0,
1638 jitter_ms: 0,
1639 operation_timeout_ms: None,
1640 immediate_first: false,
1641 max_elapsed_ms: None,
1642 };
1643
1644 let manager = RetryManager::new(config);
1645 let attempt_counter = Arc::new(AtomicU32::new(0));
1646 let counter_clone = attempt_counter.clone();
1647
1648 let result: Result<i32, TestError> = rt.block_on(manager.execute_with_retry(
1649 "non_retryable_test",
1650 move || {
1651 let counter = counter_clone.clone();
1652 async move {
1653 let attempts = counter.fetch_add(1, Ordering::SeqCst) as usize;
1654 if attempts == attempt_before_non_retryable {
1655 Err(TestError::NonRetryable("stop".to_string()))
1656 } else {
1657 Err(TestError::Retryable("retry".to_string()))
1658 }
1659 }
1660 },
1661 |e: &TestError| matches!(e, TestError::Retryable(_)),
1662 TestError::Timeout,
1663 ));
1664
1665 let attempts = attempt_counter.load(Ordering::SeqCst) as usize;
1666
1667 prop_assert!(result.is_err());
1668 prop_assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
1669 prop_assert_eq!(attempts, attempt_before_non_retryable + 1);
1671 }
1672
1673 #[rstest]
1674 fn test_cancellation_stops_immediately(
1675 cancel_after_ms in 10u64..100,
1676 initial_delay_ms in 200u64..500,
1677 ) {
1678 use tokio_util::sync::CancellationToken;
1679
1680 let rt = tokio::runtime::Builder::new_current_thread()
1681 .enable_time()
1682 .start_paused(true)
1683 .build()
1684 .unwrap();
1685
1686 let config = RetryConfig {
1687 max_retries: 10,
1688 initial_delay_ms,
1689 max_delay_ms: initial_delay_ms * 2,
1690 backoff_factor: 2.0,
1691 jitter_ms: 0,
1692 operation_timeout_ms: None,
1693 immediate_first: false,
1694 max_elapsed_ms: None,
1695 };
1696
1697 let manager = RetryManager::new(config);
1698 let token = CancellationToken::new();
1699 let token_clone = token.clone();
1700
1701 let result: Result<i32, TestError> = rt.block_on(async {
1702 tokio::spawn(async move {
1704 tokio::time::sleep(Duration::from_millis(cancel_after_ms)).await;
1705 token_clone.cancel();
1706 });
1707
1708 let operation_future = manager.execute_with_retry_with_cancel(
1709 "cancellation_test",
1710 || async {
1711 Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1712 },
1713 |e: &TestError| matches!(e, TestError::Retryable(_)),
1714 create_test_error,
1715 &token,
1716 );
1717
1718 tokio::time::advance(Duration::from_millis(cancel_after_ms + 10)).await;
1720 operation_future.await
1721 });
1722
1723 prop_assert!(result.is_err());
1725 let error_msg = format!("{}", result.unwrap_err());
1726 prop_assert!(error_msg.contains("canceled"));
1727 }
1728
1729 #[rstest]
1730 fn test_budget_clamp_prevents_overshoot(
1731 max_elapsed_ms in 10u64..30,
1732 delay_per_retry in 20u64..50,
1733 ) {
1734 let rt = tokio::runtime::Builder::new_current_thread()
1735 .enable_time()
1736 .start_paused(true)
1737 .build()
1738 .unwrap();
1739
1740 let config = RetryConfig {
1742 max_retries: 5,
1743 initial_delay_ms: delay_per_retry,
1744 max_delay_ms: delay_per_retry * 2,
1745 backoff_factor: 1.0,
1746 jitter_ms: 0,
1747 operation_timeout_ms: None,
1748 immediate_first: false,
1749 max_elapsed_ms: Some(max_elapsed_ms),
1750 };
1751
1752 let manager = RetryManager::new(config);
1753
1754 let _result = rt.block_on(async {
1755 let operation_future = manager.execute_with_retry(
1756 "budget_clamp_test",
1757 || async {
1758 Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1760 },
1761 |e: &TestError| matches!(e, TestError::Retryable(_)),
1762 create_test_error,
1763 );
1764
1765 tokio::time::advance(Duration::from_millis(max_elapsed_ms + delay_per_retry)).await;
1767 operation_future.await
1768 });
1769
1770 }
1773
1774 #[rstest]
1775 fn test_success_on_kth_attempt(
1776 k in 1usize..5,
1777 initial_delay_ms in 5u64..20,
1778 ) {
1779 let rt = tokio::runtime::Builder::new_current_thread()
1780 .enable_time()
1781 .start_paused(true)
1782 .build()
1783 .unwrap();
1784
1785 let config = RetryConfig {
1786 max_retries: 10, initial_delay_ms,
1788 max_delay_ms: initial_delay_ms * 4,
1789 backoff_factor: 2.0,
1790 jitter_ms: 0,
1791 operation_timeout_ms: None,
1792 immediate_first: false,
1793 max_elapsed_ms: None,
1794 };
1795
1796 let manager = RetryManager::new(config);
1797 let attempt_counter = Arc::new(AtomicU32::new(0));
1798 let counter_clone = attempt_counter.clone();
1799 let target_k = k;
1800
1801 let (result, _elapsed) = rt.block_on(async {
1802 let start = tokio::time::Instant::now();
1803
1804 let operation_future = manager.execute_with_retry(
1805 "kth_attempt_test",
1806 move || {
1807 let counter = counter_clone.clone();
1808 async move {
1809 let attempt = counter.fetch_add(1, Ordering::SeqCst) as usize;
1810 if attempt + 1 == target_k {
1811 Ok(42)
1812 } else {
1813 Err(TestError::Retryable("retry".to_string()))
1814 }
1815 }
1816 },
1817 |e: &TestError| matches!(e, TestError::Retryable(_)),
1818 create_test_error,
1819 );
1820
1821 for _ in 0..k {
1823 tokio::time::advance(Duration::from_millis(initial_delay_ms * 4)).await;
1824 }
1825
1826 let result = operation_future.await;
1827 let elapsed = start.elapsed();
1828
1829 (result, elapsed)
1830 });
1831
1832 let attempts = attempt_counter.load(Ordering::SeqCst) as usize;
1833
1834 prop_assert!(result.is_ok());
1836 prop_assert_eq!(result.unwrap(), 42);
1837 prop_assert_eq!(attempts, k);
1838 }
1839 }
1840}