nautilus_network/retry.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Generic retry mechanism for network operations.

use std::{future::Future, marker::PhantomData, time::Duration};

use tokio_util::sync::CancellationToken;

use crate::backoff::ExponentialBackoff;

/// Configuration for retry behavior.
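///
/// # Examples
///
/// A minimal sketch of overriding a few fields on top of the defaults (the
/// `nautilus_network::retry` module path is assumed here):
///
/// ```rust,ignore
/// use nautilus_network::retry::RetryConfig;
///
/// // Tighter than the defaults: 2 retries, fast backoff, 5s per-attempt timeout.
/// let config = RetryConfig {
///     max_retries: 2,
///     initial_delay_ms: 250,
///     operation_timeout_ms: Some(5_000),
///     ..Default::default()
/// };
/// ```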
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Maximum number of retry attempts (total attempts = 1 initial + max_retries).
    pub max_retries: u32,
    /// Initial delay between retries in milliseconds.
    pub initial_delay_ms: u64,
    /// Maximum delay between retries in milliseconds.
    pub max_delay_ms: u64,
    /// Backoff multiplier factor.
    pub backoff_factor: f64,
    /// Maximum jitter in milliseconds to add to delays.
    pub jitter_ms: u64,
    /// Optional timeout for individual operations in milliseconds.
    /// If None, no timeout is applied.
    pub operation_timeout_ms: Option<u64>,
    /// Whether the first retry should happen immediately without delay.
    /// Should be false for HTTP/order operations, true for connection operations.
    pub immediate_first: bool,
    /// Optional maximum total elapsed time for all retries in milliseconds.
    /// If exceeded, retries stop even if max_retries hasn't been reached.
    pub max_elapsed_ms: Option<u64>,
}

impl Default for RetryConfig {
    fn default() -> Self {
        Self {
            max_retries: 3,
            initial_delay_ms: 1_000,
            max_delay_ms: 10_000,
            backoff_factor: 2.0,
            jitter_ms: 100,
            operation_timeout_ms: Some(30_000),
            immediate_first: false,
            max_elapsed_ms: None,
        }
    }
}

/// Generic retry manager for network operations.
///
/// Stateless and thread-safe: each operation maintains its own backoff state.
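///
/// # Examples
///
/// Because the manager holds no per-operation state, one instance can be cloned
/// freely across tasks. A sketch, assuming the `nautilus_network::retry` module
/// path and any error type implementing `std::error::Error`:
///
/// ```rust,ignore
/// use nautilus_network::retry::{RetryConfig, RetryManager};
///
/// let manager: RetryManager<std::io::Error> = RetryManager::new(RetryConfig::default())?;
/// let for_task = manager.clone(); // Each execute_with_retry call gets fresh backoff state
/// ```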
#[derive(Clone, Debug)]
pub struct RetryManager<E> {
    config: RetryConfig,
    _phantom: PhantomData<E>,
}

impl<E> RetryManager<E>
where
    E: std::error::Error,
{
    /// Creates a new retry manager with the given configuration.
    ///
    /// # Errors
    ///
    /// This function will return an error if the configuration is invalid.
    pub fn new(config: RetryConfig) -> anyhow::Result<Self> {
        Ok(Self {
            config,
            _phantom: PhantomData,
        })
    }

    /// Executes an operation with retry logic and optional cancellation.
    ///
    /// # Errors
    ///
    /// This function will return an error if the operation fails after exhausting all retries,
    /// if the operation times out, if creating the backoff state fails, or if canceled.
    pub async fn execute_with_retry_inner<F, Fut, T>(
        &self,
        operation_name: &str,
        mut operation: F,
        should_retry: impl Fn(&E) -> bool,
        create_error: impl Fn(String) -> E,
        cancel: Option<&CancellationToken>,
    ) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        let mut backoff = ExponentialBackoff::new(
            Duration::from_millis(self.config.initial_delay_ms),
            Duration::from_millis(self.config.max_delay_ms),
            self.config.backoff_factor,
            self.config.jitter_ms,
            self.config.immediate_first,
        )
        .map_err(|e| create_error(format!("Invalid configuration: {e}")))?;

        let mut attempt = 0;
        let start_time = tokio::time::Instant::now();

        loop {
            if let Some(token) = cancel
                && token.is_cancelled()
            {
                tracing::debug!(
                    operation = %operation_name,
                    attempts = attempt,
                    "Operation canceled"
                );
                return Err(create_error("canceled".to_string()));
            }

            if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
                let elapsed = start_time.elapsed();
                if elapsed.as_millis() >= max_elapsed_ms as u128 {
                    let budget_error = create_error("Budget exceeded".to_string());
                    tracing::trace!(
                        operation = %operation_name,
                        attempts = attempt + 1,
                        budget_ms = max_elapsed_ms,
                        "Retry budget exceeded"
                    );
                    return Err(budget_error);
                }
            }

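            // Run a single attempt, racing it against the optional per-attempt
            // timeout and/or cancellation token.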
            let result = match (self.config.operation_timeout_ms, cancel) {
                (Some(timeout_ms), Some(token)) => {
                    tokio::select! {
                        result = tokio::time::timeout(Duration::from_millis(timeout_ms), operation()) => result,
                        _ = token.cancelled() => {
                            tracing::debug!(
                                operation = %operation_name,
                                "Operation canceled during execution"
                            );
                            return Err(create_error("canceled".to_string()));
                        }
                    }
                }
                (Some(timeout_ms), None) => {
                    tokio::time::timeout(Duration::from_millis(timeout_ms), operation()).await
                }
                (None, Some(token)) => {
                    tokio::select! {
                        result = operation() => Ok(result),
                        _ = token.cancelled() => {
                            tracing::debug!(
                                operation = %operation_name,
                                "Operation canceled during execution"
                            );
                            return Err(create_error("canceled".to_string()));
                        }
                    }
                }
                (None, None) => Ok(operation().await),
            };

            match result {
                Ok(Ok(success)) => {
                    if attempt > 0 {
                        tracing::trace!(
                            operation = %operation_name,
                            attempts = attempt + 1,
                            "Retry succeeded"
                        );
                    }
                    return Ok(success);
                }
                Ok(Err(error)) => {
                    if !should_retry(&error) {
                        tracing::trace!(
                            operation = %operation_name,
                            error = %error,
                            "Non-retryable error"
                        );
                        return Err(error);
                    }

                    if attempt >= self.config.max_retries {
                        tracing::trace!(
                            operation = %operation_name,
                            attempts = attempt + 1,
                            error = %error,
                            "Retries exhausted"
                        );
                        return Err(error);
                    }

                    let mut delay = backoff.next_duration();

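                    // Clamp the delay so the sleep never runs past the remaining retry budget.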
                    if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
                        let elapsed = start_time.elapsed();
                        let remaining =
                            Duration::from_millis(max_elapsed_ms).saturating_sub(elapsed);

                        if remaining.is_zero() {
                            let budget_error = create_error("Budget exceeded".to_string());
                            tracing::trace!(
                                operation = %operation_name,
                                attempts = attempt + 1,
                                budget_ms = max_elapsed_ms,
                                "Retry budget exceeded"
                            );
                            return Err(budget_error);
                        }

                        delay = delay.min(remaining);
                    }

                    tracing::trace!(
                        operation = %operation_name,
                        attempt = attempt + 1,
                        delay_ms = delay.as_millis() as u64,
                        error = %error,
                        "Retrying after failure"
                    );

                    // Skip zero-delay sleep to avoid unnecessary yield
                    if delay.is_zero() {
                        attempt += 1;
                        continue;
                    }

                    if let Some(token) = cancel {
                        tokio::select! {
                            _ = tokio::time::sleep(delay) => {},
                            _ = token.cancelled() => {
                                tracing::debug!(
                                    operation = %operation_name,
                                    attempt = attempt + 1,
                                    "Operation canceled during retry delay"
                                );
                                return Err(create_error("canceled".to_string()));
                            }
                        }
                    } else {
                        tokio::time::sleep(delay).await;
                    }
                    attempt += 1;
                }
                Err(_timeout) => {
                    let timeout_error = create_error(format!(
                        "Timed out after {}ms",
                        self.config.operation_timeout_ms.unwrap_or(0)
                    ));

                    if !should_retry(&timeout_error) {
                        tracing::trace!(
                            operation = %operation_name,
                            error = %timeout_error,
                            "Non-retryable timeout"
                        );
                        return Err(timeout_error);
                    }

                    if attempt >= self.config.max_retries {
                        tracing::trace!(
                            operation = %operation_name,
                            attempts = attempt + 1,
                            error = %timeout_error,
                            "Retries exhausted after timeout"
                        );
                        return Err(timeout_error);
                    }

                    let mut delay = backoff.next_duration();

                    if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
                        let elapsed = start_time.elapsed();
                        let remaining =
                            Duration::from_millis(max_elapsed_ms).saturating_sub(elapsed);

                        if remaining.is_zero() {
                            let budget_error = create_error("Budget exceeded".to_string());
                            tracing::trace!(
                                operation = %operation_name,
                                attempts = attempt + 1,
                                budget_ms = max_elapsed_ms,
                                "Retry budget exceeded"
                            );
                            return Err(budget_error);
                        }

                        delay = delay.min(remaining);
                    }

                    tracing::trace!(
                        operation = %operation_name,
                        attempt = attempt + 1,
                        delay_ms = delay.as_millis() as u64,
                        error = %timeout_error,
                        "Retrying after timeout"
                    );

                    // Skip zero-delay sleep to avoid unnecessary yield
                    if delay.is_zero() {
                        attempt += 1;
                        continue;
                    }

                    if let Some(token) = cancel {
                        tokio::select! {
                            _ = tokio::time::sleep(delay) => {},
                            _ = token.cancelled() => {
                                tracing::debug!(
                                    operation = %operation_name,
                                    attempt = attempt + 1,
                                    "Operation canceled during retry delay"
                                );
                                return Err(create_error("canceled".to_string()));
                            }
                        }
                    } else {
                        tokio::time::sleep(delay).await;
                    }
                    attempt += 1;
                }
            }
        }
    }

    /// Executes an operation with retry logic.
    ///
    /// # Errors
    ///
    /// This function will return an error if the operation fails after exhausting all retries,
    /// if the operation times out, or if creating the backoff state fails.
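    ///
    /// # Examples
    ///
    /// A minimal sketch of retrying a transient-failure-prone call; the error
    /// type and helper names here are illustrative, not part of this module:
    ///
    /// ```rust,ignore
    /// let manager: RetryManager<MyError> = RetryManager::new(RetryConfig::default())?;
    /// let quote = manager
    ///     .execute_with_retry(
    ///         "fetch_quote",
    ///         || async { fetch_quote().await },  // Re-invoked on each attempt
    ///         |e: &MyError| e.is_transient(),    // Which errors are worth retrying
    ///         |msg| MyError::Other(msg),         // Wraps timeout/budget messages
    ///     )
    ///     .await?;
    /// ```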
    pub async fn execute_with_retry<F, Fut, T>(
        &self,
        operation_name: &str,
        operation: F,
        should_retry: impl Fn(&E) -> bool,
        create_error: impl Fn(String) -> E,
    ) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        self.execute_with_retry_inner(operation_name, operation, should_retry, create_error, None)
            .await
    }

    /// Executes an operation with retry logic and cancellation support.
    ///
    /// # Errors
    ///
    /// This function will return an error if the operation fails after exhausting all retries,
    /// if the operation times out, if creating the backoff state fails, or if canceled.
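    ///
    /// # Examples
    ///
    /// A sketch of wiring in a shutdown token (names are illustrative):
    ///
    /// ```rust,ignore
    /// use tokio_util::sync::CancellationToken;
    ///
    /// let shutdown = CancellationToken::new();
    /// let result = manager
    ///     .execute_with_retry_with_cancel(
    ///         "resubscribe",
    ///         || async { resubscribe().await },
    ///         |e: &MyError| e.is_transient(),
    ///         |msg| MyError::Other(msg),
    ///         &shutdown, // Cancellation is honored mid-operation and mid-delay
    ///     )
    ///     .await;
    /// ```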
    pub async fn execute_with_retry_with_cancel<F, Fut, T>(
        &self,
        operation_name: &str,
        operation: F,
        should_retry: impl Fn(&E) -> bool,
        create_error: impl Fn(String) -> E,
        cancellation_token: &CancellationToken,
    ) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        self.execute_with_retry_inner(
            operation_name,
            operation,
            should_retry,
            create_error,
            Some(cancellation_token),
        )
        .await
    }
}

/// Convenience function to create a retry manager with default configuration.
///
/// # Errors
///
/// This function will return an error if the default configuration is invalid.
pub fn create_default_retry_manager<E>() -> anyhow::Result<RetryManager<E>>
where
    E: std::error::Error,
{
    RetryManager::new(RetryConfig::default())
}

/// Convenience function to create a retry manager for HTTP operations.
///
/// # Errors
///
/// This function will return an error if the HTTP configuration is invalid.
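///
/// # Examples
///
/// A minimal sketch (the error type shown is illustrative):
///
/// ```rust,ignore
/// // 1 initial attempt + 3 retries, 60s per attempt, 3-minute total budget.
/// let manager = create_http_retry_manager::<HttpClientError>()?;
/// ```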
pub fn create_http_retry_manager<E>() -> anyhow::Result<RetryManager<E>>
where
    E: std::error::Error,
{
    let config = RetryConfig {
        max_retries: 3,
        initial_delay_ms: 1_000,
        max_delay_ms: 10_000,
        backoff_factor: 2.0,
        jitter_ms: 1_000,
        operation_timeout_ms: Some(60_000), // 60s for HTTP requests
        immediate_first: false,
        max_elapsed_ms: Some(180_000), // 3 minutes total budget
    };
    RetryManager::new(config)
}

/// Convenience function to create a retry manager for WebSocket operations.
///
/// # Errors
///
/// This function will return an error if the WebSocket configuration is invalid.
pub fn create_websocket_retry_manager<E>() -> anyhow::Result<RetryManager<E>>
where
    E: std::error::Error,
{
    let config = RetryConfig {
        max_retries: 5,
        initial_delay_ms: 1_000,
        max_delay_ms: 10_000,
        backoff_factor: 2.0,
        jitter_ms: 1_000,
        operation_timeout_ms: Some(30_000), // 30s for WebSocket operations
        immediate_first: true,
        max_elapsed_ms: Some(120_000), // 2 minutes total budget
    };
    RetryManager::new(config)
}

////////////////////////////////////////////////////////////////////////////////
// Tests
////////////////////////////////////////////////////////////////////////////////

#[cfg(test)]
mod test_utils {
    #[derive(Debug, thiserror::Error)]
    pub enum TestError {
        #[error("Retryable error: {0}")]
        Retryable(String),
        #[error("Non-retryable error: {0}")]
        NonRetryable(String),
        #[error("Timeout error: {0}")]
        Timeout(String),
    }

    pub fn should_retry_test_error(error: &TestError) -> bool {
        matches!(error, TestError::Retryable(_))
    }

    pub fn create_test_error(msg: String) -> TestError {
        TestError::Timeout(msg)
    }
}

#[cfg(test)]
mod tests {
    use std::sync::{
        Arc,
        atomic::{AtomicU32, Ordering},
    };

    use super::{test_utils::*, *};

    #[test]
    fn test_retry_config_default() {
        let config = RetryConfig::default();
        assert_eq!(config.max_retries, 3);
        assert_eq!(config.initial_delay_ms, 1_000);
        assert_eq!(config.max_delay_ms, 10_000);
        assert_eq!(config.backoff_factor, 2.0);
        assert_eq!(config.jitter_ms, 100);
        assert_eq!(config.operation_timeout_ms, Some(30_000));
        assert!(!config.immediate_first);
        assert_eq!(config.max_elapsed_ms, None);
    }

    #[tokio::test]
    async fn test_retry_manager_success_first_attempt() {
        let manager = RetryManager::new(RetryConfig::default()).unwrap();

        let result = manager
            .execute_with_retry(
                "test_operation",
                || async { Ok::<i32, TestError>(42) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert_eq!(result.unwrap(), 42);
    }

    #[tokio::test]
    async fn test_retry_manager_non_retryable_error() {
        let manager = RetryManager::new(RetryConfig::default()).unwrap();

        let result = manager
            .execute_with_retry(
                "test_operation",
                || async { Err::<i32, TestError>(TestError::NonRetryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
    }

    #[tokio::test]
    async fn test_retry_manager_retryable_error_exhausted() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let result = manager
            .execute_with_retry(
                "test_operation",
                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Retryable(_)));
    }

    #[tokio::test]
    async fn test_timeout_path() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: Some(50),
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let result = manager
            .execute_with_retry(
                "test_timeout",
                || async {
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
    }

    #[tokio::test]
    async fn test_max_elapsed_time_budget() {
        let config = RetryConfig {
            max_retries: 10,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: Some(200),
        };
        let manager = RetryManager::new(config).unwrap();

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_budget",
                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        let elapsed = start.elapsed();
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
        assert!(elapsed.as_millis() >= 150);
        assert!(elapsed.as_millis() < 1000);
    }

    #[test]
    fn test_http_retry_manager_config() {
        let manager = create_http_retry_manager::<TestError>().unwrap();
        assert_eq!(manager.config.max_retries, 3);
        assert!(!manager.config.immediate_first);
        assert_eq!(manager.config.max_elapsed_ms, Some(180_000));
    }

    #[test]
    fn test_websocket_retry_manager_config() {
        let manager = create_websocket_retry_manager::<TestError>().unwrap();
        assert_eq!(manager.config.max_retries, 5);
        assert!(manager.config.immediate_first);
        assert_eq!(manager.config.max_elapsed_ms, Some(120_000));
    }

    #[tokio::test]
    async fn test_timeout_respects_retry_predicate() {
        let config = RetryConfig {
            max_retries: 3,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: Some(50),
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        // Test with retry predicate that rejects timeouts
        let should_not_retry_timeouts = |error: &TestError| !matches!(error, TestError::Timeout(_));

        let result = manager
            .execute_with_retry(
                "test_timeout_non_retryable",
                || async {
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    Ok::<i32, TestError>(42)
                },
                should_not_retry_timeouts,
                create_test_error,
            )
            .await;

        // Should fail immediately without retries since timeout is non-retryable
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
    }

    #[tokio::test]
    async fn test_timeout_retries_when_predicate_allows() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: Some(50),
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        // Test with retry predicate that allows timeouts
        let should_retry_timeouts = |error: &TestError| matches!(error, TestError::Timeout(_));

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_timeout_retryable",
                || async {
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_timeouts,
                create_test_error,
            )
            .await;

        let elapsed = start.elapsed();

        // Should fail after retries (not immediately)
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
        // Should have taken time for retries (at least 2 timeouts + delays)
        assert!(elapsed.as_millis() > 80); // More than just one timeout
    }

    #[tokio::test]
    async fn test_successful_retry_after_failures() {
        let config = RetryConfig {
            max_retries: 3,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let result = manager
            .execute_with_retry(
                "test_eventual_success",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        let attempts = counter.fetch_add(1, Ordering::SeqCst);
                        if attempts < 2 {
                            Err(TestError::Retryable("temporary failure".to_string()))
                        } else {
                            Ok(42)
                        }
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert_eq!(result.unwrap(), 42);
        assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
    }

    #[tokio::test]
    #[ignore = "Non-deterministic timing test - TODO: Convert to use deterministic tokio time"]
    async fn test_immediate_first_retry() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 100,
            max_delay_ms: 200,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: true,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
        let times_clone = attempt_times.clone();
        let start = tokio::time::Instant::now();

        let _result = manager
            .execute_with_retry(
                "test_immediate",
                move || {
                    let times = times_clone.clone();
                    async move {
                        times.lock().unwrap().push(start.elapsed());
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        let times = attempt_times.lock().unwrap();
        assert_eq!(times.len(), 3); // Initial + 2 retries

        // First retry should be immediate (within 10ms)
        assert!(times[1].as_millis() < 10);
        // Second retry should have delay (at least 100ms from first retry)
        assert!(times[2].as_millis() >= 100);
    }

    #[tokio::test]
    async fn test_operation_without_timeout() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None, // No timeout
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_no_timeout",
                || async {
                    tokio::time::sleep(Duration::from_millis(50)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        let elapsed = start.elapsed();
        assert_eq!(result.unwrap(), 42);
        // Should complete without timing out
        assert!(elapsed.as_millis() >= 30);
        assert!(elapsed.as_millis() < 200);
    }

    #[tokio::test]
    async fn test_zero_retries() {
        let config = RetryConfig {
            max_retries: 0,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let result = manager
            .execute_with_retry(
                "test_no_retries",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        // Should only attempt once (no retries)
        assert_eq!(attempt_counter.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    #[ignore = "Non-deterministic timing test - TODO: Convert to use deterministic tokio time"]
    async fn test_jitter_applied() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 2.0,
            jitter_ms: 50, // Significant jitter
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let delays = Arc::new(std::sync::Mutex::new(Vec::new()));
        let delays_clone = delays.clone();
        let last_time = Arc::new(std::sync::Mutex::new(tokio::time::Instant::now()));
        let last_time_clone = last_time.clone();

        let _result = manager
            .execute_with_retry(
                "test_jitter",
                move || {
                    let delays = delays_clone.clone();
                    let last_time = last_time_clone.clone();
                    async move {
                        let now = tokio::time::Instant::now();
                        let delay = {
                            let mut last = last_time.lock().unwrap();
                            let d = now.duration_since(*last);
                            *last = now;
                            d
                        };
                        delays.lock().unwrap().push(delay);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        let delays = delays.lock().unwrap();
        // Skip the first delay (initial attempt)
        for delay in delays.iter().skip(1) {
            // Each delay should be at least the base delay (50ms for first retry)
            assert!(delay.as_millis() >= 50);
            // But no more than base + jitter (100ms), plus slack for scheduling
            assert!(delay.as_millis() <= 150);
906        }
907    }
908
909    #[tokio::test]
910    async fn test_max_elapsed_stops_early() {
911        let config = RetryConfig {
912            max_retries: 100, // Very high retry count
913            initial_delay_ms: 50,
914            max_delay_ms: 100,
915            backoff_factor: 1.5,
916            jitter_ms: 0,
917            operation_timeout_ms: None,
918            immediate_first: false,
919            max_elapsed_ms: Some(150), // Should stop after ~3 attempts
920        };
921        let manager = RetryManager::new(config).unwrap();
922
923        let attempt_counter = Arc::new(AtomicU32::new(0));
924        let counter_clone = attempt_counter.clone();
925
926        let start = tokio::time::Instant::now();
927        let result = manager
928            .execute_with_retry(
929                "test_elapsed_limit",
930                move || {
931                    let counter = counter_clone.clone();
932                    async move {
933                        counter.fetch_add(1, Ordering::SeqCst);
934                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
935                    }
936                },
937                should_retry_test_error,
938                create_test_error,
939            )
940            .await;
941
942        let elapsed = start.elapsed();
943        assert!(result.is_err());
944        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
945
946        // Should have stopped due to time limit, not retry count
947        let attempts = attempt_counter.load(Ordering::SeqCst);
948        assert!(attempts < 10); // Much less than max_retries
949        assert!(elapsed.as_millis() >= 100);
950    }
951
952    #[tokio::test]
953    async fn test_mixed_errors_retry_behavior() {
954        let config = RetryConfig {
955            max_retries: 5,
956            initial_delay_ms: 10,
957            max_delay_ms: 50,
958            backoff_factor: 2.0,
959            jitter_ms: 0,
960            operation_timeout_ms: None,
961            immediate_first: false,
962            max_elapsed_ms: None,
963        };
964        let manager = RetryManager::new(config).unwrap();
965
966        let attempt_counter = Arc::new(AtomicU32::new(0));
967        let counter_clone = attempt_counter.clone();
968
969        let result = manager
970            .execute_with_retry(
971                "test_mixed_errors",
972                move || {
973                    let counter = counter_clone.clone();
974                    async move {
975                        let attempts = counter.fetch_add(1, Ordering::SeqCst);
976                        match attempts {
977                            0 => Err(TestError::Retryable("retry 1".to_string())),
978                            1 => Err(TestError::Retryable("retry 2".to_string())),
979                            2 => Err(TestError::NonRetryable("stop here".to_string())),
980                            _ => Ok(42),
981                        }
982                    }
983                },
984                should_retry_test_error,
985                create_test_error,
986            )
987            .await;
988
989        assert!(result.is_err());
990        assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
991        // Should stop at the non-retryable error
992        assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
993    }
994
995    #[tokio::test]
996    async fn test_cancellation_during_retry_delay() {
997        use tokio_util::sync::CancellationToken;
998
999        let config = RetryConfig {
1000            max_retries: 10,
1001            initial_delay_ms: 500, // Long delay to ensure cancellation happens during sleep
1002            max_delay_ms: 1000,
1003            backoff_factor: 2.0,
1004            jitter_ms: 0,
1005            operation_timeout_ms: None,
1006            immediate_first: false,
1007            max_elapsed_ms: None,
1008        };
1009        let manager = RetryManager::new(config).unwrap();
1010
1011        let token = CancellationToken::new();
1012        let token_clone = token.clone();
1013
1014        // Cancel after a short delay
1015        tokio::spawn(async move {
1016            tokio::time::sleep(Duration::from_millis(100)).await;
1017            token_clone.cancel();
1018        });
1019
1020        let attempt_counter = Arc::new(AtomicU32::new(0));
1021        let counter_clone = attempt_counter.clone();
1022
1023        let start = tokio::time::Instant::now();
1024        let result = manager
1025            .execute_with_retry_with_cancel(
1026                "test_cancellation",
1027                move || {
1028                    let counter = counter_clone.clone();
1029                    async move {
1030                        counter.fetch_add(1, Ordering::SeqCst);
1031                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1032                    }
1033                },
1034                should_retry_test_error,
1035                create_test_error,
1036                &token,
1037            )
1038            .await;
1039
1040        let elapsed = start.elapsed();
1041
1042        // Should be canceled quickly
1043        assert!(result.is_err());
1044        let error_msg = format!("{}", result.unwrap_err());
1045        assert!(error_msg.contains("canceled"));
1046
1047        // Should not have taken the full delay time
1048        assert!(elapsed.as_millis() < 600);
1049
1050        // Should have made at least one attempt
1051        let attempts = attempt_counter.load(Ordering::SeqCst);
1052        assert!(attempts >= 1);
1053    }
1054
1055    #[tokio::test]
1056    async fn test_cancellation_during_operation_execution() {
1057        use tokio_util::sync::CancellationToken;
1058
1059        let config = RetryConfig {
1060            max_retries: 5,
1061            initial_delay_ms: 50,
1062            max_delay_ms: 100,
1063            backoff_factor: 2.0,
1064            jitter_ms: 0,
1065            operation_timeout_ms: None,
1066            immediate_first: false,
1067            max_elapsed_ms: None,
1068        };
1069        let manager = RetryManager::new(config).unwrap();
1070
1071        let token = CancellationToken::new();
1072        let token_clone = token.clone();
1073
1074        // Cancel after a short delay
1075        tokio::spawn(async move {
1076            tokio::time::sleep(Duration::from_millis(50)).await;
1077            token_clone.cancel();
1078        });
1079
1080        let start = tokio::time::Instant::now();
1081        let result = manager
1082            .execute_with_retry_with_cancel(
1083                "test_cancellation_during_op",
1084                || async {
1085                    // Long-running operation
1086                    tokio::time::sleep(Duration::from_millis(200)).await;
1087                    Ok::<i32, TestError>(42)
1088                },
1089                should_retry_test_error,
1090                create_test_error,
1091                &token,
1092            )
1093            .await;
1094
1095        let elapsed = start.elapsed();
1096
1097        // Should be canceled during the operation
1098        assert!(result.is_err());
1099        let error_msg = format!("{}", result.unwrap_err());
1100        assert!(error_msg.contains("canceled"));
1101
1102        // Should not have completed the long operation
1103        assert!(elapsed.as_millis() < 250);
1104    }
1105
1106    #[tokio::test]
1107    async fn test_cancellation_error_message() {
1108        use tokio_util::sync::CancellationToken;
1109
1110        let config = RetryConfig::default();
1111        let manager = RetryManager::new(config).unwrap();
1112
1113        let token = CancellationToken::new();
1114        token.cancel(); // Pre-cancel for immediate cancellation
1115
1116        let result = manager
1117            .execute_with_retry_with_cancel(
1118                "test_operation",
1119                || async { Ok::<i32, TestError>(42) },
1120                should_retry_test_error,
1121                create_test_error,
1122                &token,
1123            )
1124            .await;
1125
1126        assert!(result.is_err());
1127        let error_msg = format!("{}", result.unwrap_err());
1128        assert!(error_msg.contains("canceled"));
1129    }
1130}
1131
1132#[cfg(test)]
1133mod proptest_tests {
1134    use std::sync::{
1135        Arc,
1136        atomic::{AtomicU32, Ordering},
1137    };
1138
1139    use proptest::prelude::*;
1140
1141    use super::{test_utils::*, *};
1142
1143    proptest! {
1144        #[test]
1145        fn test_retry_config_valid_ranges(
1146            max_retries in 0u32..100,
1147            initial_delay_ms in 1u64..10_000,
1148            max_delay_ms in 1u64..60_000,
1149            backoff_factor in 1.0f64..10.0,
1150            jitter_ms in 0u64..1_000,
1151            operation_timeout_ms in prop::option::of(1u64..120_000),
1152            immediate_first in any::<bool>(),
1153            max_elapsed_ms in prop::option::of(1u64..300_000)
1154        ) {
1155            // Ensure max_delay >= initial_delay for valid config
1156            let max_delay_ms = max_delay_ms.max(initial_delay_ms);
1157
1158            let config = RetryConfig {
1159                max_retries,
1160                initial_delay_ms,
1161                max_delay_ms,
1162                backoff_factor,
1163                jitter_ms,
1164                operation_timeout_ms,
1165                immediate_first,
1166                max_elapsed_ms,
1167            };
1168
1169            // Should always be able to create a RetryManager with valid config
1170            let manager = RetryManager::<std::io::Error>::new(config);
1171            prop_assert!(manager.is_ok());
1172        }
1173
1174        #[test]
1175        fn test_retry_attempts_bounded(
1176            max_retries in 0u32..5,
1177            initial_delay_ms in 1u64..10,
1178            backoff_factor in 1.0f64..2.0,
1179        ) {
1180            let rt = tokio::runtime::Builder::new_current_thread()
1181                .enable_time()
1182                .build()
1183                .unwrap();
1184
1185            let config = RetryConfig {
1186                max_retries,
1187                initial_delay_ms,
1188                max_delay_ms: initial_delay_ms * 2,
1189                backoff_factor,
1190                jitter_ms: 0,
1191                operation_timeout_ms: None,
1192                immediate_first: false,
1193                max_elapsed_ms: None,
1194            };
1195
1196            let manager = RetryManager::new(config).unwrap();
1197            let attempt_counter = Arc::new(AtomicU32::new(0));
1198            let counter_clone = attempt_counter.clone();
1199
1200            let _result = rt.block_on(manager.execute_with_retry(
1201                "prop_test",
1202                move || {
1203                    let counter = counter_clone.clone();
1204                    async move {
1205                        counter.fetch_add(1, Ordering::SeqCst);
1206                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1207                    }
1208                },
1209                |e: &TestError| matches!(e, TestError::Retryable(_)),
1210                TestError::Timeout,
1211            ));
1212
1213            let attempts = attempt_counter.load(Ordering::SeqCst);
1214            // Total attempts should be 1 (initial) + max_retries
1215            prop_assert_eq!(attempts, max_retries + 1);
1216        }
1217
1218        #[test]
1219        fn test_timeout_always_respected(
1220            timeout_ms in 10u64..50,
1221            operation_delay_ms in 60u64..100,
1222        ) {
1223            let rt = tokio::runtime::Builder::new_current_thread()
1224                .enable_time()
1225                .start_paused(true)
1226                .build()
1227                .unwrap();
1228
1229            let config = RetryConfig {
1230                max_retries: 0, // No retries to isolate timeout behavior
1231                initial_delay_ms: 10,
1232                max_delay_ms: 100,
1233                backoff_factor: 2.0,
1234                jitter_ms: 0,
1235                operation_timeout_ms: Some(timeout_ms),
1236                immediate_first: false,
1237                max_elapsed_ms: None,
1238            };
1239
1240            let manager = RetryManager::new(config).unwrap();
1241
1242            let result = rt.block_on(async {
1243                let operation_future = manager.execute_with_retry(
1244                    "timeout_test",
1245                    move || async move {
1246                        tokio::time::sleep(Duration::from_millis(operation_delay_ms)).await;
1247                        Ok::<i32, TestError>(42)
1248                    },
1249                    |_: &TestError| true,
1250                    TestError::Timeout,
1251                );
1252
1253                // Advance time to trigger timeout
1254                tokio::time::advance(Duration::from_millis(timeout_ms + 10)).await;
1255                operation_future.await
1256            });
1257
1258            // Operation should timeout
1259            prop_assert!(result.is_err());
1260            prop_assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
1261        }
1262
1263        #[test]
1264        fn test_max_elapsed_always_respected(
1265            max_elapsed_ms in 20u64..50,
1266            delay_per_retry in 15u64..30,
1267            max_retries in 10u32..20,
1268        ) {
1269            let rt = tokio::runtime::Builder::new_current_thread()
1270                .enable_time()
1271                .start_paused(true)
1272                .build()
1273                .unwrap();
1274
1275            // Set up config where we would exceed max_elapsed_ms before max_retries
1276            let config = RetryConfig {
1277                max_retries,
1278                initial_delay_ms: delay_per_retry,
1279                max_delay_ms: delay_per_retry * 2,
1280                backoff_factor: 1.0, // No backoff to make timing predictable
1281                jitter_ms: 0,
1282                operation_timeout_ms: None,
1283                immediate_first: false,
1284                max_elapsed_ms: Some(max_elapsed_ms),
1285            };
1286
1287            let manager = RetryManager::new(config).unwrap();
1288            let attempt_counter = Arc::new(AtomicU32::new(0));
1289            let counter_clone = attempt_counter.clone();
1290
1291            let result = rt.block_on(async {
1292                let operation_future = manager.execute_with_retry(
1293                    "elapsed_test",
1294                    move || {
1295                        let counter = counter_clone.clone();
1296                        async move {
1297                            counter.fetch_add(1, Ordering::SeqCst);
1298                            Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1299                        }
1300                    },
1301                    |e: &TestError| matches!(e, TestError::Retryable(_)),
1302                    TestError::Timeout,
1303                );
1304
1305                // Advance time past max_elapsed_ms
1306                tokio::time::advance(Duration::from_millis(max_elapsed_ms + delay_per_retry)).await;
1307                operation_future.await
1308            });
1309
1310            let attempts = attempt_counter.load(Ordering::SeqCst);
1311
1312            // Should have failed with timeout error
1313            prop_assert!(result.is_err());
1314            prop_assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
1315
1316            // Should have stopped before exhausting all retries
1317            prop_assert!(attempts <= max_retries + 1);
1318        }
1319
1320        #[test]
1321        #[ignore = "Non-deterministic timing test - TODO: Convert to use deterministic tokio time"]
1322        fn test_jitter_bounds(
1323            jitter_ms in 0u64..20,
1324            base_delay_ms in 10u64..30,
1325        ) {
1326            let rt = tokio::runtime::Builder::new_current_thread()
1327                .enable_time()
1328                .build()
1329                .unwrap();
1330
1331            let config = RetryConfig {
1332                max_retries: 2,
1333                initial_delay_ms: base_delay_ms,
1334                max_delay_ms: base_delay_ms * 2,
1335                backoff_factor: 1.0, // No backoff to isolate jitter
1336                jitter_ms,
1337                operation_timeout_ms: None,
1338                immediate_first: false,
1339                max_elapsed_ms: None,
1340            };
1341
1342            let manager = RetryManager::new(config).unwrap();
1343            let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
1344            let times_clone = attempt_times.clone();
1345            let start_time = std::time::Instant::now();
1346
1347            let _result = rt.block_on(manager.execute_with_retry(
1348                "jitter_test",
1349                move || {
1350                    let times = times_clone.clone();
1351                    async move {
1352                        times.lock().unwrap().push(start_time.elapsed());
1353                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1354                    }
1355                },
1356                |e: &TestError| matches!(e, TestError::Retryable(_)),
1357                TestError::Timeout,
1358            ));
1359
1360            let times = attempt_times.lock().unwrap();
1361
1362            // We expect at least 2 attempts total (initial + at least 1 retry)
1363            prop_assert!(times.len() >= 2);
1364
1365            // First attempt should be immediate (no delay)
1366            prop_assert!(times[0].as_millis() < 5);
1367
1368            // Check subsequent retries have appropriate delays
1369            for i in 1..times.len() {
1370                let delay_from_previous = if i == 1 {
1371                    times[i] - times[0]
1372                } else {
1373                    times[i] - times[i - 1]
1374                };
1375
1376                // The delay should be at least base_delay_ms
1377                prop_assert!(
1378                    delay_from_previous.as_millis() >= base_delay_ms as u128,
1379                    "Retry {} delay {}ms is less than base {}ms",
1380                    i, delay_from_previous.as_millis(), base_delay_ms
1381                );
1382
1383                // Delay should be at most base_delay + jitter
1384                prop_assert!(
1385                    delay_from_previous.as_millis() <= (base_delay_ms + jitter_ms + 5) as u128,
1386                    "Retry {} delay {}ms exceeds base {} + jitter {}",
1387                    i, delay_from_previous.as_millis(), base_delay_ms, jitter_ms
1388                );
1389            }
1390        }
1391
1392        #[test]
1393        #[ignore = "Non-deterministic timing test - TODO: Convert to use deterministic tokio time"]
1394        fn test_immediate_first_property(
1395            immediate_first in any::<bool>(),
1396            initial_delay_ms in 10u64..30,
1397        ) {
1398            let rt = tokio::runtime::Builder::new_current_thread()
1399                .enable_time()
1400                .build()
1401                .unwrap();
1402
1403            let config = RetryConfig {
1404                max_retries: 2,
1405                initial_delay_ms,
1406                max_delay_ms: initial_delay_ms * 2,
1407                backoff_factor: 2.0,
1408                jitter_ms: 0,
1409                operation_timeout_ms: None,
1410                immediate_first,
1411                max_elapsed_ms: None,
1412            };
1413
1414            let manager = RetryManager::new(config).unwrap();
1415            let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
1416            let times_clone = attempt_times.clone();
1417
1418            let start = std::time::Instant::now();
1419            let _result = rt.block_on(manager.execute_with_retry(
1420                "immediate_test",
1421                move || {
1422                    let times = times_clone.clone();
1423                    async move {
1424                        let elapsed = start.elapsed();
1425                        times.lock().unwrap().push(elapsed);
1426                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1427                    }
1428                },
1429                |e: &TestError| matches!(e, TestError::Retryable(_)),
1430                TestError::Timeout,
1431            ));
1432
1433            let times = attempt_times.lock().unwrap();
1434            prop_assert!(times.len() >= 2);
1435
1436            if immediate_first {
1437                // First retry should be immediate
1438                prop_assert!(times[1].as_millis() < 20,
1439                    "With immediate_first=true, first retry took {}ms",
1440                    times[1].as_millis());
1441            } else {
1442                // First retry should have delay
1443                prop_assert!(times[1].as_millis() >= (initial_delay_ms - 10) as u128,
1444                    "With immediate_first=false, first retry was too fast: {}ms",
1445                    times[1].as_millis());
1446            }
1447        }
1448
        #[test]
        fn test_non_retryable_stops_immediately(
            attempt_before_non_retryable in 0usize..3,
            max_retries in 3u32..5,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries,
                initial_delay_ms: 10,
                max_delay_ms: 100,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config).unwrap();
            let attempt_counter = Arc::new(AtomicU32::new(0));
            let counter_clone = attempt_counter.clone();

            let result: Result<i32, TestError> = rt.block_on(manager.execute_with_retry(
                "non_retryable_test",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        let attempts = counter.fetch_add(1, Ordering::SeqCst) as usize;
                        if attempts == attempt_before_non_retryable {
                            Err(TestError::NonRetryable("stop".to_string()))
                        } else {
                            Err(TestError::Retryable("retry".to_string()))
                        }
                    }
                },
                |e: &TestError| matches!(e, TestError::Retryable(_)),
                TestError::Timeout,
            ));

            let attempts = attempt_counter.load(Ordering::SeqCst) as usize;

            prop_assert!(result.is_err());
            prop_assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
            // Should stop exactly when non-retryable error occurs
            prop_assert_eq!(attempts, attempt_before_non_retryable + 1);
        }

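        // Property: cancelling the token stops retrying promptly rather than
        // waiting out the remaining backoff delays.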
        #[test]
        fn test_cancellation_stops_immediately(
            cancel_after_ms in 10u64..100,
            initial_delay_ms in 200u64..500,
        ) {
            use tokio_util::sync::CancellationToken;

            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries: 10,
                initial_delay_ms,
                max_delay_ms: initial_delay_ms * 2,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config).unwrap();
            let token = CancellationToken::new();
            let token_clone = token.clone();

            let result: Result<i32, TestError> = rt.block_on(async {
                // Spawn cancellation task
                tokio::spawn(async move {
                    tokio::time::sleep(Duration::from_millis(cancel_after_ms)).await;
                    token_clone.cancel();
                });

                let operation_future = manager.execute_with_retry_with_cancel(
                    "cancellation_test",
                    || async {
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    },
                    |e: &TestError| matches!(e, TestError::Retryable(_)),
                    create_test_error,
                    &token,
                );

                // Advance time to trigger cancellation
                tokio::time::advance(Duration::from_millis(cancel_after_ms + 10)).await;
                operation_future.await
            });

            // Should be canceled
            prop_assert!(result.is_err());
            let error_msg = format!("{}", result.unwrap_err());
            prop_assert!(error_msg.contains("canceled"));
        }

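        // Property: when `max_elapsed_ms` is smaller than a single retry delay,
        // the remaining budget clamps the delay so the total elapsed time
        // cannot overshoot the budget.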
        #[test]
        fn test_budget_clamp_prevents_overshoot(
            max_elapsed_ms in 10u64..30,
            delay_per_retry in 20u64..50,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            // Configure so that the first retry delay would exceed the budget
            let config = RetryConfig {
                max_retries: 5,
                initial_delay_ms: delay_per_retry,
                max_delay_ms: delay_per_retry * 2,
                backoff_factor: 1.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: Some(max_elapsed_ms),
            };

            let manager = RetryManager::new(config).unwrap();

            let result = rt.block_on(async {
                let operation_future = manager.execute_with_retry(
                    "budget_clamp_test",
                    || async {
                        // Fast operation to focus on delay timing
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    },
                    |e: &TestError| matches!(e, TestError::Retryable(_)),
                    create_test_error,
                );

                // Advance time past max_elapsed_ms
                tokio::time::advance(Duration::from_millis(max_elapsed_ms + delay_per_retry)).await;
                operation_future.await
            });

            // With paused time the test runs without wall-clock delay; the budget
            // is still enforced, so the always-failing operation must error out.
            prop_assert!(result.is_err());
        }

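        // Property: an operation that first succeeds on attempt k returns that
        // success after exactly k attempts.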
        #[test]
        fn test_success_on_kth_attempt(
            k in 1usize..5,
            initial_delay_ms in 5u64..20,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries: 10, // More than k
                initial_delay_ms,
                max_delay_ms: initial_delay_ms * 4,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config).unwrap();
            let attempt_counter = Arc::new(AtomicU32::new(0));
            let counter_clone = attempt_counter.clone();
            let target_k = k;

            let (result, _elapsed) = rt.block_on(async {
                let start = tokio::time::Instant::now();

                let operation_future = manager.execute_with_retry(
                    "kth_attempt_test",
                    move || {
                        let counter = counter_clone.clone();
                        async move {
                            let attempt = counter.fetch_add(1, Ordering::SeqCst) as usize;
                            if attempt + 1 == target_k {
                                Ok(42)
                            } else {
                                Err(TestError::Retryable("retry".to_string()))
                            }
                        }
                    },
                    |e: &TestError| matches!(e, TestError::Retryable(_)),
                    create_test_error,
                );

                // Advance time to allow enough retries
                for _ in 0..k {
                    tokio::time::advance(Duration::from_millis(initial_delay_ms * 4)).await;
                }

                let result = operation_future.await;
                let elapsed = start.elapsed();

                (result, elapsed)
            });

            let attempts = attempt_counter.load(Ordering::SeqCst) as usize;

            // Using paused Tokio time (start_paused + advance); assert behavior only (no wall-clock timing)
            prop_assert!(result.is_ok());
            prop_assert_eq!(result.unwrap(), 42);
            prop_assert_eq!(attempts, k);
        }
    }
}