nautilus_network/retry.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Generic retry mechanism for network operations.

use std::{future::Future, marker::PhantomData, time::Duration};

use tokio_util::sync::CancellationToken;

use crate::backoff::ExponentialBackoff;

/// Configuration for retry behavior.
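///
/// # Examples
///
/// A minimal sketch of overriding selected defaults via struct update syntax
/// (the field values here are illustrative, not recommendations):
///
/// ```rust,ignore
/// let config = RetryConfig {
///     max_retries: 5,
///     initial_delay_ms: 250,
///     ..RetryConfig::default()
/// };
/// ```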
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Maximum number of retry attempts (total attempts = 1 initial + `max_retries`).
    pub max_retries: u32,
    /// Initial delay between retries in milliseconds.
    pub initial_delay_ms: u64,
    /// Maximum delay between retries in milliseconds.
    pub max_delay_ms: u64,
    /// Backoff multiplier factor.
    pub backoff_factor: f64,
    /// Maximum jitter in milliseconds to add to delays.
    pub jitter_ms: u64,
    /// Optional timeout for individual operations in milliseconds.
    /// If `None`, no timeout is applied.
    pub operation_timeout_ms: Option<u64>,
    /// Whether the first retry should happen immediately without delay.
    /// Should be false for HTTP/order operations, true for connection operations.
    pub immediate_first: bool,
    /// Optional maximum total elapsed time for all retries in milliseconds.
    /// If exceeded, retries stop even if `max_retries` hasn't been reached.
    pub max_elapsed_ms: Option<u64>,
}

impl Default for RetryConfig {
    fn default() -> Self {
        Self {
            max_retries: 3,
            initial_delay_ms: 1_000,
            max_delay_ms: 10_000,
            backoff_factor: 2.0,
            jitter_ms: 100,
            operation_timeout_ms: Some(30_000),
            immediate_first: false,
            max_elapsed_ms: None,
        }
    }
}

/// Generic retry manager for network operations.
///
/// Stateless and thread-safe; each operation maintains its own backoff state.
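///
/// # Examples
///
/// Because the manager is `Clone` and holds no per-call state, one instance can be
/// shared across tasks (sketch; `MyError` is a hypothetical `std::error::Error` type):
///
/// ```rust,ignore
/// let manager = RetryManager::<MyError>::new(RetryConfig::default());
/// let for_task = manager.clone(); // cheap: clones only the config
/// ```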
#[derive(Clone, Debug)]
pub struct RetryManager<E> {
    config: RetryConfig,
    _phantom: PhantomData<E>,
}

impl<E> RetryManager<E>
where
    E: std::error::Error,
{
    /// Creates a new retry manager with the given configuration.
    pub const fn new(config: RetryConfig) -> Self {
        Self {
            config,
            _phantom: PhantomData,
        }
    }

    /// Formats a retry budget exceeded error message with attempt context.
    #[inline(always)]
    fn budget_exceeded_msg(&self, attempt: u32) -> String {
        format!(
            "Retry budget exceeded ({}/{})",
            attempt.saturating_add(1),
            self.config.max_retries.saturating_add(1)
        )
    }

    /// Executes an operation with retry logic and optional cancellation.
    ///
    /// Cancellation is checked at three points:
    /// (1) Before each operation attempt.
    /// (2) During operation execution (via `tokio::select!`).
    /// (3) During retry delays.
    ///
    /// If cancellation fires mid-execution, the in-flight operation future is dropped
    /// and the cancellation error is returned immediately.
    ///
    /// # Errors
    ///
    /// Returns an error if the operation fails after exhausting all retries,
    /// if the operation times out, if creating the backoff state fails, or if canceled.
    pub async fn execute_with_retry_inner<F, Fut, T>(
        &self,
        operation_name: &str,
        mut operation: F,
        should_retry: impl Fn(&E) -> bool,
        create_error: impl Fn(String) -> E,
        cancel: Option<&CancellationToken>,
    ) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        let mut backoff = ExponentialBackoff::new(
            Duration::from_millis(self.config.initial_delay_ms),
            Duration::from_millis(self.config.max_delay_ms),
            self.config.backoff_factor,
            self.config.jitter_ms,
            self.config.immediate_first,
        )
        .map_err(|e| create_error(format!("Invalid configuration: {e}")))?;

        let mut attempt = 0;
        let start_time = tokio::time::Instant::now();

        loop {
            if let Some(token) = cancel
                && token.is_cancelled()
            {
                tracing::debug!(
                    operation = %operation_name,
                    attempts = attempt,
                    "Operation canceled"
                );
                return Err(create_error("canceled".to_string()));
            }

            if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
                let elapsed = start_time.elapsed();
                if elapsed.as_millis() >= u128::from(max_elapsed_ms) {
                    return Err(create_error(self.budget_exceeded_msg(attempt)));
                }
            }

            let result = match (self.config.operation_timeout_ms, cancel) {
                (Some(timeout_ms), Some(token)) => {
                    tokio::select! {
                        result = tokio::time::timeout(Duration::from_millis(timeout_ms), operation()) => result,
                        () = token.cancelled() => {
                            tracing::debug!(
                                operation = %operation_name,
                                "Operation canceled during execution"
                            );
                            return Err(create_error("canceled".to_string()));
                        }
                    }
                }
                (Some(timeout_ms), None) => {
                    tokio::time::timeout(Duration::from_millis(timeout_ms), operation()).await
                }
                (None, Some(token)) => {
                    tokio::select! {
                        result = operation() => Ok(result),
                        () = token.cancelled() => {
                            tracing::debug!(
                                operation = %operation_name,
                                "Operation canceled during execution"
                            );
                            return Err(create_error("canceled".to_string()));
                        }
                    }
                }
                (None, None) => Ok(operation().await),
            };

            match result {
                Ok(Ok(success)) => {
                    if attempt > 0 {
                        tracing::trace!(
                            operation = %operation_name,
                            attempts = attempt + 1,
                            "Retry succeeded"
                        );
                    }
                    return Ok(success);
                }
                Ok(Err(e)) => {
                    if !should_retry(&e) {
                        tracing::trace!(
                            operation = %operation_name,
                            error = %e,
                            "Non-retryable error"
                        );
                        return Err(e);
                    }

                    if attempt >= self.config.max_retries {
                        tracing::trace!(
                            operation = %operation_name,
                            attempts = attempt + 1,
                            error = %e,
                            "Retries exhausted"
                        );
                        return Err(e);
                    }

                    let mut delay = backoff.next_duration();

                    if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
                        let elapsed = start_time.elapsed();
                        let remaining =
                            Duration::from_millis(max_elapsed_ms).saturating_sub(elapsed);

                        if remaining.is_zero() {
                            return Err(create_error(self.budget_exceeded_msg(attempt)));
                        }

                        delay = delay.min(remaining);
                    }

                    tracing::trace!(
                        operation = %operation_name,
                        attempt = attempt + 1,
                        delay_ms = delay.as_millis() as u64,
                        error = %e,
                        "Retrying after failure"
                    );

                    // Yield even on zero-delay to avoid busy-wait loop
                    if delay.is_zero() {
                        tokio::task::yield_now().await;
                        attempt += 1;
                        continue;
                    }

                    if let Some(token) = cancel {
                        tokio::select! {
                            () = tokio::time::sleep(delay) => {},
                            () = token.cancelled() => {
                                tracing::debug!(
                                    operation = %operation_name,
                                    attempt = attempt + 1,
                                    "Operation canceled during retry delay"
                                );
                                return Err(create_error("canceled".to_string()));
                            }
                        }
                    } else {
                        tokio::time::sleep(delay).await;
                    }
                    attempt += 1;
                }
                Err(_) => {
                    let e = create_error(format!(
                        "Timed out after {}ms",
                        self.config.operation_timeout_ms.unwrap_or(0)
                    ));

                    if !should_retry(&e) {
                        tracing::trace!(
                            operation = %operation_name,
                            error = %e,
                            "Non-retryable timeout"
                        );
                        return Err(e);
                    }

                    if attempt >= self.config.max_retries {
                        tracing::trace!(
                            operation = %operation_name,
                            attempts = attempt + 1,
                            error = %e,
                            "Retries exhausted after timeout"
                        );
                        return Err(e);
                    }

                    let mut delay = backoff.next_duration();

                    if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
                        let elapsed = start_time.elapsed();
                        let remaining =
                            Duration::from_millis(max_elapsed_ms).saturating_sub(elapsed);

                        if remaining.is_zero() {
                            return Err(create_error(self.budget_exceeded_msg(attempt)));
                        }

                        delay = delay.min(remaining);
                    }

                    tracing::trace!(
                        operation = %operation_name,
                        attempt = attempt + 1,
                        delay_ms = delay.as_millis() as u64,
                        error = %e,
                        "Retrying after timeout"
                    );

                    // Yield even on zero-delay to avoid busy-wait loop
                    if delay.is_zero() {
                        tokio::task::yield_now().await;
                        attempt += 1;
                        continue;
                    }

                    if let Some(token) = cancel {
                        tokio::select! {
                            () = tokio::time::sleep(delay) => {},
                            () = token.cancelled() => {
                                tracing::debug!(
                                    operation = %operation_name,
                                    attempt = attempt + 1,
                                    "Operation canceled during retry delay"
                                );
                                return Err(create_error("canceled".to_string()));
                            }
                        }
                    } else {
                        tokio::time::sleep(delay).await;
                    }
                    attempt += 1;
                }
            }
        }
    }

    /// Executes an operation with retry logic.
    ///
    /// # Errors
    ///
    /// Returns an error if the operation fails after exhausting all retries,
    /// if the operation times out, or if creating the backoff state fails.
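    ///
    /// # Examples
    ///
    /// A minimal sketch; `MyError`, `fetch_quote`, and the closures are hypothetical
    /// stand-ins for a caller-defined error type and operation:
    ///
    /// ```rust,ignore
    /// let manager = RetryManager::new(RetryConfig::default());
    /// let value = manager
    ///     .execute_with_retry(
    ///         "fetch_quote",
    ///         || async { fetch_quote().await },   // fallible async operation
    ///         |e: &MyError| e.is_transient(),     // predicate: which errors retry
    ///         |msg| MyError::Retry(msg),          // builds timeout/budget errors
    ///     )
    ///     .await?;
    /// ```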
    pub async fn execute_with_retry<F, Fut, T>(
        &self,
        operation_name: &str,
        operation: F,
        should_retry: impl Fn(&E) -> bool,
        create_error: impl Fn(String) -> E,
    ) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        self.execute_with_retry_inner(operation_name, operation, should_retry, create_error, None)
            .await
    }

    /// Executes an operation with retry logic and cancellation support.
    ///
    /// # Errors
    ///
    /// Returns an error if the operation fails after exhausting all retries,
    /// if the operation times out, if creating the backoff state fails, or if canceled.
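    ///
    /// # Examples
    ///
    /// Sketch of wiring in a token so shutdown can abort the retry loop
    /// (`op`, `should_retry`, and `create_error` as in [`Self::execute_with_retry`]):
    ///
    /// ```rust,ignore
    /// let token = CancellationToken::new();
    /// // Elsewhere: token.cancel() stops the loop at the next check point.
    /// let result = manager
    ///     .execute_with_retry_with_cancel("connect", op, should_retry, create_error, &token)
    ///     .await;
    /// ```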
    pub async fn execute_with_retry_with_cancel<F, Fut, T>(
        &self,
        operation_name: &str,
        operation: F,
        should_retry: impl Fn(&E) -> bool,
        create_error: impl Fn(String) -> E,
        cancellation_token: &CancellationToken,
    ) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        self.execute_with_retry_inner(
            operation_name,
            operation,
            should_retry,
            create_error,
            Some(cancellation_token),
        )
        .await
    }
}

/// Convenience function to create a retry manager with default configuration.
pub fn create_default_retry_manager<E>() -> RetryManager<E>
where
    E: std::error::Error,
{
    RetryManager::new(RetryConfig::default())
}

/// Convenience function to create a retry manager for HTTP operations.
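///
/// # Examples
///
/// Sketch of pinning the error type at the call site (`MyHttpError` is a
/// hypothetical caller-defined error type):
///
/// ```rust,ignore
/// let manager = create_http_retry_manager::<MyHttpError>();
/// ```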
pub const fn create_http_retry_manager<E>() -> RetryManager<E>
where
    E: std::error::Error,
{
    let config = RetryConfig {
        max_retries: 3,
        initial_delay_ms: 1_000,
        max_delay_ms: 10_000,
        backoff_factor: 2.0,
        jitter_ms: 1_000,
        operation_timeout_ms: Some(60_000), // 60s for HTTP requests
        immediate_first: false,
        max_elapsed_ms: Some(180_000), // 3 minutes total budget
    };
    RetryManager::new(config)
}

/// Convenience function to create a retry manager for WebSocket operations.
pub const fn create_websocket_retry_manager<E>() -> RetryManager<E>
where
    E: std::error::Error,
{
    let config = RetryConfig {
        max_retries: 5,
        initial_delay_ms: 1_000,
        max_delay_ms: 10_000,
        backoff_factor: 2.0,
        jitter_ms: 1_000,
        operation_timeout_ms: Some(30_000), // 30s for WebSocket operations
        immediate_first: true,
        max_elapsed_ms: Some(120_000), // 2 minutes total budget
    };
    RetryManager::new(config)
}

#[cfg(test)]
mod test_utils {
    #[derive(Debug, thiserror::Error)]
    pub enum TestError {
        #[error("Retryable error: {0}")]
        Retryable(String),
        #[error("Non-retryable error: {0}")]
        NonRetryable(String),
        #[error("Timeout error: {0}")]
        Timeout(String),
    }

    pub fn should_retry_test_error(error: &TestError) -> bool {
        matches!(error, TestError::Retryable(_))
    }

    pub fn create_test_error(msg: String) -> TestError {
        TestError::Timeout(msg)
    }
}

#[cfg(test)]
mod tests {
    use std::sync::{
        Arc,
        atomic::{AtomicU32, Ordering},
    };

    use nautilus_core::MUTEX_POISONED;
    use rstest::rstest;

    use super::{test_utils::*, *};

    const MAX_WAIT_ITERS: usize = 10_000;
    const MAX_ADVANCE_ITERS: usize = 10_000;

    pub(crate) async fn yield_until<F>(mut condition: F)
    where
        F: FnMut() -> bool,
    {
        for _ in 0..MAX_WAIT_ITERS {
            if condition() {
                return;
            }
            tokio::task::yield_now().await;
        }

        panic!("yield_until timed out waiting for condition");
    }

    pub(crate) async fn advance_until<F>(mut condition: F)
    where
        F: FnMut() -> bool,
    {
        for _ in 0..MAX_ADVANCE_ITERS {
            if condition() {
                return;
            }
            tokio::time::advance(Duration::from_millis(1)).await;
            tokio::task::yield_now().await;
        }

        panic!("advance_until timed out waiting for condition");
    }

    #[rstest]
    fn test_retry_config_default() {
        let config = RetryConfig::default();
        assert_eq!(config.max_retries, 3);
        assert_eq!(config.initial_delay_ms, 1_000);
        assert_eq!(config.max_delay_ms, 10_000);
        assert_eq!(config.backoff_factor, 2.0);
        assert_eq!(config.jitter_ms, 100);
        assert_eq!(config.operation_timeout_ms, Some(30_000));
        assert!(!config.immediate_first);
        assert_eq!(config.max_elapsed_ms, None);
    }

    #[tokio::test]
    async fn test_retry_manager_success_first_attempt() {
        let manager = RetryManager::new(RetryConfig::default());

        let result = manager
            .execute_with_retry(
                "test_operation",
                || async { Ok::<i32, TestError>(42) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert_eq!(result.unwrap(), 42);
    }

    #[tokio::test]
    async fn test_retry_manager_non_retryable_error() {
        let manager = RetryManager::new(RetryConfig::default());

        let result = manager
            .execute_with_retry(
                "test_operation",
                || async { Err::<i32, TestError>(TestError::NonRetryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
    }

    #[tokio::test]
    async fn test_retry_manager_retryable_error_exhausted() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let result = manager
            .execute_with_retry(
                "test_operation",
                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Retryable(_)));
    }

    #[tokio::test]
    async fn test_timeout_path() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: Some(50),
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let result = manager
            .execute_with_retry(
                "test_timeout",
                || async {
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
    }

    #[tokio::test]
    async fn test_max_elapsed_time_budget() {
        let config = RetryConfig {
            max_retries: 10,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: Some(200),
        };
        let manager = RetryManager::new(config);

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_budget",
                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        let elapsed = start.elapsed();
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
        assert!(elapsed.as_millis() >= 150);
        assert!(elapsed.as_millis() < 1000);
    }

    #[tokio::test]
    async fn test_budget_exceeded_message_format() {
        let config = RetryConfig {
            max_retries: 5,
            initial_delay_ms: 10,
            max_delay_ms: 20,
            backoff_factor: 1.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: Some(35),
        };
        let manager = RetryManager::new(config);

        let result = manager
            .execute_with_retry(
                "test_budget_msg",
                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        let error_msg = result.unwrap_err().to_string();

        assert!(error_msg.contains("Retry budget exceeded"));
        assert!(error_msg.contains("/6)"));

        if let Some(captures) = error_msg.strip_prefix("Timeout error: Retry budget exceeded (")
            && let Some(nums) = captures.strip_suffix(")")
        {
            let parts: Vec<&str> = nums.split('/').collect();
            assert_eq!(parts.len(), 2);
            let current: u32 = parts[0].parse().unwrap();
            let total: u32 = parts[1].parse().unwrap();

            assert_eq!(total, 6, "Total should be max_retries + 1");
            assert!(current <= total, "Current attempt should not exceed total");
            assert!(current >= 1, "Current attempt should be at least 1");
        }
    }

    #[tokio::test(start_paused = true)]
    async fn test_budget_exceeded_edge_cases() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 1.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: Some(100),
        };
        let manager = RetryManager::new(config);

        let attempt_count = Arc::new(AtomicU32::new(0));
        let count_clone = attempt_count.clone();

        let handle = tokio::spawn(async move {
            manager
                .execute_with_retry(
                    "test_first_attempt",
                    move || {
                        let count = count_clone.clone();
                        async move {
                            count.fetch_add(1, Ordering::SeqCst);
                            Err::<i32, TestError>(TestError::Retryable("test".to_string()))
                        }
                    },
                    should_retry_test_error,
                    create_test_error,
                )
                .await
        });

        // Wait for first attempt
        yield_until(|| attempt_count.load(Ordering::SeqCst) >= 1).await;

        // Advance past budget to trigger check at loop start before second attempt
        tokio::time::advance(Duration::from_millis(101)).await;
        tokio::task::yield_now().await;

        let result = handle.await.unwrap();
        assert!(result.is_err());
        let error_msg = result.unwrap_err().to_string();

        // Budget check happens at loop start, so shows (2/3) = "starting 2nd of 3 attempts"
        assert!(
            error_msg.contains("(2/3)"),
            "Expected (2/3) but got: {error_msg}"
        );
    }

    #[tokio::test]
    async fn test_budget_exceeded_no_overflow() {
        let config = RetryConfig {
            max_retries: u32::MAX,
            initial_delay_ms: 10,
            max_delay_ms: 20,
            backoff_factor: 1.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: Some(1),
        };
        let manager = RetryManager::new(config);

        let result = manager
            .execute_with_retry(
                "test_overflow",
                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        let error_msg = result.unwrap_err().to_string();

        // Should saturate at u32::MAX instead of wrapping to 0
        assert!(error_msg.contains("Retry budget exceeded"));
        assert!(error_msg.contains(&format!("/{}", u32::MAX)));
    }

    #[rstest]
    fn test_http_retry_manager_config() {
        let manager = create_http_retry_manager::<TestError>();
        assert_eq!(manager.config.max_retries, 3);
        assert!(!manager.config.immediate_first);
        assert_eq!(manager.config.max_elapsed_ms, Some(180_000));
    }

    #[rstest]
    fn test_websocket_retry_manager_config() {
        let manager = create_websocket_retry_manager::<TestError>();
        assert_eq!(manager.config.max_retries, 5);
        assert!(manager.config.immediate_first);
        assert_eq!(manager.config.max_elapsed_ms, Some(120_000));
    }

    #[tokio::test]
    async fn test_timeout_respects_retry_predicate() {
        let config = RetryConfig {
            max_retries: 3,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: Some(50),
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        // Test with retry predicate that rejects timeouts
        let should_not_retry_timeouts = |error: &TestError| !matches!(error, TestError::Timeout(_));

        let result = manager
            .execute_with_retry(
                "test_timeout_non_retryable",
                || async {
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    Ok::<i32, TestError>(42)
                },
                should_not_retry_timeouts,
                create_test_error,
            )
            .await;

        // Should fail immediately without retries since timeout is non-retryable
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
    }

    #[tokio::test]
    async fn test_timeout_retries_when_predicate_allows() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: Some(50),
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        // Test with retry predicate that allows timeouts
        let should_retry_timeouts = |error: &TestError| matches!(error, TestError::Timeout(_));

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_timeout_retryable",
                || async {
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_timeouts,
                create_test_error,
            )
            .await;

        let elapsed = start.elapsed();

        // Should fail after retries (not immediately)
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
        // Should have taken time for retries (at least 2 timeouts + delays)
        assert!(elapsed.as_millis() > 80); // More than just one timeout
    }

    #[tokio::test]
    async fn test_successful_retry_after_failures() {
        let config = RetryConfig {
            max_retries: 3,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let result = manager
            .execute_with_retry(
                "test_eventual_success",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        let attempts = counter.fetch_add(1, Ordering::SeqCst);
                        if attempts < 2 {
                            Err(TestError::Retryable("temporary failure".to_string()))
                        } else {
                            Ok(42)
                        }
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert_eq!(result.unwrap(), 42);
        assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
    }

    #[tokio::test(start_paused = true)]
    async fn test_immediate_first_retry() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 100,
            max_delay_ms: 200,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: true,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
        let times_clone = attempt_times.clone();
        let start = tokio::time::Instant::now();

        let handle = tokio::spawn({
            let times_clone = times_clone.clone();
            async move {
                let _ = manager
                    .execute_with_retry(
                        "test_immediate",
                        move || {
                            let times = times_clone.clone();
                            async move {
                                times.lock().expect(MUTEX_POISONED).push(start.elapsed());
                                Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                            }
                        },
                        should_retry_test_error,
                        create_test_error,
                    )
                    .await;
            }
        });

        // Allow initial attempt and immediate retry to run without advancing time
        yield_until(|| attempt_times.lock().expect(MUTEX_POISONED).len() >= 2).await;

        // Advance time for the next backoff interval
        tokio::time::advance(Duration::from_millis(100)).await;
        tokio::task::yield_now().await;

        // Wait for the final retry to be recorded
        yield_until(|| attempt_times.lock().expect(MUTEX_POISONED).len() >= 3).await;

        handle.await.unwrap();

        let times = attempt_times.lock().expect(MUTEX_POISONED);
        assert_eq!(times.len(), 3); // Initial + 2 retries

        // First retry should be immediate (within 1ms tolerance)
        assert!(times[1] <= Duration::from_millis(1));
        // Second retry should have backoff delay (at least 100ms from start)
        assert!(times[2] >= Duration::from_millis(100));
        assert!(times[2] <= Duration::from_millis(110));
    }

    #[tokio::test]
    async fn test_operation_without_timeout() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None, // No timeout
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_no_timeout",
                || async {
                    tokio::time::sleep(Duration::from_millis(50)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        let elapsed = start.elapsed();
        assert_eq!(result.unwrap(), 42);
        // Should complete without timing out
        assert!(elapsed.as_millis() >= 30);
        assert!(elapsed.as_millis() < 200);
    }

    #[tokio::test]
    async fn test_zero_retries() {
        let config = RetryConfig {
            max_retries: 0,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let result = manager
            .execute_with_retry(
                "test_no_retries",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        // Should only attempt once (no retries)
        assert_eq!(attempt_counter.load(Ordering::SeqCst), 1);
    }

    #[tokio::test(start_paused = true)]
    async fn test_jitter_applied() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 2.0,
            jitter_ms: 50, // Significant jitter
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let delays = Arc::new(std::sync::Mutex::new(Vec::new()));
        let delays_clone = delays.clone();
        let last_time = Arc::new(std::sync::Mutex::new(tokio::time::Instant::now()));
        let last_time_clone = last_time.clone();

        let handle = tokio::spawn({
            let delays_clone = delays_clone.clone();
            async move {
                let _ = manager
                    .execute_with_retry(
                        "test_jitter",
                        move || {
                            let delays = delays_clone.clone();
                            let last_time = last_time_clone.clone();
                            async move {
                                let now = tokio::time::Instant::now();
                                let delay = {
                                    let mut last = last_time.lock().expect(MUTEX_POISONED);
                                    let d = now.duration_since(*last);
                                    *last = now;
                                    d
                                };
                                delays.lock().expect(MUTEX_POISONED).push(delay);
                                Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                            }
                        },
                        should_retry_test_error,
                        create_test_error,
                    )
                    .await;
            }
        });

        yield_until(|| !delays.lock().expect(MUTEX_POISONED).is_empty()).await;
        advance_until(|| delays.lock().expect(MUTEX_POISONED).len() >= 2).await;
        advance_until(|| delays.lock().expect(MUTEX_POISONED).len() >= 3).await;

        handle.await.unwrap();

        let delays = delays.lock().expect(MUTEX_POISONED);
        // Skip the first delay (initial attempt)
        for delay in delays.iter().skip(1) {
            // Each delay should be at least the base delay (50ms for first retry)
            assert!(delay.as_millis() >= 50);
            // But no more than base + jitter (allow small tolerance for step advance)
            assert!(delay.as_millis() <= 151);
        }
    }

    #[tokio::test]
    async fn test_max_elapsed_stops_early() {
        let config = RetryConfig {
            max_retries: 100, // Very high retry count
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 1.5,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: Some(150), // Should stop after ~3 attempts
        };
        let manager = RetryManager::new(config);

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_elapsed_limit",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        let elapsed = start.elapsed();
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));

        // Should have stopped due to time limit, not retry count
        let attempts = attempt_counter.load(Ordering::SeqCst);
        assert!(attempts < 10); // Much less than max_retries
        assert!(elapsed.as_millis() >= 100);
    }

    #[tokio::test]
    async fn test_mixed_errors_retry_behavior() {
        let config = RetryConfig {
            max_retries: 5,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let result = manager
            .execute_with_retry(
                "test_mixed_errors",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        let attempts = counter.fetch_add(1, Ordering::SeqCst);
                        match attempts {
                            0 => Err(TestError::Retryable("retry 1".to_string())),
                            1 => Err(TestError::Retryable("retry 2".to_string())),
                            2 => Err(TestError::NonRetryable("stop here".to_string())),
                            _ => Ok(42),
                        }
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
        // Should stop at the non-retryable error
        assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
    }

    #[tokio::test]
    async fn test_cancellation_during_retry_delay() {
        use tokio_util::sync::CancellationToken;

        let config = RetryConfig {
            max_retries: 10,
            initial_delay_ms: 500, // Long delay to ensure cancellation happens during sleep
            max_delay_ms: 1000,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let token = CancellationToken::new();
        let token_clone = token.clone();

        // Cancel after a short delay
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            token_clone.cancel();
        });

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry_with_cancel(
                "test_cancellation",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
                &token,
            )
            .await;

        let elapsed = start.elapsed();

        // Should be canceled quickly
        assert!(result.is_err());
        let error_msg = format!("{}", result.unwrap_err());
        assert!(error_msg.contains("canceled"));

        // Should not have taken the full delay time
        assert!(elapsed.as_millis() < 600);

        // Should have made at least one attempt
        let attempts = attempt_counter.load(Ordering::SeqCst);
        assert!(attempts >= 1);
    }

    #[tokio::test]
    async fn test_cancellation_during_operation_execution() {
        use tokio_util::sync::CancellationToken;

        let config = RetryConfig {
            max_retries: 5,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let token = CancellationToken::new();
        let token_clone = token.clone();

        // Cancel after a short delay
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            token_clone.cancel();
        });

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry_with_cancel(
                "test_cancellation_during_op",
                || async {
                    // Long-running operation
                    tokio::time::sleep(Duration::from_millis(200)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_test_error,
                create_test_error,
                &token,
            )
            .await;

        let elapsed = start.elapsed();

        // Should be canceled during the operation
        assert!(result.is_err());
        let error_msg = format!("{}", result.unwrap_err());
        assert!(error_msg.contains("canceled"));

        // Should not have completed the long operation
        assert!(elapsed.as_millis() < 250);
    }

    #[tokio::test]
    async fn test_cancellation_error_message() {
        use tokio_util::sync::CancellationToken;

        let config = RetryConfig::default();
        let manager = RetryManager::new(config);

        let token = CancellationToken::new();
        token.cancel(); // Pre-cancel for immediate cancellation

        let result = manager
            .execute_with_retry_with_cancel(
                "test_operation",
                || async { Ok::<i32, TestError>(42) },
                should_retry_test_error,
                create_test_error,
                &token,
            )
            .await;

        assert!(result.is_err());
        let error_msg = format!("{}", result.unwrap_err());
        assert!(error_msg.contains("canceled"));
    }
}
1297
1298#[cfg(test)]
1299mod proptest_tests {
1300    use std::sync::{
1301        Arc,
1302        atomic::{AtomicU32, Ordering},
1303    };
1304
1305    use nautilus_core::MUTEX_POISONED;
1306    use proptest::prelude::*;
1307    // Import rstest attribute macro used within proptest! tests
1308    use rstest::rstest;
1309
1310    use super::{
1311        test_utils::*,
1312        tests::{advance_until, yield_until},
1313        *,
1314    };
1315
1316    proptest! {
1317        #[rstest]
1318        fn test_retry_config_valid_ranges(
1319            max_retries in 0u32..100,
1320            initial_delay_ms in 1u64..10_000,
1321            max_delay_ms in 1u64..60_000,
1322            backoff_factor in 1.0f64..10.0,
1323            jitter_ms in 0u64..1_000,
1324            operation_timeout_ms in prop::option::of(1u64..120_000),
1325            immediate_first in any::<bool>(),
1326            max_elapsed_ms in prop::option::of(1u64..300_000)
1327        ) {
1328            // Ensure max_delay >= initial_delay for valid config
1329            let max_delay_ms = max_delay_ms.max(initial_delay_ms);
1330
1331            let config = RetryConfig {
1332                max_retries,
1333                initial_delay_ms,
1334                max_delay_ms,
1335                backoff_factor,
1336                jitter_ms,
1337                operation_timeout_ms,
1338                immediate_first,
1339                max_elapsed_ms,
1340            };
1341
1342            // Should always be able to create a RetryManager with valid config
1343            let _manager = RetryManager::<std::io::Error>::new(config);
1344        }
1345
1346        #[rstest]
1347        fn test_retry_attempts_bounded(
1348            max_retries in 0u32..5,
1349            initial_delay_ms in 1u64..10,
1350            backoff_factor in 1.0f64..2.0,
1351        ) {
1352            let rt = tokio::runtime::Builder::new_current_thread()
1353                .enable_time()
1354                .build()
1355                .unwrap();
1356
1357            let config = RetryConfig {
1358                max_retries,
1359                initial_delay_ms,
1360                max_delay_ms: initial_delay_ms * 2,
1361                backoff_factor,
1362                jitter_ms: 0,
1363                operation_timeout_ms: None,
1364                immediate_first: false,
1365                max_elapsed_ms: None,
1366            };
1367
1368            let manager = RetryManager::new(config);
1369            let attempt_counter = Arc::new(AtomicU32::new(0));
1370            let counter_clone = attempt_counter.clone();
1371
1372            let _result = rt.block_on(manager.execute_with_retry(
1373                "prop_test",
1374                move || {
1375                    let counter = counter_clone.clone();
1376                    async move {
1377                        counter.fetch_add(1, Ordering::SeqCst);
1378                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1379                    }
1380                },
1381                |e: &TestError| matches!(e, TestError::Retryable(_)),
1382                TestError::Timeout,
1383            ));
1384
1385            let attempts = attempt_counter.load(Ordering::SeqCst);
1386            // Total attempts should be 1 (initial) + max_retries
1387            prop_assert_eq!(attempts, max_retries + 1);
1388        }
1389
1390        #[rstest]
1391        fn test_timeout_always_respected(
1392            timeout_ms in 10u64..50,
1393            operation_delay_ms in 60u64..100,
1394        ) {
1395            let rt = tokio::runtime::Builder::new_current_thread()
1396                .enable_time()
1397                .start_paused(true)
1398                .build()
1399                .unwrap();
1400
1401            let config = RetryConfig {
1402                max_retries: 0, // No retries to isolate timeout behavior
1403                initial_delay_ms: 10,
1404                max_delay_ms: 100,
1405                backoff_factor: 2.0,
1406                jitter_ms: 0,
1407                operation_timeout_ms: Some(timeout_ms),
1408                immediate_first: false,
1409                max_elapsed_ms: None,
1410            };
1411
1412            let manager = RetryManager::new(config);
1413
1414            let result = rt.block_on(async {
1415                let operation_future = manager.execute_with_retry(
1416                    "timeout_test",
1417                    move || async move {
1418                        tokio::time::sleep(Duration::from_millis(operation_delay_ms)).await;
1419                        Ok::<i32, TestError>(42)
1420                    },
1421                    |_: &TestError| true,
1422                    TestError::Timeout,
1423                );
1424
1425                // Advance time to trigger timeout
1426                tokio::time::advance(Duration::from_millis(timeout_ms + 10)).await;
1427                operation_future.await
1428            });
1429
1430            // Operation should timeout
1431            prop_assert!(result.is_err());
1432            prop_assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
1433        }
1434
        #[rstest]
        fn test_max_elapsed_always_respected(
            max_elapsed_ms in 20u64..50,
            delay_per_retry in 15u64..30,
            max_retries in 10u32..20,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            // Set up config where we would exceed max_elapsed_ms before max_retries
            let config = RetryConfig {
                max_retries,
                initial_delay_ms: delay_per_retry,
                max_delay_ms: delay_per_retry * 2,
                backoff_factor: 1.0, // No backoff to make timing predictable
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: Some(max_elapsed_ms),
            };

            let manager = RetryManager::new(config);
            let attempt_counter = Arc::new(AtomicU32::new(0));
            let counter_clone = attempt_counter.clone();

            let result = rt.block_on(async {
                let operation_future = manager.execute_with_retry(
                    "elapsed_test",
                    move || {
                        let counter = counter_clone.clone();
                        async move {
                            counter.fetch_add(1, Ordering::SeqCst);
                            Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                        }
                    },
                    |e: &TestError| matches!(e, TestError::Retryable(_)),
                    TestError::Timeout,
                );

                // Advance time past max_elapsed_ms
                tokio::time::advance(Duration::from_millis(max_elapsed_ms + delay_per_retry)).await;
                operation_future.await
            });

            let attempts = attempt_counter.load(Ordering::SeqCst);

            // Should have failed with timeout error
            prop_assert!(result.is_err());
            prop_assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));

            // Should have stopped before exhausting all retries: with these
            // parameters the elapsed budget only permits a few attempts
            prop_assert!(
                attempts < max_retries + 1,
                "expected the elapsed budget to stop retries early, got {} attempts",
                attempts
            );
        }

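        // Property: every retry delay falls within [base_delay, base_delay + jitter],
        // measured with paused Tokio time so no wall-clock timing is involved.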
        #[rstest]
        fn test_jitter_bounds(
            jitter_ms in 0u64..20,
            base_delay_ms in 10u64..30,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries: 2,
                initial_delay_ms: base_delay_ms,
                max_delay_ms: base_delay_ms * 2,
                backoff_factor: 1.0, // No backoff to isolate jitter
                jitter_ms,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config);
            let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
            let attempt_times_for_block = attempt_times.clone();

            rt.block_on(async move {
                let attempt_times_for_wait = attempt_times_for_block.clone();
                let handle = tokio::spawn({
                    let attempt_times_for_task = attempt_times_for_block.clone();
                    let manager = manager;
                    async move {
                        let start_time = tokio::time::Instant::now();
                        let _ = manager
                            .execute_with_retry(
                                "jitter_test",
                                move || {
                                    let attempt_times_inner = attempt_times_for_task.clone();
                                    async move {
                                        attempt_times_inner
                                            .lock()
                                            .expect(MUTEX_POISONED)
                                            .push(start_time.elapsed());
                                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                                    }
                                },
                                |e: &TestError| matches!(e, TestError::Retryable(_)),
                                TestError::Timeout,
                            )
                            .await;
                    }
                });

                yield_until(|| !attempt_times_for_wait.lock().expect(MUTEX_POISONED).is_empty()).await;
                advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 2).await;
                advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 3).await;

                handle.await.unwrap();
            });

            let times = attempt_times.lock().expect(MUTEX_POISONED);

            // We expect at least 2 attempts total (initial + at least 1 retry)
            prop_assert!(times.len() >= 2);

            // First attempt should be immediate (no delay)
            prop_assert!(times[0].as_millis() < 5);

            // Check that subsequent retries have appropriate delays
            for i in 1..times.len() {
                let delay_from_previous = times[i] - times[i - 1];

                // The delay should be at least base_delay_ms
                prop_assert!(
                    delay_from_previous.as_millis() >= base_delay_ms as u128,
                    "Retry {} delay {}ms is less than base {}ms",
                    i, delay_from_previous.as_millis(), base_delay_ms
                );

                // Delay should be at most base_delay + jitter
                prop_assert!(
                    delay_from_previous.as_millis() <= (base_delay_ms + jitter_ms + 1) as u128,
                    "Retry {} delay {}ms exceeds base {} + jitter {}",
                    i, delay_from_previous.as_millis(), base_delay_ms, jitter_ms
                );
            }
        }

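        // Property: immediate_first=true makes the first retry fire without delay,
        // while immediate_first=false waits at least initial_delay_ms first.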
        #[rstest]
        fn test_immediate_first_property(
            immediate_first in any::<bool>(),
            initial_delay_ms in 10u64..30,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries: 2,
                initial_delay_ms,
                max_delay_ms: initial_delay_ms * 2,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config);
            let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
            let attempt_times_for_block = attempt_times.clone();

            rt.block_on(async move {
                let attempt_times_for_wait = attempt_times_for_block.clone();
                let handle = tokio::spawn({
                    let attempt_times_for_task = attempt_times_for_block.clone();
                    let manager = manager;
                    async move {
                        let start = tokio::time::Instant::now();
                        let _ = manager
                            .execute_with_retry(
                                "immediate_test",
                                move || {
                                    let attempt_times_inner = attempt_times_for_task.clone();
                                    async move {
                                        let elapsed = start.elapsed();
                                        attempt_times_inner.lock().expect(MUTEX_POISONED).push(elapsed);
                                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                                    }
                                },
                                |e: &TestError| matches!(e, TestError::Retryable(_)),
                                TestError::Timeout,
                            )
                            .await;
                    }
                });

                yield_until(|| !attempt_times_for_wait.lock().expect(MUTEX_POISONED).is_empty()).await;
                advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 2).await;
                advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 3).await;

                handle.await.unwrap();
            });

            let times = attempt_times.lock().expect(MUTEX_POISONED);
            prop_assert!(times.len() >= 2);

            if immediate_first {
                // First retry should be immediate
                prop_assert!(times[1].as_millis() < 20,
                    "With immediate_first=true, first retry took {}ms",
                    times[1].as_millis());
            } else {
                // First retry should have a delay
                prop_assert!(times[1].as_millis() >= (initial_delay_ms - 1) as u128,
                    "With immediate_first=false, first retry was too fast: {}ms",
                    times[1].as_millis());
            }
        }

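        // Property: a non-retryable error aborts the loop on the exact attempt
        // where it occurs, regardless of the remaining retry budget.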
        #[rstest]
        fn test_non_retryable_stops_immediately(
            attempt_before_non_retryable in 0usize..3,
            max_retries in 3u32..5,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries,
                initial_delay_ms: 10,
                max_delay_ms: 100,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config);
            let attempt_counter = Arc::new(AtomicU32::new(0));
            let counter_clone = attempt_counter.clone();

            let result: Result<i32, TestError> = rt.block_on(manager.execute_with_retry(
                "non_retryable_test",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        let attempts = counter.fetch_add(1, Ordering::SeqCst) as usize;
                        if attempts == attempt_before_non_retryable {
                            Err(TestError::NonRetryable("stop".to_string()))
                        } else {
                            Err(TestError::Retryable("retry".to_string()))
                        }
                    }
                },
                |e: &TestError| matches!(e, TestError::Retryable(_)),
                TestError::Timeout,
            ));

            let attempts = attempt_counter.load(Ordering::SeqCst) as usize;

            prop_assert!(result.is_err());
            prop_assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
            // Should stop exactly when the non-retryable error occurs
            prop_assert_eq!(attempts, attempt_before_non_retryable + 1);
        }

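        // Property: cancelling the token stops the retry loop promptly; the
        // resulting error reports cancellation rather than retry exhaustion.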
        #[rstest]
        fn test_cancellation_stops_immediately(
            cancel_after_ms in 10u64..100,
            initial_delay_ms in 200u64..500,
        ) {
            use tokio_util::sync::CancellationToken;

            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries: 10,
                initial_delay_ms,
                max_delay_ms: initial_delay_ms * 2,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config);
            let token = CancellationToken::new();
            let token_clone = token.clone();

            let result: Result<i32, TestError> = rt.block_on(async {
                // Spawn cancellation task
                tokio::spawn(async move {
                    tokio::time::sleep(Duration::from_millis(cancel_after_ms)).await;
                    token_clone.cancel();
                });

                let operation_future = manager.execute_with_retry_with_cancel(
                    "cancellation_test",
                    || async {
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    },
                    |e: &TestError| matches!(e, TestError::Retryable(_)),
                    create_test_error,
                    &token,
                );

                // Advance time to trigger cancellation
                tokio::time::advance(Duration::from_millis(cancel_after_ms + 10)).await;
                operation_future.await
            });

            // Should be canceled
            prop_assert!(result.is_err());
            let error_msg = format!("{}", result.unwrap_err());
            prop_assert!(error_msg.contains("canceled"));
        }

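        // Property: the elapsed budget is enforced even when a single backoff
        // delay would overshoot it; the retry chain must fail rather than
        // sleep past max_elapsed_ms.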
        #[rstest]
        fn test_budget_clamp_prevents_overshoot(
            max_elapsed_ms in 10u64..30,
            delay_per_retry in 20u64..50,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            // Configure so that first retry delay would exceed budget
            let config = RetryConfig {
                max_retries: 5,
                initial_delay_ms: delay_per_retry,
                max_delay_ms: delay_per_retry * 2,
                backoff_factor: 1.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: Some(max_elapsed_ms),
            };

            let manager = RetryManager::new(config);

            let result = rt.block_on(async {
                let operation_future = manager.execute_with_retry(
                    "budget_clamp_test",
                    || async {
                        // Fast operation to focus on delay timing
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    },
                    |e: &TestError| matches!(e, TestError::Retryable(_)),
                    create_test_error,
                );

                // Advance time past max_elapsed_ms
                tokio::time::advance(Duration::from_millis(max_elapsed_ms + delay_per_retry)).await;
                operation_future.await
            });

            // The operation never succeeds, so once the elapsed budget is
            // exceeded the retry manager must surface an error
            prop_assert!(result.is_err());
        }

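        // Property: an operation that first succeeds on its k-th attempt returns
        // Ok after exactly k attempts, with no extra retries afterwards.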
        #[rstest]
        fn test_success_on_kth_attempt(
            k in 1usize..5,
            initial_delay_ms in 5u64..20,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries: 10, // More than k
                initial_delay_ms,
                max_delay_ms: initial_delay_ms * 4,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config);
            let attempt_counter = Arc::new(AtomicU32::new(0));
            let counter_clone = attempt_counter.clone();

            let result = rt.block_on(async {
                let operation_future = manager.execute_with_retry(
                    "kth_attempt_test",
                    move || {
                        let counter = counter_clone.clone();
                        async move {
                            let attempt = counter.fetch_add(1, Ordering::SeqCst) as usize;
                            if attempt + 1 == k {
                                Ok(42)
                            } else {
                                Err(TestError::Retryable("retry".to_string()))
                            }
                        }
                    },
                    |e: &TestError| matches!(e, TestError::Retryable(_)),
                    create_test_error,
                );

                // Advance time to allow enough retries
                for _ in 0..k {
                    tokio::time::advance(Duration::from_millis(initial_delay_ms * 4)).await;
                }

                operation_future.await
            });

            let attempts = attempt_counter.load(Ordering::SeqCst) as usize;

            // Using paused Tokio time (start_paused + advance); assert behavior only (no wall-clock timing)
            prop_assert!(result.is_ok());
            prop_assert_eq!(result.unwrap(), 42);
            prop_assert_eq!(attempts, k);
        }
    }
}