nautilus_network/retry.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Generic retry mechanism for network operations.

use std::{future::Future, marker::PhantomData, time::Duration};

use tokio_util::sync::CancellationToken;

use crate::backoff::ExponentialBackoff;

/// Configuration for retry behavior.
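///
/// # Examples
///
/// A sketch of building a custom configuration; the values below are illustrative
/// only, not recommendations:
///
/// ```rust,ignore
/// use nautilus_network::retry::RetryConfig;
///
/// let config = RetryConfig {
///     max_retries: 2,                     // up to 3 total attempts
///     initial_delay_ms: 250,
///     max_delay_ms: 5_000,
///     backoff_factor: 2.0,
///     jitter_ms: 50,
///     operation_timeout_ms: Some(10_000), // 10s per attempt
///     immediate_first: false,             // wait before the first retry
///     max_elapsed_ms: Some(30_000),       // overall retry budget
/// };
/// ```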
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Maximum number of retry attempts (total attempts = 1 initial + `max_retries`).
    pub max_retries: u32,
    /// Initial delay between retries in milliseconds.
    pub initial_delay_ms: u64,
    /// Maximum delay between retries in milliseconds.
    pub max_delay_ms: u64,
    /// Backoff multiplier factor.
    pub backoff_factor: f64,
    /// Maximum jitter in milliseconds to add to delays.
    pub jitter_ms: u64,
    /// Optional timeout for individual operations in milliseconds.
    /// If None, no timeout is applied.
    pub operation_timeout_ms: Option<u64>,
    /// Whether the first retry should happen immediately without delay.
    /// Should be false for HTTP/order operations, true for connection operations.
    pub immediate_first: bool,
    /// Optional maximum total elapsed time for all retries in milliseconds.
    /// If exceeded, retries stop even if `max_retries` hasn't been reached.
    pub max_elapsed_ms: Option<u64>,
}

impl Default for RetryConfig {
    fn default() -> Self {
        Self {
            max_retries: 3,
            initial_delay_ms: 1_000,
            max_delay_ms: 10_000,
            backoff_factor: 2.0,
            jitter_ms: 100,
            operation_timeout_ms: Some(30_000),
            immediate_first: false,
            max_elapsed_ms: None,
        }
    }
}

/// Generic retry manager for network operations.
///
/// Stateless and thread-safe: each operation maintains its own backoff state.
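///
/// # Examples
///
/// A minimal sketch of retrying a fallible async call. The `MyError` type, the
/// `fetch_quote` operation, and the predicates below are illustrative placeholders,
/// not part of this crate:
///
/// ```rust,ignore
/// use nautilus_network::retry::{RetryConfig, RetryManager};
///
/// async fn fetch_with_retries() -> Result<Quote, MyError> {
///     let manager: RetryManager<MyError> = RetryManager::new(RetryConfig::default())
///         .expect("default retry config is valid");
///     manager
///         .execute_with_retry(
///             "fetch_quote",
///             || async { fetch_quote().await },   // FnMut returning Future<Output = Result<Quote, MyError>>
///             |e: &MyError| e.is_transient(),     // which errors are worth retrying
///             |msg| MyError::Timeout(msg),        // how timeout/cancellation errors are built
///         )
///         .await
/// }
/// ```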
#[derive(Clone, Debug)]
pub struct RetryManager<E> {
    config: RetryConfig,
    _phantom: PhantomData<E>,
}

impl<E> RetryManager<E>
where
    E: std::error::Error,
{
    /// Creates a new retry manager with the given configuration.
    ///
    /// # Errors
    ///
    /// Currently this never returns an error: no validation is performed here, and
    /// invalid configuration values surface when the backoff state is created at
    /// execution time.
    pub const fn new(config: RetryConfig) -> anyhow::Result<Self> {
        Ok(Self {
            config,
            _phantom: PhantomData,
        })
    }

    /// Executes an operation with retry logic and optional cancellation.
    ///
    /// # Errors
    ///
    /// This function will return an error if the operation fails after exhausting all retries,
    /// if the operation times out, if creating the backoff state fails, or if canceled.
    pub async fn execute_with_retry_inner<F, Fut, T>(
        &self,
        operation_name: &str,
        mut operation: F,
        should_retry: impl Fn(&E) -> bool,
        create_error: impl Fn(String) -> E,
        cancel: Option<&CancellationToken>,
    ) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        let mut backoff = ExponentialBackoff::new(
            Duration::from_millis(self.config.initial_delay_ms),
            Duration::from_millis(self.config.max_delay_ms),
            self.config.backoff_factor,
            self.config.jitter_ms,
            self.config.immediate_first,
        )
        .map_err(|e| create_error(format!("Invalid configuration: {e}")))?;

        let mut attempt = 0;
        let start_time = tokio::time::Instant::now();

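        // Each iteration checks for cancellation and the elapsed-time budget, runs the
        // operation (optionally under a per-attempt timeout), and either returns the
        // outcome or sleeps according to the backoff schedule before the next attempt.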
        loop {
            if let Some(token) = cancel
                && token.is_cancelled()
            {
                tracing::debug!(
                    operation = %operation_name,
                    attempts = attempt,
                    "Operation canceled"
                );
                return Err(create_error("canceled".to_string()));
            }

            if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
                let elapsed = start_time.elapsed();
                if elapsed.as_millis() >= u128::from(max_elapsed_ms) {
                    let timeout_error = create_error("Budget exceeded".to_string());
                    tracing::trace!(
                        operation = %operation_name,
                        attempts = attempt + 1,
                        budget_ms = max_elapsed_ms,
                        "Retry budget exceeded"
                    );
                    return Err(timeout_error);
                }
            }

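            // Run one attempt. The four arms cover the combinations of per-attempt
            // timeout and cancellation token; when a timeout is configured, the outer
            // `Result` from `tokio::time::timeout` is `Err` if the attempt timed out.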
            let result = match (self.config.operation_timeout_ms, cancel) {
                (Some(timeout_ms), Some(token)) => {
                    tokio::select! {
                        result = tokio::time::timeout(Duration::from_millis(timeout_ms), operation()) => result,
                        () = token.cancelled() => {
                            tracing::debug!(
                                operation = %operation_name,
                                "Operation canceled during execution"
                            );
                            return Err(create_error("canceled".to_string()));
                        }
                    }
                }
                (Some(timeout_ms), None) => {
                    tokio::time::timeout(Duration::from_millis(timeout_ms), operation()).await
                }
                (None, Some(token)) => {
                    tokio::select! {
                        result = operation() => Ok(result),
                        () = token.cancelled() => {
                            tracing::debug!(
                                operation = %operation_name,
                                "Operation canceled during execution"
                            );
                            return Err(create_error("canceled".to_string()));
                        }
                    }
                }
                (None, None) => Ok(operation().await),
            };

            match result {
                Ok(Ok(success)) => {
                    if attempt > 0 {
                        tracing::trace!(
                            operation = %operation_name,
                            attempts = attempt + 1,
                            "Retry succeeded"
                        );
                    }
                    return Ok(success);
                }
                Ok(Err(error)) => {
                    if !should_retry(&error) {
                        tracing::trace!(
                            operation = %operation_name,
                            error = %error,
                            "Non-retryable error"
                        );
                        return Err(error);
                    }

                    if attempt >= self.config.max_retries {
                        tracing::trace!(
                            operation = %operation_name,
                            attempts = attempt + 1,
                            error = %error,
                            "Retries exhausted"
                        );
                        return Err(error);
                    }

                    let mut delay = backoff.next_duration();

                    if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
                        let elapsed = start_time.elapsed();
                        let remaining =
                            Duration::from_millis(max_elapsed_ms).saturating_sub(elapsed);

                        if remaining.is_zero() {
                            let timeout_error = create_error("Budget exceeded".to_string());
                            tracing::trace!(
                                operation = %operation_name,
                                attempts = attempt + 1,
                                budget_ms = max_elapsed_ms,
                                "Retry budget exceeded"
                            );
                            return Err(timeout_error);
                        }

                        delay = delay.min(remaining);
                    }

                    tracing::trace!(
                        operation = %operation_name,
                        attempt = attempt + 1,
                        delay_ms = delay.as_millis() as u64,
                        error = %error,
                        "Retrying after failure"
                    );

                    // Skip zero-delay sleep to avoid unnecessary yield
                    if delay.is_zero() {
                        attempt += 1;
                        continue;
                    }

                    if let Some(token) = cancel {
                        tokio::select! {
                            () = tokio::time::sleep(delay) => {},
                            () = token.cancelled() => {
                                tracing::debug!(
                                    operation = %operation_name,
                                    attempt = attempt + 1,
                                    "Operation canceled during retry delay"
                                );
                                return Err(create_error("canceled".to_string()));
                            }
                        }
                    } else {
                        tokio::time::sleep(delay).await;
                    }
                    attempt += 1;
                }
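                // The outer `Err` means the per-attempt timeout elapsed before the
                // operation finished; synthesize an error via `create_error` so the
                // retry predicate can decide whether timeouts are retryable.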
                Err(_timeout) => {
                    let timeout_error = create_error(format!(
                        "Timed out after {}ms",
                        self.config.operation_timeout_ms.unwrap_or(0)
                    ));

                    if !should_retry(&timeout_error) {
                        tracing::trace!(
                            operation = %operation_name,
                            error = %timeout_error,
                            "Non-retryable timeout"
                        );
                        return Err(timeout_error);
                    }

                    if attempt >= self.config.max_retries {
                        tracing::trace!(
                            operation = %operation_name,
                            attempts = attempt + 1,
                            error = %timeout_error,
                            "Retries exhausted after timeout"
                        );
                        return Err(timeout_error);
                    }

                    let mut delay = backoff.next_duration();

                    if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
                        let elapsed = start_time.elapsed();
                        let remaining =
                            Duration::from_millis(max_elapsed_ms).saturating_sub(elapsed);

                        if remaining.is_zero() {
                            let budget_error = create_error("Budget exceeded".to_string());
                            tracing::trace!(
                                operation = %operation_name,
                                attempts = attempt + 1,
                                budget_ms = max_elapsed_ms,
                                "Retry budget exceeded"
                            );
                            return Err(budget_error);
                        }

                        delay = delay.min(remaining);
                    }

                    tracing::trace!(
                        operation = %operation_name,
                        attempt = attempt + 1,
                        delay_ms = delay.as_millis() as u64,
                        error = %timeout_error,
                        "Retrying after timeout"
                    );

                    // Skip zero-delay sleep to avoid unnecessary yield
                    if delay.is_zero() {
                        attempt += 1;
                        continue;
                    }

                    if let Some(token) = cancel {
                        tokio::select! {
                            () = tokio::time::sleep(delay) => {},
                            () = token.cancelled() => {
                                tracing::debug!(
                                    operation = %operation_name,
                                    attempt = attempt + 1,
                                    "Operation canceled during retry delay"
                                );
                                return Err(create_error("canceled".to_string()));
                            }
                        }
                    } else {
                        tokio::time::sleep(delay).await;
                    }
                    attempt += 1;
                }
            }
        }
    }

    /// Executes an operation with retry logic.
    ///
    /// # Errors
    ///
    /// This function will return an error if the operation fails after exhausting all retries,
    /// if the operation times out, or if creating the backoff state fails.
    pub async fn execute_with_retry<F, Fut, T>(
        &self,
        operation_name: &str,
        operation: F,
        should_retry: impl Fn(&E) -> bool,
        create_error: impl Fn(String) -> E,
    ) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        self.execute_with_retry_inner(operation_name, operation, should_retry, create_error, None)
            .await
    }

    /// Executes an operation with retry logic and cancellation support.
    ///
    /// # Errors
    ///
    /// This function will return an error if the operation fails after exhausting all retries,
    /// if the operation times out, if creating the backoff state fails, or if canceled.
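    ///
    /// # Examples
    ///
    /// A sketch of wiring in a cancellation token; the `connect` operation and the
    /// `ConnError` type are illustrative placeholders:
    ///
    /// ```rust,ignore
    /// use tokio_util::sync::CancellationToken;
    ///
    /// let token = CancellationToken::new();
    /// let result = manager
    ///     .execute_with_retry_with_cancel(
    ///         "reconnect",
    ///         || async { connect().await },
    ///         |e: &ConnError| e.is_transient(),
    ///         |msg| ConnError::Canceled(msg),
    ///         &token,
    ///     )
    ///     .await;
    /// // Calling `token.cancel()` elsewhere aborts the in-flight attempt or the retry delay.
    /// ```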
    pub async fn execute_with_retry_with_cancel<F, Fut, T>(
        &self,
        operation_name: &str,
        operation: F,
        should_retry: impl Fn(&E) -> bool,
        create_error: impl Fn(String) -> E,
        cancellation_token: &CancellationToken,
    ) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        self.execute_with_retry_inner(
            operation_name,
            operation,
            should_retry,
            create_error,
            Some(cancellation_token),
        )
        .await
    }
}

/// Convenience function to create a retry manager with default configuration.
///
/// # Errors
///
/// This function will return an error if the default configuration is invalid.
pub fn create_default_retry_manager<E>() -> anyhow::Result<RetryManager<E>>
where
    E: std::error::Error,
{
    RetryManager::new(RetryConfig::default())
}

/// Convenience function to create a retry manager for HTTP operations.
///
/// # Errors
///
/// This function will return an error if the HTTP configuration is invalid.
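///
/// # Examples
///
/// A sketch of constructing a manager with the HTTP defaults; `HttpClientError` is an
/// illustrative placeholder for any `std::error::Error` type:
///
/// ```rust,ignore
/// let manager = create_http_retry_manager::<HttpClientError>()
///     .expect("HTTP retry config is valid");
/// ```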
pub const fn create_http_retry_manager<E>() -> anyhow::Result<RetryManager<E>>
where
    E: std::error::Error,
{
    let config = RetryConfig {
        max_retries: 3,
        initial_delay_ms: 1_000,
        max_delay_ms: 10_000,
        backoff_factor: 2.0,
        jitter_ms: 1_000,
        operation_timeout_ms: Some(60_000), // 60s for HTTP requests
        immediate_first: false,
        max_elapsed_ms: Some(180_000), // 3 minutes total budget
    };
    RetryManager::new(config)
}

/// Convenience function to create a retry manager for WebSocket operations.
///
/// # Errors
///
/// This function will return an error if the WebSocket configuration is invalid.
pub const fn create_websocket_retry_manager<E>() -> anyhow::Result<RetryManager<E>>
where
    E: std::error::Error,
{
    let config = RetryConfig {
        max_retries: 5,
        initial_delay_ms: 1_000,
        max_delay_ms: 10_000,
        backoff_factor: 2.0,
        jitter_ms: 1_000,
        operation_timeout_ms: Some(30_000), // 30s for WebSocket operations
        immediate_first: true,
        max_elapsed_ms: Some(120_000), // 2 minutes total budget
    };
    RetryManager::new(config)
}

////////////////////////////////////////////////////////////////////////////////
// Tests
////////////////////////////////////////////////////////////////////////////////

#[cfg(test)]
mod test_utils {
    #[derive(Debug, thiserror::Error)]
    pub enum TestError {
        #[error("Retryable error: {0}")]
        Retryable(String),
        #[error("Non-retryable error: {0}")]
        NonRetryable(String),
        #[error("Timeout error: {0}")]
        Timeout(String),
    }

    pub fn should_retry_test_error(error: &TestError) -> bool {
        matches!(error, TestError::Retryable(_))
    }

    pub fn create_test_error(msg: String) -> TestError {
        TestError::Timeout(msg)
    }
}

#[cfg(test)]
mod tests {
    use std::sync::{
        Arc,
        atomic::{AtomicU32, Ordering},
    };

    use rstest::rstest;

    use super::{test_utils::*, *};

    #[rstest]
    fn test_retry_config_default() {
        let config = RetryConfig::default();
        assert_eq!(config.max_retries, 3);
        assert_eq!(config.initial_delay_ms, 1_000);
        assert_eq!(config.max_delay_ms, 10_000);
        assert_eq!(config.backoff_factor, 2.0);
        assert_eq!(config.jitter_ms, 100);
        assert_eq!(config.operation_timeout_ms, Some(30_000));
        assert!(!config.immediate_first);
        assert_eq!(config.max_elapsed_ms, None);
    }

    #[tokio::test]
    async fn test_retry_manager_success_first_attempt() {
        let manager = RetryManager::new(RetryConfig::default()).unwrap();

        let result = manager
            .execute_with_retry(
                "test_operation",
                || async { Ok::<i32, TestError>(42) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert_eq!(result.unwrap(), 42);
    }

    #[tokio::test]
    async fn test_retry_manager_non_retryable_error() {
        let manager = RetryManager::new(RetryConfig::default()).unwrap();

        let result = manager
            .execute_with_retry(
                "test_operation",
                || async { Err::<i32, TestError>(TestError::NonRetryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
    }

    #[tokio::test]
    async fn test_retry_manager_retryable_error_exhausted() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let result = manager
            .execute_with_retry(
                "test_operation",
                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Retryable(_)));
    }

    #[tokio::test]
    async fn test_timeout_path() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: Some(50),
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let result = manager
            .execute_with_retry(
                "test_timeout",
                || async {
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
    }

    #[tokio::test]
    async fn test_max_elapsed_time_budget() {
        let config = RetryConfig {
            max_retries: 10,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: Some(200),
        };
        let manager = RetryManager::new(config).unwrap();

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_budget",
                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        let elapsed = start.elapsed();
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
        assert!(elapsed.as_millis() >= 150);
        assert!(elapsed.as_millis() < 1000);
    }

    #[rstest]
    fn test_http_retry_manager_config() {
        let manager = create_http_retry_manager::<TestError>().unwrap();
        assert_eq!(manager.config.max_retries, 3);
        assert!(!manager.config.immediate_first);
        assert_eq!(manager.config.max_elapsed_ms, Some(180_000));
    }

    #[rstest]
    fn test_websocket_retry_manager_config() {
        let manager = create_websocket_retry_manager::<TestError>().unwrap();
        assert_eq!(manager.config.max_retries, 5);
        assert!(manager.config.immediate_first);
        assert_eq!(manager.config.max_elapsed_ms, Some(120_000));
    }

    #[tokio::test]
    async fn test_timeout_respects_retry_predicate() {
        let config = RetryConfig {
            max_retries: 3,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: Some(50),
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        // Test with retry predicate that rejects timeouts
        let should_not_retry_timeouts = |error: &TestError| !matches!(error, TestError::Timeout(_));

        let result = manager
            .execute_with_retry(
                "test_timeout_non_retryable",
                || async {
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    Ok::<i32, TestError>(42)
                },
                should_not_retry_timeouts,
                create_test_error,
            )
            .await;

        // Should fail immediately without retries since timeout is non-retryable
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
    }

    #[tokio::test]
    async fn test_timeout_retries_when_predicate_allows() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: Some(50),
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        // Test with retry predicate that allows timeouts
        let should_retry_timeouts = |error: &TestError| matches!(error, TestError::Timeout(_));

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_timeout_retryable",
                || async {
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_timeouts,
                create_test_error,
            )
            .await;

        let elapsed = start.elapsed();

        // Should fail after retries (not immediately)
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
        // Should have taken time for retries (at least 2 timeouts + delays)
        assert!(elapsed.as_millis() > 80); // More than just one timeout
    }

    #[tokio::test]
    async fn test_successful_retry_after_failures() {
        let config = RetryConfig {
            max_retries: 3,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let result = manager
            .execute_with_retry(
                "test_eventual_success",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        let attempts = counter.fetch_add(1, Ordering::SeqCst);
                        if attempts < 2 {
                            Err(TestError::Retryable("temporary failure".to_string()))
                        } else {
                            Ok(42)
                        }
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert_eq!(result.unwrap(), 42);
        assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
    }

    #[tokio::test]
    #[ignore = "Non-deterministic timing test - TODO: Convert to use deterministic tokio time"]
    async fn test_immediate_first_retry() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 100,
            max_delay_ms: 200,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: true,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
        let times_clone = attempt_times.clone();
        let start = tokio::time::Instant::now();

        let _result = manager
            .execute_with_retry(
                "test_immediate",
                move || {
                    let times = times_clone.clone();
                    async move {
                        times.lock().unwrap().push(start.elapsed());
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        let times = attempt_times.lock().unwrap();
        assert_eq!(times.len(), 3); // Initial + 2 retries

        // First retry should be immediate (within 10ms)
        assert!(times[1].as_millis() < 10);
        // Second retry should have delay (at least 100ms from first retry)
        assert!(times[2].as_millis() >= 100);
    }

    #[tokio::test]
    async fn test_operation_without_timeout() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None, // No timeout
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_no_timeout",
                || async {
                    tokio::time::sleep(Duration::from_millis(50)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        let elapsed = start.elapsed();
        assert_eq!(result.unwrap(), 42);
        // Should complete without timing out
        assert!(elapsed.as_millis() >= 30);
        assert!(elapsed.as_millis() < 200);
    }

    #[tokio::test]
    async fn test_zero_retries() {
        let config = RetryConfig {
            max_retries: 0,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let result = manager
            .execute_with_retry(
                "test_no_retries",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        // Should only attempt once (no retries)
        assert_eq!(attempt_counter.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    #[ignore = "Non-deterministic timing test - TODO: Convert to use deterministic tokio time"]
    async fn test_jitter_applied() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 2.0,
            jitter_ms: 50, // Significant jitter
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let delays = Arc::new(std::sync::Mutex::new(Vec::new()));
        let delays_clone = delays.clone();
        let last_time = Arc::new(std::sync::Mutex::new(tokio::time::Instant::now()));
        let last_time_clone = last_time.clone();

        let _result = manager
            .execute_with_retry(
                "test_jitter",
                move || {
                    let delays = delays_clone.clone();
                    let last_time = last_time_clone.clone();
                    async move {
                        let now = tokio::time::Instant::now();
                        let delay = {
                            let mut last = last_time.lock().unwrap();
                            let d = now.duration_since(*last);
                            *last = now;
                            d
                        };
                        delays.lock().unwrap().push(delay);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        let delays = delays.lock().unwrap();
        // Skip the first delay (initial attempt)
        for delay in delays.iter().skip(1) {
            // Each delay should be at least the base delay (50ms for first retry)
            assert!(delay.as_millis() >= 50);
            // But no more than base + jitter (100ms), with extra slack for scheduling
            assert!(delay.as_millis() <= 150);
        }
    }

    #[tokio::test]
    async fn test_max_elapsed_stops_early() {
        let config = RetryConfig {
            max_retries: 100, // Very high retry count
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 1.5,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: Some(150), // Should stop after ~3 attempts
        };
        let manager = RetryManager::new(config).unwrap();

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_elapsed_limit",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        let elapsed = start.elapsed();
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));

        // Should have stopped due to time limit, not retry count
        let attempts = attempt_counter.load(Ordering::SeqCst);
        assert!(attempts < 10); // Much less than max_retries
        assert!(elapsed.as_millis() >= 100);
    }

    #[tokio::test]
    async fn test_mixed_errors_retry_behavior() {
        let config = RetryConfig {
            max_retries: 5,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let result = manager
            .execute_with_retry(
                "test_mixed_errors",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        let attempts = counter.fetch_add(1, Ordering::SeqCst);
                        match attempts {
                            0 => Err(TestError::Retryable("retry 1".to_string())),
                            1 => Err(TestError::Retryable("retry 2".to_string())),
                            2 => Err(TestError::NonRetryable("stop here".to_string())),
                            _ => Ok(42),
                        }
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
        // Should stop at the non-retryable error
        assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
    }

    #[tokio::test]
    async fn test_cancellation_during_retry_delay() {
        use tokio_util::sync::CancellationToken;

        let config = RetryConfig {
            max_retries: 10,
            initial_delay_ms: 500, // Long delay to ensure cancellation happens during sleep
            max_delay_ms: 1000,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let token = CancellationToken::new();
        let token_clone = token.clone();

        // Cancel after a short delay
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            token_clone.cancel();
        });

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry_with_cancel(
                "test_cancellation",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
                &token,
            )
            .await;

        let elapsed = start.elapsed();

        // Should be canceled quickly
        assert!(result.is_err());
        let error_msg = format!("{}", result.unwrap_err());
        assert!(error_msg.contains("canceled"));

        // Should not have taken the full delay time
        assert!(elapsed.as_millis() < 600);

        // Should have made at least one attempt
        let attempts = attempt_counter.load(Ordering::SeqCst);
        assert!(attempts >= 1);
    }

    #[tokio::test]
    async fn test_cancellation_during_operation_execution() {
        use tokio_util::sync::CancellationToken;

        let config = RetryConfig {
            max_retries: 5,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let token = CancellationToken::new();
        let token_clone = token.clone();

        // Cancel after a short delay
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            token_clone.cancel();
        });

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry_with_cancel(
                "test_cancellation_during_op",
                || async {
                    // Long-running operation
                    tokio::time::sleep(Duration::from_millis(200)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_test_error,
                create_test_error,
                &token,
            )
            .await;

        let elapsed = start.elapsed();

        // Should be canceled during the operation
        assert!(result.is_err());
        let error_msg = format!("{}", result.unwrap_err());
        assert!(error_msg.contains("canceled"));

        // Should not have completed the long operation
        assert!(elapsed.as_millis() < 250);
    }

    #[tokio::test]
    async fn test_cancellation_error_message() {
        use tokio_util::sync::CancellationToken;

        let config = RetryConfig::default();
        let manager = RetryManager::new(config).unwrap();

        let token = CancellationToken::new();
        token.cancel(); // Pre-cancel for immediate cancellation

        let result = manager
            .execute_with_retry_with_cancel(
                "test_operation",
                || async { Ok::<i32, TestError>(42) },
                should_retry_test_error,
                create_test_error,
                &token,
            )
            .await;

        assert!(result.is_err());
        let error_msg = format!("{}", result.unwrap_err());
        assert!(error_msg.contains("canceled"));
    }
}

#[cfg(test)]
mod proptest_tests {
    use std::sync::{
        Arc,
        atomic::{AtomicU32, Ordering},
    };

    use proptest::prelude::*;
    // Import rstest attribute macro used within proptest! tests
    use rstest::rstest;

    use super::{test_utils::*, *};

    proptest! {
        #[rstest]
        fn test_retry_config_valid_ranges(
            max_retries in 0u32..100,
            initial_delay_ms in 1u64..10_000,
            max_delay_ms in 1u64..60_000,
            backoff_factor in 1.0f64..10.0,
            jitter_ms in 0u64..1_000,
            operation_timeout_ms in prop::option::of(1u64..120_000),
            immediate_first in any::<bool>(),
            max_elapsed_ms in prop::option::of(1u64..300_000)
        ) {
            // Ensure max_delay >= initial_delay for valid config
            let max_delay_ms = max_delay_ms.max(initial_delay_ms);

            let config = RetryConfig {
                max_retries,
                initial_delay_ms,
                max_delay_ms,
                backoff_factor,
                jitter_ms,
                operation_timeout_ms,
                immediate_first,
                max_elapsed_ms,
            };

            // Should always be able to create a RetryManager with valid config
            let manager = RetryManager::<std::io::Error>::new(config);
            prop_assert!(manager.is_ok());
        }

        #[rstest]
        fn test_retry_attempts_bounded(
            max_retries in 0u32..5,
            initial_delay_ms in 1u64..10,
            backoff_factor in 1.0f64..2.0,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries,
                initial_delay_ms,
                max_delay_ms: initial_delay_ms * 2,
                backoff_factor,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config).unwrap();
            let attempt_counter = Arc::new(AtomicU32::new(0));
            let counter_clone = attempt_counter.clone();

            let _result = rt.block_on(manager.execute_with_retry(
                "prop_test",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                |e: &TestError| matches!(e, TestError::Retryable(_)),
                TestError::Timeout,
            ));

            let attempts = attempt_counter.load(Ordering::SeqCst);
            // Total attempts should be 1 (initial) + max_retries
            prop_assert_eq!(attempts, max_retries + 1);
        }

        #[rstest]
        fn test_timeout_always_respected(
            timeout_ms in 10u64..50,
            operation_delay_ms in 60u64..100,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries: 0, // No retries to isolate timeout behavior
                initial_delay_ms: 10,
                max_delay_ms: 100,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: Some(timeout_ms),
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config).unwrap();

            let result = rt.block_on(async {
                let operation_future = manager.execute_with_retry(
                    "timeout_test",
                    move || async move {
                        tokio::time::sleep(Duration::from_millis(operation_delay_ms)).await;
                        Ok::<i32, TestError>(42)
                    },
                    |_: &TestError| true,
                    TestError::Timeout,
                );

                // Advance time to trigger timeout
                tokio::time::advance(Duration::from_millis(timeout_ms + 10)).await;
                operation_future.await
            });

            // Operation should timeout
            prop_assert!(result.is_err());
            prop_assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
        }

        #[rstest]
        fn test_max_elapsed_always_respected(
            max_elapsed_ms in 20u64..50,
            delay_per_retry in 15u64..30,
            max_retries in 10u32..20,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            // Set up config where we would exceed max_elapsed_ms before max_retries
            let config = RetryConfig {
                max_retries,
                initial_delay_ms: delay_per_retry,
                max_delay_ms: delay_per_retry * 2,
                backoff_factor: 1.0, // No backoff to make timing predictable
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: Some(max_elapsed_ms),
            };

            let manager = RetryManager::new(config).unwrap();
            let attempt_counter = Arc::new(AtomicU32::new(0));
            let counter_clone = attempt_counter.clone();

            let result = rt.block_on(async {
                let operation_future = manager.execute_with_retry(
                    "elapsed_test",
                    move || {
                        let counter = counter_clone.clone();
                        async move {
                            counter.fetch_add(1, Ordering::SeqCst);
                            Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                        }
                    },
                    |e: &TestError| matches!(e, TestError::Retryable(_)),
                    TestError::Timeout,
                );

                // Advance time past max_elapsed_ms
                tokio::time::advance(Duration::from_millis(max_elapsed_ms + delay_per_retry)).await;
                operation_future.await
            });

            let attempts = attempt_counter.load(Ordering::SeqCst);

            // Should have failed with timeout error
            prop_assert!(result.is_err());
            prop_assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));

            // Should have stopped before exhausting all retries
            prop_assert!(attempts <= max_retries + 1);
        }

1324        #[rstest]
1325        #[ignore = "Non-deterministic timing test - TODO: Convert to use deterministic tokio time"]
1326        fn test_jitter_bounds(
1327            jitter_ms in 0u64..20,
1328            base_delay_ms in 10u64..30,
1329        ) {
1330            let rt = tokio::runtime::Builder::new_current_thread()
1331                .enable_time()
1332                .build()
1333                .unwrap();
1334
1335            let config = RetryConfig {
1336                max_retries: 2,
1337                initial_delay_ms: base_delay_ms,
1338                max_delay_ms: base_delay_ms * 2,
1339                backoff_factor: 1.0, // No backoff to isolate jitter
1340                jitter_ms,
1341                operation_timeout_ms: None,
1342                immediate_first: false,
1343                max_elapsed_ms: None,
1344            };
1345
1346            let manager = RetryManager::new(config).unwrap();
1347            let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
1348            let times_clone = attempt_times.clone();
1349            let start_time = std::time::Instant::now();
1350
1351            let _result = rt.block_on(manager.execute_with_retry(
1352                "jitter_test",
1353                move || {
1354                    let times = times_clone.clone();
1355                    async move {
1356                        times.lock().unwrap().push(start_time.elapsed());
1357                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1358                    }
1359                },
1360                |e: &TestError| matches!(e, TestError::Retryable(_)),
1361                TestError::Timeout,
1362            ));
1363
1364            let times = attempt_times.lock().unwrap();
1365
1366            // We expect at least 2 attempts total (initial + at least 1 retry)
1367            prop_assert!(times.len() >= 2);
1368
1369            // First attempt should be immediate (no delay)
1370            prop_assert!(times[0].as_millis() < 5);
1371
1372            // Check subsequent retries have appropriate delays
1373            for i in 1..times.len() {
1374                let delay_from_previous = if i == 1 {
1375                    times[i] - times[0]
1376                } else {
1377                    times[i] - times[i - 1]
1378                };
1379
1380                // The delay should be at least base_delay_ms
1381                prop_assert!(
1382                    delay_from_previous.as_millis() >= base_delay_ms as u128,
1383                    "Retry {} delay {}ms is less than base {}ms",
1384                    i, delay_from_previous.as_millis(), base_delay_ms
1385                );
1386
1387                // Delay should be at most base_delay + jitter
1388                prop_assert!(
1389                    delay_from_previous.as_millis() <= (base_delay_ms + jitter_ms + 5) as u128,
1390                    "Retry {} delay {}ms exceeds base {} + jitter {}",
1391                    i, delay_from_previous.as_millis(), base_delay_ms, jitter_ms
1392                );
1393            }
1394        }
1395
1396        #[rstest]
1397        #[ignore = "Non-deterministic timing test - TODO: Convert to use deterministic tokio time"]
1398        fn test_immediate_first_property(
1399            immediate_first in any::<bool>(),
1400            initial_delay_ms in 10u64..30,
1401        ) {
1402            let rt = tokio::runtime::Builder::new_current_thread()
1403                .enable_time()
1404                .build()
1405                .unwrap();
1406
1407            let config = RetryConfig {
1408                max_retries: 2,
1409                initial_delay_ms,
1410                max_delay_ms: initial_delay_ms * 2,
1411                backoff_factor: 2.0,
1412                jitter_ms: 0,
1413                operation_timeout_ms: None,
1414                immediate_first,
1415                max_elapsed_ms: None,
1416            };
1417
1418            let manager = RetryManager::new(config).unwrap();
1419            let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
1420            let times_clone = attempt_times.clone();
1421
1422            let start = std::time::Instant::now();
1423            let _result = rt.block_on(manager.execute_with_retry(
1424                "immediate_test",
1425                move || {
1426                    let times = times_clone.clone();
1427                    async move {
1428                        let elapsed = start.elapsed();
1429                        times.lock().unwrap().push(elapsed);
1430                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1431                    }
1432                },
1433                |e: &TestError| matches!(e, TestError::Retryable(_)),
1434                TestError::Timeout,
1435            ));
1436
1437            let times = attempt_times.lock().unwrap();
1438            prop_assert!(times.len() >= 2);
1439
1440            if immediate_first {
1441                // First retry should be immediate
1442                prop_assert!(times[1].as_millis() < 20,
1443                    "With immediate_first=true, first retry took {}ms",
1444                    times[1].as_millis());
1445            } else {
1446                // First retry should have delay
1447                prop_assert!(times[1].as_millis() >= (initial_delay_ms - 10) as u128,
1448                    "With immediate_first=false, first retry was too fast: {}ms",
1449                    times[1].as_millis());
1450            }
1451        }
1452
        #[rstest]
        fn test_non_retryable_stops_immediately(
            attempt_before_non_retryable in 0usize..3,
            max_retries in 3u32..5,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries,
                initial_delay_ms: 10,
                max_delay_ms: 100,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config).unwrap();
            let attempt_counter = Arc::new(AtomicU32::new(0));
            let counter_clone = attempt_counter.clone();

            let result: Result<i32, TestError> = rt.block_on(manager.execute_with_retry(
                "non_retryable_test",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        let attempts = counter.fetch_add(1, Ordering::SeqCst) as usize;
                        if attempts == attempt_before_non_retryable {
                            Err(TestError::NonRetryable("stop".to_string()))
                        } else {
                            Err(TestError::Retryable("retry".to_string()))
                        }
                    }
                },
                |e: &TestError| matches!(e, TestError::Retryable(_)),
                TestError::Timeout,
            ));

            let attempts = attempt_counter.load(Ordering::SeqCst) as usize;

            prop_assert!(result.is_err());
            prop_assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
            // Should stop exactly when non-retryable error occurs
            prop_assert_eq!(attempts, attempt_before_non_retryable + 1);
        }

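        // Property: cancelling the token while the retry loop is sleeping between attempts
        // aborts the operation with a "canceled" error instead of retrying further.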
        #[rstest]
        fn test_cancellation_stops_immediately(
            cancel_after_ms in 10u64..100,
            initial_delay_ms in 200u64..500,
        ) {
            use tokio_util::sync::CancellationToken;

            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries: 10,
                initial_delay_ms,
                max_delay_ms: initial_delay_ms * 2,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config).unwrap();
            let token = CancellationToken::new();
            let token_clone = token.clone();

            let result: Result<i32, TestError> = rt.block_on(async {
                // Spawn cancellation task
                tokio::spawn(async move {
                    tokio::time::sleep(Duration::from_millis(cancel_after_ms)).await;
                    token_clone.cancel();
                });

                let operation_future = manager.execute_with_retry_with_cancel(
                    "cancellation_test",
                    || async {
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    },
                    |e: &TestError| matches!(e, TestError::Retryable(_)),
                    create_test_error,
                    &token,
                );

                // Advance time to trigger cancellation
                tokio::time::advance(Duration::from_millis(cancel_after_ms + 10)).await;
                operation_future.await
            });

            // Should be canceled
            prop_assert!(result.is_err());
            let error_msg = format!("{}", result.unwrap_err());
            prop_assert!(error_msg.contains("canceled"));
        }

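        // Exercises the elapsed-time budget: with max_elapsed_ms smaller than a single
        // retry delay, the retry loop must stop under the budget rather than sleep past it.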
        #[rstest]
        fn test_budget_clamp_prevents_overshoot(
            max_elapsed_ms in 10u64..30,
            delay_per_retry in 20u64..50,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            // Configure so that first retry delay would exceed budget
            let config = RetryConfig {
                max_retries: 5,
                initial_delay_ms: delay_per_retry,
                max_delay_ms: delay_per_retry * 2,
                backoff_factor: 1.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: Some(max_elapsed_ms),
            };

            let manager = RetryManager::new(config).unwrap();

            let _result = rt.block_on(async {
                let operation_future = manager.execute_with_retry(
                    "budget_clamp_test",
                    || async {
                        // Fast operation to focus on delay timing
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    },
                    |e: &TestError| matches!(e, TestError::Retryable(_)),
                    create_test_error,
                );

                // Advance time past max_elapsed_ms
                tokio::time::advance(Duration::from_millis(max_elapsed_ms + delay_per_retry)).await;
                operation_future.await
            });

            // With paused Tokio time the operation completes without wall-clock delay;
            // the max_elapsed_ms budget is still enforced internally by the retry manager.
        }

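        // Property: an operation that first succeeds on its k-th attempt returns that
        // success and is invoked exactly k times (k is well below max_retries).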
        #[rstest]
        fn test_success_on_kth_attempt(
            k in 1usize..5,
            initial_delay_ms in 5u64..20,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries: 10, // More than k
                initial_delay_ms,
                max_delay_ms: initial_delay_ms * 4,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config).unwrap();
            let attempt_counter = Arc::new(AtomicU32::new(0));
            let counter_clone = attempt_counter.clone();
            let target_k = k;

            let (result, _elapsed) = rt.block_on(async {
                let start = tokio::time::Instant::now();

                let operation_future = manager.execute_with_retry(
                    "kth_attempt_test",
                    move || {
                        let counter = counter_clone.clone();
                        async move {
                            let attempt = counter.fetch_add(1, Ordering::SeqCst) as usize;
                            if attempt + 1 == target_k {
                                Ok(42)
                            } else {
                                Err(TestError::Retryable("retry".to_string()))
                            }
                        }
                    },
                    |e: &TestError| matches!(e, TestError::Retryable(_)),
                    create_test_error,
                );

                // Advance time to allow enough retries
                for _ in 0..k {
                    tokio::time::advance(Duration::from_millis(initial_delay_ms * 4)).await;
                }

                let result = operation_future.await;
                let elapsed = start.elapsed();

                (result, elapsed)
            });

            let attempts = attempt_counter.load(Ordering::SeqCst) as usize;

            // Using paused Tokio time (start_paused + advance); assert behavior only (no wall-clock timing)
            prop_assert!(result.is_ok());
            prop_assert_eq!(result.unwrap(), 42);
            prop_assert_eq!(attempts, k);
        }
    }
}