nautilus_network/
retry.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Generic retry mechanism for network operations.
17
18use std::{future::Future, marker::PhantomData, time::Duration};
19
20use tokio_util::sync::CancellationToken;
21
22use crate::backoff::ExponentialBackoff;
23
/// Tunable parameters that control how a [`RetryManager`] retries an operation.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// How many retries are permitted after the first attempt; at most
    /// `1 + max_retries` attempts are made in total.
    pub max_retries: u32,
    /// Starting delay between retries, in milliseconds.
    pub initial_delay_ms: u64,
    /// Upper bound on any single delay between retries, in milliseconds.
    pub max_delay_ms: u64,
    /// Multiplier handed to the exponential backoff.
    pub backoff_factor: f64,
    /// Upper bound on the random jitter added to each delay, in milliseconds.
    pub jitter_ms: u64,
    /// Per-attempt timeout in milliseconds; `None` disables the timeout.
    pub operation_timeout_ms: Option<u64>,
    /// When `true`, the first retry is issued immediately with no delay.
    /// Intended to be `false` for HTTP/order operations and `true` for
    /// connection operations.
    pub immediate_first: bool,
    /// Optional overall time budget for all retries, in milliseconds. Once it
    /// has elapsed, no further retries are started even if `max_retries` has
    /// not been reached yet.
    pub max_elapsed_ms: Option<u64>,
}
47
48impl Default for RetryConfig {
49    fn default() -> Self {
50        Self {
51            max_retries: 3,
52            initial_delay_ms: 1_000,
53            max_delay_ms: 10_000,
54            backoff_factor: 2.0,
55            jitter_ms: 100,
56            operation_timeout_ms: Some(30_000),
57            immediate_first: false,
58            max_elapsed_ms: None,
59        }
60    }
61}
62
63/// Generic retry manager for network operations.
64///
65/// Stateless and thread-safe - each operation maintains its own backoff state.
66#[derive(Clone, Debug)]
67pub struct RetryManager<E> {
68    config: RetryConfig,
69    _phantom: PhantomData<E>,
70}
71
72impl<E> RetryManager<E>
73where
74    E: std::error::Error,
75{
76    /// Creates a new retry manager with the given configuration.
77    ///
78    /// # Errors
79    ///
80    /// Returns an error if the configuration is invalid.
81    pub const fn new(config: RetryConfig) -> anyhow::Result<Self> {
82        Ok(Self {
83            config,
84            _phantom: PhantomData,
85        })
86    }
87
88    /// Executes an operation with retry logic and optional cancellation.
89    ///
90    /// Cancellation is checked at three points:
91    /// (1) Before each operation attempt.
92    /// (2) During operation execution (via `tokio::select!`).
93    /// (3) During retry delays.
94    ///
95    /// This means cancellation may be delayed by up to one operation timeout if it occurs mid-execution.
96    ///
97    /// # Errors
98    ///
99    /// Returns an error if the operation fails after exhausting all retries,
100    /// if the operation times out, if creating the backoff state fails, or if canceled.
101    pub async fn execute_with_retry_inner<F, Fut, T>(
102        &self,
103        operation_name: &str,
104        mut operation: F,
105        should_retry: impl Fn(&E) -> bool,
106        create_error: impl Fn(String) -> E,
107        cancel: Option<&CancellationToken>,
108    ) -> Result<T, E>
109    where
110        F: FnMut() -> Fut,
111        Fut: Future<Output = Result<T, E>>,
112    {
113        let mut backoff = ExponentialBackoff::new(
114            Duration::from_millis(self.config.initial_delay_ms),
115            Duration::from_millis(self.config.max_delay_ms),
116            self.config.backoff_factor,
117            self.config.jitter_ms,
118            self.config.immediate_first,
119        )
120        .map_err(|e| create_error(format!("Invalid configuration: {e}")))?;
121
122        let mut attempt = 0;
123        let start_time = tokio::time::Instant::now();
124
125        loop {
126            if let Some(token) = cancel
127                && token.is_cancelled()
128            {
129                tracing::debug!(
130                    operation = %operation_name,
131                    attempts = attempt,
132                    "Operation canceled"
133                );
134                return Err(create_error("canceled".to_string()));
135            }
136
137            if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
138                let elapsed = start_time.elapsed();
139                if elapsed.as_millis() >= u128::from(max_elapsed_ms) {
140                    let timeout_error = create_error("Budget exceeded".to_string());
141                    tracing::trace!(
142                        operation = %operation_name,
143                        attempts = attempt + 1,
144                        budget_ms = max_elapsed_ms,
145                        "Retry budget exceeded"
146                    );
147                    return Err(timeout_error);
148                }
149            }
150
151            let result = match (self.config.operation_timeout_ms, cancel) {
152                (Some(timeout_ms), Some(token)) => {
153                    tokio::select! {
154                        result = tokio::time::timeout(Duration::from_millis(timeout_ms), operation()) => result,
155                        () = token.cancelled() => {
156                            tracing::debug!(
157                                operation = %operation_name,
158                                "Operation canceled during execution"
159                            );
160                            return Err(create_error("canceled".to_string()));
161                        }
162                    }
163                }
164                (Some(timeout_ms), None) => {
165                    tokio::time::timeout(Duration::from_millis(timeout_ms), operation()).await
166                }
167                (None, Some(token)) => {
168                    tokio::select! {
169                        result = operation() => Ok(result),
170                        () = token.cancelled() => {
171                            tracing::debug!(
172                                operation = %operation_name,
173                                "Operation canceled during execution"
174                            );
175                            return Err(create_error("canceled".to_string()));
176                        }
177                    }
178                }
179                (None, None) => Ok(operation().await),
180            };
181
182            match result {
183                Ok(Ok(success)) => {
184                    if attempt > 0 {
185                        tracing::trace!(
186                            operation = %operation_name,
187                            attempts = attempt + 1,
188                            "Retry succeeded"
189                        );
190                    }
191                    return Ok(success);
192                }
193                Ok(Err(error)) => {
194                    if !should_retry(&error) {
195                        tracing::trace!(
196                            operation = %operation_name,
197                            error = %error,
198                            "Non-retryable error"
199                        );
200                        return Err(error);
201                    }
202
203                    if attempt >= self.config.max_retries {
204                        tracing::trace!(
205                            operation = %operation_name,
206                            attempts = attempt + 1,
207                            error = %error,
208                            "Retries exhausted"
209                        );
210                        return Err(error);
211                    }
212
213                    let mut delay = backoff.next_duration();
214
215                    if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
216                        let elapsed = start_time.elapsed();
217                        let remaining =
218                            Duration::from_millis(max_elapsed_ms).saturating_sub(elapsed);
219
220                        if remaining.is_zero() {
221                            let timeout_error = create_error("Budget exceeded".to_string());
222                            tracing::trace!(
223                                operation = %operation_name,
224                                attempts = attempt + 1,
225                                budget_ms = max_elapsed_ms,
226                                "Retry budget exceeded"
227                            );
228                            return Err(timeout_error);
229                        }
230
231                        delay = delay.min(remaining);
232                    }
233
234                    tracing::trace!(
235                        operation = %operation_name,
236                        attempt = attempt + 1,
237                        delay_ms = delay.as_millis() as u64,
238                        error = %error,
239                        "Retrying after failure"
240                    );
241
242                    // Yield even on zero-delay to avoid busy-wait loop
243                    if delay.is_zero() {
244                        tokio::task::yield_now().await;
245                        attempt += 1;
246                        continue;
247                    }
248
249                    if let Some(token) = cancel {
250                        tokio::select! {
251                            () = tokio::time::sleep(delay) => {},
252                            () = token.cancelled() => {
253                                tracing::debug!(
254                                    operation = %operation_name,
255                                    attempt = attempt + 1,
256                                    "Operation canceled during retry delay"
257                                );
258                                return Err(create_error("canceled".to_string()));
259                            }
260                        }
261                    } else {
262                        tokio::time::sleep(delay).await;
263                    }
264                    attempt += 1;
265                }
266                Err(_timeout) => {
267                    let timeout_error = create_error(format!(
268                        "Timed out after {}ms",
269                        self.config.operation_timeout_ms.unwrap_or(0)
270                    ));
271
272                    if !should_retry(&timeout_error) {
273                        tracing::trace!(
274                            operation = %operation_name,
275                            error = %timeout_error,
276                            "Non-retryable timeout"
277                        );
278                        return Err(timeout_error);
279                    }
280
281                    if attempt >= self.config.max_retries {
282                        tracing::trace!(
283                            operation = %operation_name,
284                            attempts = attempt + 1,
285                            error = %timeout_error,
286                            "Retries exhausted after timeout"
287                        );
288                        return Err(timeout_error);
289                    }
290
291                    let mut delay = backoff.next_duration();
292
293                    if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
294                        let elapsed = start_time.elapsed();
295                        let remaining =
296                            Duration::from_millis(max_elapsed_ms).saturating_sub(elapsed);
297
298                        if remaining.is_zero() {
299                            let budget_error = create_error("Budget exceeded".to_string());
300                            tracing::trace!(
301                                operation = %operation_name,
302                                attempts = attempt + 1,
303                                budget_ms = max_elapsed_ms,
304                                "Retry budget exceeded"
305                            );
306                            return Err(budget_error);
307                        }
308
309                        delay = delay.min(remaining);
310                    }
311
312                    tracing::trace!(
313                        operation = %operation_name,
314                        attempt = attempt + 1,
315                        delay_ms = delay.as_millis() as u64,
316                        error = %timeout_error,
317                        "Retrying after timeout"
318                    );
319
320                    // Yield even on zero-delay to avoid busy-wait loop
321                    if delay.is_zero() {
322                        tokio::task::yield_now().await;
323                        attempt += 1;
324                        continue;
325                    }
326
327                    if let Some(token) = cancel {
328                        tokio::select! {
329                            () = tokio::time::sleep(delay) => {},
330                            () = token.cancelled() => {
331                                tracing::debug!(
332                                    operation = %operation_name,
333                                    attempt = attempt + 1,
334                                    "Operation canceled during retry delay"
335                                );
336                                return Err(create_error("canceled".to_string()));
337                            }
338                        }
339                    } else {
340                        tokio::time::sleep(delay).await;
341                    }
342                    attempt += 1;
343                }
344            }
345        }
346    }
347
348    /// Executes an operation with retry logic.
349    ///
350    /// # Errors
351    ///
352    /// Returns an error if the operation fails after exhausting all retries,
353    /// if the operation times out, or if creating the backoff state fails.
354    pub async fn execute_with_retry<F, Fut, T>(
355        &self,
356        operation_name: &str,
357        operation: F,
358        should_retry: impl Fn(&E) -> bool,
359        create_error: impl Fn(String) -> E,
360    ) -> Result<T, E>
361    where
362        F: FnMut() -> Fut,
363        Fut: Future<Output = Result<T, E>>,
364    {
365        self.execute_with_retry_inner(operation_name, operation, should_retry, create_error, None)
366            .await
367    }
368
369    /// Executes an operation with retry logic and cancellation support.
370    ///
371    /// # Errors
372    ///
373    /// Returns an error if the operation fails after exhausting all retries,
374    /// if the operation times out, if creating the backoff state fails, or if canceled.
375    pub async fn execute_with_retry_with_cancel<F, Fut, T>(
376        &self,
377        operation_name: &str,
378        operation: F,
379        should_retry: impl Fn(&E) -> bool,
380        create_error: impl Fn(String) -> E,
381        cancellation_token: &CancellationToken,
382    ) -> Result<T, E>
383    where
384        F: FnMut() -> Fut,
385        Fut: Future<Output = Result<T, E>>,
386    {
387        self.execute_with_retry_inner(
388            operation_name,
389            operation,
390            should_retry,
391            create_error,
392            Some(cancellation_token),
393        )
394        .await
395    }
396}
397
398/// Convenience function to create a retry manager with default configuration.
399///
400/// # Errors
401///
402/// Returns an error if the default configuration is invalid.
403pub fn create_default_retry_manager<E>() -> anyhow::Result<RetryManager<E>>
404where
405    E: std::error::Error,
406{
407    RetryManager::new(RetryConfig::default())
408}
409
410/// Convenience function to create a retry manager for HTTP operations.
411///
412/// # Errors
413///
414/// Returns an error if the HTTP configuration is invalid.
415pub const fn create_http_retry_manager<E>() -> anyhow::Result<RetryManager<E>>
416where
417    E: std::error::Error,
418{
419    let config = RetryConfig {
420        max_retries: 3,
421        initial_delay_ms: 1_000,
422        max_delay_ms: 10_000,
423        backoff_factor: 2.0,
424        jitter_ms: 1_000,
425        operation_timeout_ms: Some(60_000), // 60s for HTTP requests
426        immediate_first: false,
427        max_elapsed_ms: Some(180_000), // 3 minutes total budget
428    };
429    RetryManager::new(config)
430}
431
432/// Convenience function to create a retry manager for WebSocket operations.
433///
434/// # Errors
435///
436/// Returns an error if the WebSocket configuration is invalid.
437pub const fn create_websocket_retry_manager<E>() -> anyhow::Result<RetryManager<E>>
438where
439    E: std::error::Error,
440{
441    let config = RetryConfig {
442        max_retries: 5,
443        initial_delay_ms: 1_000,
444        max_delay_ms: 10_000,
445        backoff_factor: 2.0,
446        jitter_ms: 1_000,
447        operation_timeout_ms: Some(30_000), // 30s for WebSocket operations
448        immediate_first: true,
449        max_elapsed_ms: Some(120_000), // 2 minutes total budget
450    };
451    RetryManager::new(config)
452}
453
454////////////////////////////////////////////////////////////////////////////////
455// Tests
456////////////////////////////////////////////////////////////////////////////////
457
#[cfg(test)]
mod test_utils {
    /// Error type for the retry tests; the variant tells the test predicate
    /// how a failure should be classified.
    #[derive(Debug, thiserror::Error)]
    pub enum TestError {
        #[error("Retryable error: {0}")]
        Retryable(String),
        #[error("Non-retryable error: {0}")]
        NonRetryable(String),
        #[error("Timeout error: {0}")]
        Timeout(String),
    }

    /// Retry predicate for tests: only `TestError::Retryable` is retried.
    pub fn should_retry_test_error(error: &TestError) -> bool {
        match error {
            TestError::Retryable(_) => true,
            TestError::NonRetryable(_) | TestError::Timeout(_) => false,
        }
    }

    /// Error factory for tests: wraps every synthesized message in
    /// `TestError::Timeout`.
    pub fn create_test_error(msg: String) -> TestError {
        TestError::Timeout(msg)
    }
}
478
479#[cfg(test)]
480mod tests {
481    use std::sync::{
482        Arc,
483        atomic::{AtomicU32, Ordering},
484    };
485
486    use rstest::rstest;
487
488    use super::{test_utils::*, *};
489
490    const MAX_WAIT_ITERS: usize = 10_000;
491    const MAX_ADVANCE_ITERS: usize = 10_000;
492
493    pub(crate) async fn yield_until<F>(mut condition: F)
494    where
495        F: FnMut() -> bool,
496    {
497        for _ in 0..MAX_WAIT_ITERS {
498            if condition() {
499                return;
500            }
501            tokio::task::yield_now().await;
502        }
503
504        panic!("yield_until timed out waiting for condition");
505    }
506
507    pub(crate) async fn advance_until<F>(mut condition: F)
508    where
509        F: FnMut() -> bool,
510    {
511        for _ in 0..MAX_ADVANCE_ITERS {
512            if condition() {
513                return;
514            }
515            tokio::time::advance(Duration::from_millis(1)).await;
516            tokio::task::yield_now().await;
517        }
518
519        panic!("advance_until timed out waiting for condition");
520    }
521
522    #[rstest]
523    fn test_retry_config_default() {
524        let config = RetryConfig::default();
525        assert_eq!(config.max_retries, 3);
526        assert_eq!(config.initial_delay_ms, 1_000);
527        assert_eq!(config.max_delay_ms, 10_000);
528        assert_eq!(config.backoff_factor, 2.0);
529        assert_eq!(config.jitter_ms, 100);
530        assert_eq!(config.operation_timeout_ms, Some(30_000));
531        assert!(!config.immediate_first);
532        assert_eq!(config.max_elapsed_ms, None);
533    }
534
535    #[tokio::test]
536    async fn test_retry_manager_success_first_attempt() {
537        let manager = RetryManager::new(RetryConfig::default()).unwrap();
538
539        let result = manager
540            .execute_with_retry(
541                "test_operation",
542                || async { Ok::<i32, TestError>(42) },
543                should_retry_test_error,
544                create_test_error,
545            )
546            .await;
547
548        assert_eq!(result.unwrap(), 42);
549    }
550
551    #[tokio::test]
552    async fn test_retry_manager_non_retryable_error() {
553        let manager = RetryManager::new(RetryConfig::default()).unwrap();
554
555        let result = manager
556            .execute_with_retry(
557                "test_operation",
558                || async { Err::<i32, TestError>(TestError::NonRetryable("test".to_string())) },
559                should_retry_test_error,
560                create_test_error,
561            )
562            .await;
563
564        assert!(result.is_err());
565        assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
566    }
567
568    #[tokio::test]
569    async fn test_retry_manager_retryable_error_exhausted() {
570        let config = RetryConfig {
571            max_retries: 2,
572            initial_delay_ms: 10,
573            max_delay_ms: 50,
574            backoff_factor: 2.0,
575            jitter_ms: 0,
576            operation_timeout_ms: None,
577            immediate_first: false,
578            max_elapsed_ms: None,
579        };
580        let manager = RetryManager::new(config).unwrap();
581
582        let result = manager
583            .execute_with_retry(
584                "test_operation",
585                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
586                should_retry_test_error,
587                create_test_error,
588            )
589            .await;
590
591        assert!(result.is_err());
592        assert!(matches!(result.unwrap_err(), TestError::Retryable(_)));
593    }
594
595    #[tokio::test]
596    async fn test_timeout_path() {
597        let config = RetryConfig {
598            max_retries: 2,
599            initial_delay_ms: 10,
600            max_delay_ms: 50,
601            backoff_factor: 2.0,
602            jitter_ms: 0,
603            operation_timeout_ms: Some(50),
604            immediate_first: false,
605            max_elapsed_ms: None,
606        };
607        let manager = RetryManager::new(config).unwrap();
608
609        let result = manager
610            .execute_with_retry(
611                "test_timeout",
612                || async {
613                    tokio::time::sleep(Duration::from_millis(100)).await;
614                    Ok::<i32, TestError>(42)
615                },
616                should_retry_test_error,
617                create_test_error,
618            )
619            .await;
620
621        assert!(result.is_err());
622        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
623    }
624
625    #[tokio::test]
626    async fn test_max_elapsed_time_budget() {
627        let config = RetryConfig {
628            max_retries: 10,
629            initial_delay_ms: 50,
630            max_delay_ms: 100,
631            backoff_factor: 2.0,
632            jitter_ms: 0,
633            operation_timeout_ms: None,
634            immediate_first: false,
635            max_elapsed_ms: Some(200),
636        };
637        let manager = RetryManager::new(config).unwrap();
638
639        let start = tokio::time::Instant::now();
640        let result = manager
641            .execute_with_retry(
642                "test_budget",
643                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
644                should_retry_test_error,
645                create_test_error,
646            )
647            .await;
648
649        let elapsed = start.elapsed();
650        assert!(result.is_err());
651        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
652        assert!(elapsed.as_millis() >= 150);
653        assert!(elapsed.as_millis() < 1000);
654    }
655
656    #[rstest]
657    fn test_http_retry_manager_config() {
658        let manager = create_http_retry_manager::<TestError>().unwrap();
659        assert_eq!(manager.config.max_retries, 3);
660        assert!(!manager.config.immediate_first);
661        assert_eq!(manager.config.max_elapsed_ms, Some(180_000));
662    }
663
664    #[rstest]
665    fn test_websocket_retry_manager_config() {
666        let manager = create_websocket_retry_manager::<TestError>().unwrap();
667        assert_eq!(manager.config.max_retries, 5);
668        assert!(manager.config.immediate_first);
669        assert_eq!(manager.config.max_elapsed_ms, Some(120_000));
670    }
671
672    #[tokio::test]
673    async fn test_timeout_respects_retry_predicate() {
674        let config = RetryConfig {
675            max_retries: 3,
676            initial_delay_ms: 10,
677            max_delay_ms: 50,
678            backoff_factor: 2.0,
679            jitter_ms: 0,
680            operation_timeout_ms: Some(50),
681            immediate_first: false,
682            max_elapsed_ms: None,
683        };
684        let manager = RetryManager::new(config).unwrap();
685
686        // Test with retry predicate that rejects timeouts
687        let should_not_retry_timeouts = |error: &TestError| !matches!(error, TestError::Timeout(_));
688
689        let result = manager
690            .execute_with_retry(
691                "test_timeout_non_retryable",
692                || async {
693                    tokio::time::sleep(Duration::from_millis(100)).await;
694                    Ok::<i32, TestError>(42)
695                },
696                should_not_retry_timeouts,
697                create_test_error,
698            )
699            .await;
700
701        // Should fail immediately without retries since timeout is non-retryable
702        assert!(result.is_err());
703        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
704    }
705
706    #[tokio::test]
707    async fn test_timeout_retries_when_predicate_allows() {
708        let config = RetryConfig {
709            max_retries: 2,
710            initial_delay_ms: 10,
711            max_delay_ms: 50,
712            backoff_factor: 2.0,
713            jitter_ms: 0,
714            operation_timeout_ms: Some(50),
715            immediate_first: false,
716            max_elapsed_ms: None,
717        };
718        let manager = RetryManager::new(config).unwrap();
719
720        // Test with retry predicate that allows timeouts
721        let should_retry_timeouts = |error: &TestError| matches!(error, TestError::Timeout(_));
722
723        let start = tokio::time::Instant::now();
724        let result = manager
725            .execute_with_retry(
726                "test_timeout_retryable",
727                || async {
728                    tokio::time::sleep(Duration::from_millis(100)).await;
729                    Ok::<i32, TestError>(42)
730                },
731                should_retry_timeouts,
732                create_test_error,
733            )
734            .await;
735
736        let elapsed = start.elapsed();
737
738        // Should fail after retries (not immediately)
739        assert!(result.is_err());
740        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
741        // Should have taken time for retries (at least 2 timeouts + delays)
742        assert!(elapsed.as_millis() > 80); // More than just one timeout
743    }
744
745    #[tokio::test]
746    async fn test_successful_retry_after_failures() {
747        let config = RetryConfig {
748            max_retries: 3,
749            initial_delay_ms: 10,
750            max_delay_ms: 50,
751            backoff_factor: 2.0,
752            jitter_ms: 0,
753            operation_timeout_ms: None,
754            immediate_first: false,
755            max_elapsed_ms: None,
756        };
757        let manager = RetryManager::new(config).unwrap();
758
759        let attempt_counter = Arc::new(AtomicU32::new(0));
760        let counter_clone = attempt_counter.clone();
761
762        let result = manager
763            .execute_with_retry(
764                "test_eventual_success",
765                move || {
766                    let counter = counter_clone.clone();
767                    async move {
768                        let attempts = counter.fetch_add(1, Ordering::SeqCst);
769                        if attempts < 2 {
770                            Err(TestError::Retryable("temporary failure".to_string()))
771                        } else {
772                            Ok(42)
773                        }
774                    }
775                },
776                should_retry_test_error,
777                create_test_error,
778            )
779            .await;
780
781        assert_eq!(result.unwrap(), 42);
782        assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
783    }
784
785    #[tokio::test(start_paused = true)]
786    async fn test_immediate_first_retry() {
787        let config = RetryConfig {
788            max_retries: 2,
789            initial_delay_ms: 100,
790            max_delay_ms: 200,
791            backoff_factor: 2.0,
792            jitter_ms: 0,
793            operation_timeout_ms: None,
794            immediate_first: true,
795            max_elapsed_ms: None,
796        };
797        let manager = RetryManager::new(config).unwrap();
798
799        let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
800        let times_clone = attempt_times.clone();
801        let start = tokio::time::Instant::now();
802
803        let handle = tokio::spawn({
804            let times_clone = times_clone.clone();
805            async move {
806                let _ = manager
807                    .execute_with_retry(
808                        "test_immediate",
809                        move || {
810                            let times = times_clone.clone();
811                            async move {
812                                times.lock().unwrap().push(start.elapsed());
813                                Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
814                            }
815                        },
816                        should_retry_test_error,
817                        create_test_error,
818                    )
819                    .await;
820            }
821        });
822
823        // Allow initial attempt and immediate retry to run without advancing time
824        yield_until(|| attempt_times.lock().unwrap().len() >= 2).await;
825
826        // Advance time for the next backoff interval
827        tokio::time::advance(Duration::from_millis(100)).await;
828        tokio::task::yield_now().await;
829
830        // Wait for the final retry to be recorded
831        yield_until(|| attempt_times.lock().unwrap().len() >= 3).await;
832
833        handle.await.unwrap();
834
835        let times = attempt_times.lock().unwrap();
836        assert_eq!(times.len(), 3); // Initial + 2 retries
837
838        // First retry should be immediate (within 1ms tolerance)
839        assert!(times[1] <= Duration::from_millis(1));
840        // Second retry should have backoff delay (at least 100ms from start)
841        assert!(times[2] >= Duration::from_millis(100));
842        assert!(times[2] <= Duration::from_millis(110));
843    }
844
845    #[tokio::test]
846    async fn test_operation_without_timeout() {
847        let config = RetryConfig {
848            max_retries: 2,
849            initial_delay_ms: 10,
850            max_delay_ms: 50,
851            backoff_factor: 2.0,
852            jitter_ms: 0,
853            operation_timeout_ms: None, // No timeout
854            immediate_first: false,
855            max_elapsed_ms: None,
856        };
857        let manager = RetryManager::new(config).unwrap();
858
859        let start = tokio::time::Instant::now();
860        let result = manager
861            .execute_with_retry(
862                "test_no_timeout",
863                || async {
864                    tokio::time::sleep(Duration::from_millis(50)).await;
865                    Ok::<i32, TestError>(42)
866                },
867                should_retry_test_error,
868                create_test_error,
869            )
870            .await;
871
872        let elapsed = start.elapsed();
873        assert_eq!(result.unwrap(), 42);
874        // Should complete without timing out
875        assert!(elapsed.as_millis() >= 30);
876        assert!(elapsed.as_millis() < 200);
877    }
878
879    #[tokio::test]
880    async fn test_zero_retries() {
881        let config = RetryConfig {
882            max_retries: 0,
883            initial_delay_ms: 10,
884            max_delay_ms: 50,
885            backoff_factor: 2.0,
886            jitter_ms: 0,
887            operation_timeout_ms: None,
888            immediate_first: false,
889            max_elapsed_ms: None,
890        };
891        let manager = RetryManager::new(config).unwrap();
892
893        let attempt_counter = Arc::new(AtomicU32::new(0));
894        let counter_clone = attempt_counter.clone();
895
896        let result = manager
897            .execute_with_retry(
898                "test_no_retries",
899                move || {
900                    let counter = counter_clone.clone();
901                    async move {
902                        counter.fetch_add(1, Ordering::SeqCst);
903                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
904                    }
905                },
906                should_retry_test_error,
907                create_test_error,
908            )
909            .await;
910
911        assert!(result.is_err());
912        // Should only attempt once (no retries)
913        assert_eq!(attempt_counter.load(Ordering::SeqCst), 1);
914    }
915
916    #[tokio::test(start_paused = true)]
917    async fn test_jitter_applied() {
918        let config = RetryConfig {
919            max_retries: 2,
920            initial_delay_ms: 50,
921            max_delay_ms: 100,
922            backoff_factor: 2.0,
923            jitter_ms: 50, // Significant jitter
924            operation_timeout_ms: None,
925            immediate_first: false,
926            max_elapsed_ms: None,
927        };
928        let manager = RetryManager::new(config).unwrap();
929
930        let delays = Arc::new(std::sync::Mutex::new(Vec::new()));
931        let delays_clone = delays.clone();
932        let last_time = Arc::new(std::sync::Mutex::new(tokio::time::Instant::now()));
933        let last_time_clone = last_time.clone();
934
935        let handle = tokio::spawn({
936            let delays_clone = delays_clone.clone();
937            async move {
938                let _ = manager
939                    .execute_with_retry(
940                        "test_jitter",
941                        move || {
942                            let delays = delays_clone.clone();
943                            let last_time = last_time_clone.clone();
944                            async move {
945                                let now = tokio::time::Instant::now();
946                                let delay = {
947                                    let mut last = last_time.lock().unwrap();
948                                    let d = now.duration_since(*last);
949                                    *last = now;
950                                    d
951                                };
952                                delays.lock().unwrap().push(delay);
953                                Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
954                            }
955                        },
956                        should_retry_test_error,
957                        create_test_error,
958                    )
959                    .await;
960            }
961        });
962
963        yield_until(|| !delays.lock().unwrap().is_empty()).await;
964        advance_until(|| delays.lock().unwrap().len() >= 2).await;
965        advance_until(|| delays.lock().unwrap().len() >= 3).await;
966
967        handle.await.unwrap();
968
969        let delays = delays.lock().unwrap();
970        // Skip the first delay (initial attempt)
971        for delay in delays.iter().skip(1) {
972            // Each delay should be at least the base delay (50ms for first retry)
973            assert!(delay.as_millis() >= 50);
974            // But no more than base + jitter (allow small tolerance for step advance)
975            assert!(delay.as_millis() <= 151);
976        }
977    }
978
979    #[tokio::test]
980    async fn test_max_elapsed_stops_early() {
981        let config = RetryConfig {
982            max_retries: 100, // Very high retry count
983            initial_delay_ms: 50,
984            max_delay_ms: 100,
985            backoff_factor: 1.5,
986            jitter_ms: 0,
987            operation_timeout_ms: None,
988            immediate_first: false,
989            max_elapsed_ms: Some(150), // Should stop after ~3 attempts
990        };
991        let manager = RetryManager::new(config).unwrap();
992
993        let attempt_counter = Arc::new(AtomicU32::new(0));
994        let counter_clone = attempt_counter.clone();
995
996        let start = tokio::time::Instant::now();
997        let result = manager
998            .execute_with_retry(
999                "test_elapsed_limit",
1000                move || {
1001                    let counter = counter_clone.clone();
1002                    async move {
1003                        counter.fetch_add(1, Ordering::SeqCst);
1004                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1005                    }
1006                },
1007                should_retry_test_error,
1008                create_test_error,
1009            )
1010            .await;
1011
1012        let elapsed = start.elapsed();
1013        assert!(result.is_err());
1014        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
1015
1016        // Should have stopped due to time limit, not retry count
1017        let attempts = attempt_counter.load(Ordering::SeqCst);
1018        assert!(attempts < 10); // Much less than max_retries
1019        assert!(elapsed.as_millis() >= 100);
1020    }
1021
1022    #[tokio::test]
1023    async fn test_mixed_errors_retry_behavior() {
1024        let config = RetryConfig {
1025            max_retries: 5,
1026            initial_delay_ms: 10,
1027            max_delay_ms: 50,
1028            backoff_factor: 2.0,
1029            jitter_ms: 0,
1030            operation_timeout_ms: None,
1031            immediate_first: false,
1032            max_elapsed_ms: None,
1033        };
1034        let manager = RetryManager::new(config).unwrap();
1035
1036        let attempt_counter = Arc::new(AtomicU32::new(0));
1037        let counter_clone = attempt_counter.clone();
1038
1039        let result = manager
1040            .execute_with_retry(
1041                "test_mixed_errors",
1042                move || {
1043                    let counter = counter_clone.clone();
1044                    async move {
1045                        let attempts = counter.fetch_add(1, Ordering::SeqCst);
1046                        match attempts {
1047                            0 => Err(TestError::Retryable("retry 1".to_string())),
1048                            1 => Err(TestError::Retryable("retry 2".to_string())),
1049                            2 => Err(TestError::NonRetryable("stop here".to_string())),
1050                            _ => Ok(42),
1051                        }
1052                    }
1053                },
1054                should_retry_test_error,
1055                create_test_error,
1056            )
1057            .await;
1058
1059        assert!(result.is_err());
1060        assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
1061        // Should stop at the non-retryable error
1062        assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
1063    }
1064
1065    #[tokio::test]
1066    async fn test_cancellation_during_retry_delay() {
1067        use tokio_util::sync::CancellationToken;
1068
1069        let config = RetryConfig {
1070            max_retries: 10,
1071            initial_delay_ms: 500, // Long delay to ensure cancellation happens during sleep
1072            max_delay_ms: 1000,
1073            backoff_factor: 2.0,
1074            jitter_ms: 0,
1075            operation_timeout_ms: None,
1076            immediate_first: false,
1077            max_elapsed_ms: None,
1078        };
1079        let manager = RetryManager::new(config).unwrap();
1080
1081        let token = CancellationToken::new();
1082        let token_clone = token.clone();
1083
1084        // Cancel after a short delay
1085        tokio::spawn(async move {
1086            tokio::time::sleep(Duration::from_millis(100)).await;
1087            token_clone.cancel();
1088        });
1089
1090        let attempt_counter = Arc::new(AtomicU32::new(0));
1091        let counter_clone = attempt_counter.clone();
1092
1093        let start = tokio::time::Instant::now();
1094        let result = manager
1095            .execute_with_retry_with_cancel(
1096                "test_cancellation",
1097                move || {
1098                    let counter = counter_clone.clone();
1099                    async move {
1100                        counter.fetch_add(1, Ordering::SeqCst);
1101                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1102                    }
1103                },
1104                should_retry_test_error,
1105                create_test_error,
1106                &token,
1107            )
1108            .await;
1109
1110        let elapsed = start.elapsed();
1111
1112        // Should be canceled quickly
1113        assert!(result.is_err());
1114        let error_msg = format!("{}", result.unwrap_err());
1115        assert!(error_msg.contains("canceled"));
1116
1117        // Should not have taken the full delay time
1118        assert!(elapsed.as_millis() < 600);
1119
1120        // Should have made at least one attempt
1121        let attempts = attempt_counter.load(Ordering::SeqCst);
1122        assert!(attempts >= 1);
1123    }
1124
1125    #[tokio::test]
1126    async fn test_cancellation_during_operation_execution() {
1127        use tokio_util::sync::CancellationToken;
1128
1129        let config = RetryConfig {
1130            max_retries: 5,
1131            initial_delay_ms: 50,
1132            max_delay_ms: 100,
1133            backoff_factor: 2.0,
1134            jitter_ms: 0,
1135            operation_timeout_ms: None,
1136            immediate_first: false,
1137            max_elapsed_ms: None,
1138        };
1139        let manager = RetryManager::new(config).unwrap();
1140
1141        let token = CancellationToken::new();
1142        let token_clone = token.clone();
1143
1144        // Cancel after a short delay
1145        tokio::spawn(async move {
1146            tokio::time::sleep(Duration::from_millis(50)).await;
1147            token_clone.cancel();
1148        });
1149
1150        let start = tokio::time::Instant::now();
1151        let result = manager
1152            .execute_with_retry_with_cancel(
1153                "test_cancellation_during_op",
1154                || async {
1155                    // Long-running operation
1156                    tokio::time::sleep(Duration::from_millis(200)).await;
1157                    Ok::<i32, TestError>(42)
1158                },
1159                should_retry_test_error,
1160                create_test_error,
1161                &token,
1162            )
1163            .await;
1164
1165        let elapsed = start.elapsed();
1166
1167        // Should be canceled during the operation
1168        assert!(result.is_err());
1169        let error_msg = format!("{}", result.unwrap_err());
1170        assert!(error_msg.contains("canceled"));
1171
1172        // Should not have completed the long operation
1173        assert!(elapsed.as_millis() < 250);
1174    }
1175
1176    #[tokio::test]
1177    async fn test_cancellation_error_message() {
1178        use tokio_util::sync::CancellationToken;
1179
1180        let config = RetryConfig::default();
1181        let manager = RetryManager::new(config).unwrap();
1182
1183        let token = CancellationToken::new();
1184        token.cancel(); // Pre-cancel for immediate cancellation
1185
1186        let result = manager
1187            .execute_with_retry_with_cancel(
1188                "test_operation",
1189                || async { Ok::<i32, TestError>(42) },
1190                should_retry_test_error,
1191                create_test_error,
1192                &token,
1193            )
1194            .await;
1195
1196        assert!(result.is_err());
1197        let error_msg = format!("{}", result.unwrap_err());
1198        assert!(error_msg.contains("canceled"));
1199    }
1200}
1201
1202#[cfg(test)]
1203mod proptest_tests {
1204    use std::sync::{
1205        Arc,
1206        atomic::{AtomicU32, Ordering},
1207    };
1208
1209    use proptest::prelude::*;
1210    // Import rstest attribute macro used within proptest! tests
1211    use rstest::rstest;
1212
1213    use super::{
1214        test_utils::*,
1215        tests::{advance_until, yield_until},
1216        *,
1217    };
1218
1219    proptest! {
1220        #[rstest]
1221        fn test_retry_config_valid_ranges(
1222            max_retries in 0u32..100,
1223            initial_delay_ms in 1u64..10_000,
1224            max_delay_ms in 1u64..60_000,
1225            backoff_factor in 1.0f64..10.0,
1226            jitter_ms in 0u64..1_000,
1227            operation_timeout_ms in prop::option::of(1u64..120_000),
1228            immediate_first in any::<bool>(),
1229            max_elapsed_ms in prop::option::of(1u64..300_000)
1230        ) {
1231            // Ensure max_delay >= initial_delay for valid config
1232            let max_delay_ms = max_delay_ms.max(initial_delay_ms);
1233
1234            let config = RetryConfig {
1235                max_retries,
1236                initial_delay_ms,
1237                max_delay_ms,
1238                backoff_factor,
1239                jitter_ms,
1240                operation_timeout_ms,
1241                immediate_first,
1242                max_elapsed_ms,
1243            };
1244
1245            // Should always be able to create a RetryManager with valid config
1246            let manager = RetryManager::<std::io::Error>::new(config);
1247            prop_assert!(manager.is_ok());
1248        }
1249
1250        #[rstest]
1251        fn test_retry_attempts_bounded(
1252            max_retries in 0u32..5,
1253            initial_delay_ms in 1u64..10,
1254            backoff_factor in 1.0f64..2.0,
1255        ) {
1256            let rt = tokio::runtime::Builder::new_current_thread()
1257                .enable_time()
1258                .build()
1259                .unwrap();
1260
1261            let config = RetryConfig {
1262                max_retries,
1263                initial_delay_ms,
1264                max_delay_ms: initial_delay_ms * 2,
1265                backoff_factor,
1266                jitter_ms: 0,
1267                operation_timeout_ms: None,
1268                immediate_first: false,
1269                max_elapsed_ms: None,
1270            };
1271
1272            let manager = RetryManager::new(config).unwrap();
1273            let attempt_counter = Arc::new(AtomicU32::new(0));
1274            let counter_clone = attempt_counter.clone();
1275
1276            let _result = rt.block_on(manager.execute_with_retry(
1277                "prop_test",
1278                move || {
1279                    let counter = counter_clone.clone();
1280                    async move {
1281                        counter.fetch_add(1, Ordering::SeqCst);
1282                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1283                    }
1284                },
1285                |e: &TestError| matches!(e, TestError::Retryable(_)),
1286                TestError::Timeout,
1287            ));
1288
1289            let attempts = attempt_counter.load(Ordering::SeqCst);
1290            // Total attempts should be 1 (initial) + max_retries
1291            prop_assert_eq!(attempts, max_retries + 1);
1292        }
1293
1294        #[rstest]
1295        fn test_timeout_always_respected(
1296            timeout_ms in 10u64..50,
1297            operation_delay_ms in 60u64..100,
1298        ) {
1299            let rt = tokio::runtime::Builder::new_current_thread()
1300                .enable_time()
1301                .start_paused(true)
1302                .build()
1303                .unwrap();
1304
1305            let config = RetryConfig {
1306                max_retries: 0, // No retries to isolate timeout behavior
1307                initial_delay_ms: 10,
1308                max_delay_ms: 100,
1309                backoff_factor: 2.0,
1310                jitter_ms: 0,
1311                operation_timeout_ms: Some(timeout_ms),
1312                immediate_first: false,
1313                max_elapsed_ms: None,
1314            };
1315
1316            let manager = RetryManager::new(config).unwrap();
1317
1318            let result = rt.block_on(async {
1319                let operation_future = manager.execute_with_retry(
1320                    "timeout_test",
1321                    move || async move {
1322                        tokio::time::sleep(Duration::from_millis(operation_delay_ms)).await;
1323                        Ok::<i32, TestError>(42)
1324                    },
1325                    |_: &TestError| true,
1326                    TestError::Timeout,
1327                );
1328
1329                // Advance time to trigger timeout
1330                tokio::time::advance(Duration::from_millis(timeout_ms + 10)).await;
1331                operation_future.await
1332            });
1333
1334            // Operation should timeout
1335            prop_assert!(result.is_err());
1336            prop_assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
1337        }
1338
1339        #[rstest]
1340        fn test_max_elapsed_always_respected(
1341            max_elapsed_ms in 20u64..50,
1342            delay_per_retry in 15u64..30,
1343            max_retries in 10u32..20,
1344        ) {
1345            let rt = tokio::runtime::Builder::new_current_thread()
1346                .enable_time()
1347                .start_paused(true)
1348                .build()
1349                .unwrap();
1350
1351            // Set up config where we would exceed max_elapsed_ms before max_retries
1352            let config = RetryConfig {
1353                max_retries,
1354                initial_delay_ms: delay_per_retry,
1355                max_delay_ms: delay_per_retry * 2,
1356                backoff_factor: 1.0, // No backoff to make timing predictable
1357                jitter_ms: 0,
1358                operation_timeout_ms: None,
1359                immediate_first: false,
1360                max_elapsed_ms: Some(max_elapsed_ms),
1361            };
1362
1363            let manager = RetryManager::new(config).unwrap();
1364            let attempt_counter = Arc::new(AtomicU32::new(0));
1365            let counter_clone = attempt_counter.clone();
1366
1367            let result = rt.block_on(async {
1368                let operation_future = manager.execute_with_retry(
1369                    "elapsed_test",
1370                    move || {
1371                        let counter = counter_clone.clone();
1372                        async move {
1373                            counter.fetch_add(1, Ordering::SeqCst);
1374                            Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1375                        }
1376                    },
1377                    |e: &TestError| matches!(e, TestError::Retryable(_)),
1378                    TestError::Timeout,
1379                );
1380
1381                // Advance time past max_elapsed_ms
1382                tokio::time::advance(Duration::from_millis(max_elapsed_ms + delay_per_retry)).await;
1383                operation_future.await
1384            });
1385
1386            let attempts = attempt_counter.load(Ordering::SeqCst);
1387
1388            // Should have failed with timeout error
1389            prop_assert!(result.is_err());
1390            prop_assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
1391
1392            // Should have stopped before exhausting all retries
1393            prop_assert!(attempts <= max_retries + 1);
1394        }
1395
1396        #[rstest]
1397        fn test_jitter_bounds(
1398            jitter_ms in 0u64..20,
1399            base_delay_ms in 10u64..30,
1400        ) {
1401            let rt = tokio::runtime::Builder::new_current_thread()
1402                .enable_time()
1403                .start_paused(true)
1404                .build()
1405                .unwrap();
1406
1407            let config = RetryConfig {
1408                max_retries: 2,
1409                initial_delay_ms: base_delay_ms,
1410                max_delay_ms: base_delay_ms * 2,
1411                backoff_factor: 1.0, // No backoff to isolate jitter
1412                jitter_ms,
1413                operation_timeout_ms: None,
1414                immediate_first: false,
1415                max_elapsed_ms: None,
1416            };
1417
1418            let manager = RetryManager::new(config).unwrap();
1419            let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
1420            let attempt_times_for_block = attempt_times.clone();
1421
1422            rt.block_on(async move {
1423                let attempt_times_for_wait = attempt_times_for_block.clone();
1424                let handle = tokio::spawn({
1425                    let attempt_times_for_task = attempt_times_for_block.clone();
1426                    let manager = manager;
1427                    async move {
1428                        let start_time = tokio::time::Instant::now();
1429                        let _ = manager
1430                            .execute_with_retry(
1431                                "jitter_test",
1432                                move || {
1433                                    let attempt_times_inner = attempt_times_for_task.clone();
1434                                    async move {
1435                                        attempt_times_inner
1436                                            .lock()
1437                                            .unwrap()
1438                                            .push(start_time.elapsed());
1439                                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1440                                    }
1441                                },
1442                                |e: &TestError| matches!(e, TestError::Retryable(_)),
1443                                TestError::Timeout,
1444                            )
1445                            .await;
1446                    }
1447                });
1448
1449                yield_until(|| !attempt_times_for_wait.lock().unwrap().is_empty()).await;
1450                advance_until(|| attempt_times_for_wait.lock().unwrap().len() >= 2).await;
1451                advance_until(|| attempt_times_for_wait.lock().unwrap().len() >= 3).await;
1452
1453                handle.await.unwrap();
1454            });
1455
1456            let times = attempt_times.lock().unwrap();
1457
1458            // We expect at least 2 attempts total (initial + at least 1 retry)
1459            prop_assert!(times.len() >= 2);
1460
1461            // First attempt should be immediate (no delay)
1462            prop_assert!(times[0].as_millis() < 5);
1463
1464            // Check subsequent retries have appropriate delays
1465            for i in 1..times.len() {
1466                let delay_from_previous = if i == 1 {
1467                    times[i] - times[0]
1468                } else {
1469                    times[i] - times[i - 1]
1470                };
1471
1472                // The delay should be at least base_delay_ms
1473                prop_assert!(
1474                    delay_from_previous.as_millis() >= base_delay_ms as u128,
1475                    "Retry {} delay {}ms is less than base {}ms",
1476                    i, delay_from_previous.as_millis(), base_delay_ms
1477                );
1478
1479                // Delay should be at most base_delay + jitter
1480                prop_assert!(
1481                    delay_from_previous.as_millis() <= (base_delay_ms + jitter_ms + 1) as u128,
1482                    "Retry {} delay {}ms exceeds base {} + jitter {}",
1483                    i, delay_from_previous.as_millis(), base_delay_ms, jitter_ms
1484                );
1485            }
1486        }
1487
1488        #[rstest]
1489        fn test_immediate_first_property(
1490            immediate_first in any::<bool>(),
1491            initial_delay_ms in 10u64..30,
1492        ) {
1493            let rt = tokio::runtime::Builder::new_current_thread()
1494                .enable_time()
1495                .start_paused(true)
1496                .build()
1497                .unwrap();
1498
1499            let config = RetryConfig {
1500                max_retries: 2,
1501                initial_delay_ms,
1502                max_delay_ms: initial_delay_ms * 2,
1503                backoff_factor: 2.0,
1504                jitter_ms: 0,
1505                operation_timeout_ms: None,
1506                immediate_first,
1507                max_elapsed_ms: None,
1508            };
1509
1510            let manager = RetryManager::new(config).unwrap();
1511            let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
1512            let attempt_times_for_block = attempt_times.clone();
1513
1514            rt.block_on(async move {
1515                let attempt_times_for_wait = attempt_times_for_block.clone();
1516                let handle = tokio::spawn({
1517                    let attempt_times_for_task = attempt_times_for_block.clone();
1518                    let manager = manager;
1519                    async move {
1520                        let start = tokio::time::Instant::now();
1521                        let _ = manager
1522                            .execute_with_retry(
1523                                "immediate_test",
1524                                move || {
1525                                    let attempt_times_inner = attempt_times_for_task.clone();
1526                                    async move {
1527                                        let elapsed = start.elapsed();
1528                                        attempt_times_inner.lock().unwrap().push(elapsed);
1529                                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1530                                    }
1531                                },
1532                                |e: &TestError| matches!(e, TestError::Retryable(_)),
1533                                TestError::Timeout,
1534                            )
1535                            .await;
1536                    }
1537                });
1538
1539                yield_until(|| !attempt_times_for_wait.lock().unwrap().is_empty()).await;
1540                advance_until(|| attempt_times_for_wait.lock().unwrap().len() >= 2).await;
1541                advance_until(|| attempt_times_for_wait.lock().unwrap().len() >= 3).await;
1542
1543                handle.await.unwrap();
1544            });
1545
1546            let times = attempt_times.lock().unwrap();
1547            prop_assert!(times.len() >= 2);
1548
1549            if immediate_first {
1550                // First retry should be immediate
1551                prop_assert!(times[1].as_millis() < 20,
1552                    "With immediate_first=true, first retry took {}ms",
1553                    times[1].as_millis());
1554            } else {
1555                // First retry should have delay
1556                prop_assert!(times[1].as_millis() >= (initial_delay_ms - 1) as u128,
1557                    "With immediate_first=false, first retry was too fast: {}ms",
1558                    times[1].as_millis());
1559            }
1560        }
1561
1562        #[rstest]
1563        fn test_non_retryable_stops_immediately(
1564            attempt_before_non_retryable in 0usize..3,
1565            max_retries in 3u32..5,
1566        ) {
1567            let rt = tokio::runtime::Builder::new_current_thread()
1568                .enable_time()
1569                .build()
1570                .unwrap();
1571
1572            let config = RetryConfig {
1573                max_retries,
1574                initial_delay_ms: 10,
1575                max_delay_ms: 100,
1576                backoff_factor: 2.0,
1577                jitter_ms: 0,
1578                operation_timeout_ms: None,
1579                immediate_first: false,
1580                max_elapsed_ms: None,
1581            };
1582
1583            let manager = RetryManager::new(config).unwrap();
1584            let attempt_counter = Arc::new(AtomicU32::new(0));
1585            let counter_clone = attempt_counter.clone();
1586
1587            let result: Result<i32, TestError> = rt.block_on(manager.execute_with_retry(
1588                "non_retryable_test",
1589                move || {
1590                    let counter = counter_clone.clone();
1591                    async move {
1592                        let attempts = counter.fetch_add(1, Ordering::SeqCst) as usize;
1593                        if attempts == attempt_before_non_retryable {
1594                            Err(TestError::NonRetryable("stop".to_string()))
1595                        } else {
1596                            Err(TestError::Retryable("retry".to_string()))
1597                        }
1598                    }
1599                },
1600                |e: &TestError| matches!(e, TestError::Retryable(_)),
1601                TestError::Timeout,
1602            ));
1603
1604            let attempts = attempt_counter.load(Ordering::SeqCst) as usize;
1605
1606            prop_assert!(result.is_err());
1607            prop_assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
1608            // Should stop exactly when non-retryable error occurs
1609            prop_assert_eq!(attempts, attempt_before_non_retryable + 1);
1610        }
1611
1612        #[rstest]
1613        fn test_cancellation_stops_immediately(
1614            cancel_after_ms in 10u64..100,
1615            initial_delay_ms in 200u64..500,
1616        ) {
1617            use tokio_util::sync::CancellationToken;
1618
1619            let rt = tokio::runtime::Builder::new_current_thread()
1620                .enable_time()
1621                .start_paused(true)
1622                .build()
1623                .unwrap();
1624
1625            let config = RetryConfig {
1626                max_retries: 10,
1627                initial_delay_ms,
1628                max_delay_ms: initial_delay_ms * 2,
1629                backoff_factor: 2.0,
1630                jitter_ms: 0,
1631                operation_timeout_ms: None,
1632                immediate_first: false,
1633                max_elapsed_ms: None,
1634            };
1635
1636            let manager = RetryManager::new(config).unwrap();
1637            let token = CancellationToken::new();
1638            let token_clone = token.clone();
1639
1640            let result: Result<i32, TestError> = rt.block_on(async {
1641                // Spawn cancellation task
1642                tokio::spawn(async move {
1643                    tokio::time::sleep(Duration::from_millis(cancel_after_ms)).await;
1644                    token_clone.cancel();
1645                });
1646
1647                let operation_future = manager.execute_with_retry_with_cancel(
1648                    "cancellation_test",
1649                    || async {
1650                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1651                    },
1652                    |e: &TestError| matches!(e, TestError::Retryable(_)),
1653                    create_test_error,
1654                    &token,
1655                );
1656
1657                // Advance time to trigger cancellation
1658                tokio::time::advance(Duration::from_millis(cancel_after_ms + 10)).await;
1659                operation_future.await
1660            });
1661
1662            // Should be canceled
1663            prop_assert!(result.is_err());
1664            let error_msg = format!("{}", result.unwrap_err());
1665            prop_assert!(error_msg.contains("canceled"));
1666        }
1667
1668        #[rstest]
1669        fn test_budget_clamp_prevents_overshoot(
1670            max_elapsed_ms in 10u64..30,
1671            delay_per_retry in 20u64..50,
1672        ) {
1673            let rt = tokio::runtime::Builder::new_current_thread()
1674                .enable_time()
1675                .start_paused(true)
1676                .build()
1677                .unwrap();
1678
1679            // Configure so that first retry delay would exceed budget
1680            let config = RetryConfig {
1681                max_retries: 5,
1682                initial_delay_ms: delay_per_retry,
1683                max_delay_ms: delay_per_retry * 2,
1684                backoff_factor: 1.0,
1685                jitter_ms: 0,
1686                operation_timeout_ms: None,
1687                immediate_first: false,
1688                max_elapsed_ms: Some(max_elapsed_ms),
1689            };
1690
1691            let manager = RetryManager::new(config).unwrap();
1692
1693            let _result = rt.block_on(async {
1694                let operation_future = manager.execute_with_retry(
1695                    "budget_clamp_test",
1696                    || async {
1697                        // Fast operation to focus on delay timing
1698                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1699                    },
1700                    |e: &TestError| matches!(e, TestError::Retryable(_)),
1701                    create_test_error,
1702                );
1703
1704                // Advance time past max_elapsed_ms
1705                tokio::time::advance(Duration::from_millis(max_elapsed_ms + delay_per_retry)).await;
1706                operation_future.await
1707            });
1708
1709            // With deterministic time, operation completes without wall-clock delay
1710            // The budget constraint is still enforced by the retry manager
1711        }
1712
1713        #[rstest]
1714        fn test_success_on_kth_attempt(
1715            k in 1usize..5,
1716            initial_delay_ms in 5u64..20,
1717        ) {
1718            let rt = tokio::runtime::Builder::new_current_thread()
1719                .enable_time()
1720                .start_paused(true)
1721                .build()
1722                .unwrap();
1723
1724            let config = RetryConfig {
1725                max_retries: 10, // More than k
1726                initial_delay_ms,
1727                max_delay_ms: initial_delay_ms * 4,
1728                backoff_factor: 2.0,
1729                jitter_ms: 0,
1730                operation_timeout_ms: None,
1731                immediate_first: false,
1732                max_elapsed_ms: None,
1733            };
1734
1735            let manager = RetryManager::new(config).unwrap();
1736            let attempt_counter = Arc::new(AtomicU32::new(0));
1737            let counter_clone = attempt_counter.clone();
1738            let target_k = k;
1739
1740            let (result, _elapsed) = rt.block_on(async {
1741                let start = tokio::time::Instant::now();
1742
1743                let operation_future = manager.execute_with_retry(
1744                    "kth_attempt_test",
1745                    move || {
1746                        let counter = counter_clone.clone();
1747                        async move {
1748                            let attempt = counter.fetch_add(1, Ordering::SeqCst) as usize;
1749                            if attempt + 1 == target_k {
1750                                Ok(42)
1751                            } else {
1752                                Err(TestError::Retryable("retry".to_string()))
1753                            }
1754                        }
1755                    },
1756                    |e: &TestError| matches!(e, TestError::Retryable(_)),
1757                    create_test_error,
1758                );
1759
1760                // Advance time to allow enough retries
1761                for _ in 0..k {
1762                    tokio::time::advance(Duration::from_millis(initial_delay_ms * 4)).await;
1763                }
1764
1765                let result = operation_future.await;
1766                let elapsed = start.elapsed();
1767
1768                (result, elapsed)
1769            });
1770
1771            let attempts = attempt_counter.load(Ordering::SeqCst) as usize;
1772
1773            // Using paused Tokio time (start_paused + advance); assert behavior only (no wall-clock timing)
1774            prop_assert!(result.is_ok());
1775            prop_assert_eq!(result.unwrap(), 42);
1776            prop_assert_eq!(attempts, k);
1777        }
1778    }
1779}