// nautilus_network/retry.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Generic retry mechanism for network operations.

use std::{future::Future, marker::PhantomData, time::Duration};

use tokio_util::sync::CancellationToken;

use crate::backoff::ExponentialBackoff;

/// Configuration for retry behavior.
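///
/// # Examples
///
/// A minimal sketch of overriding selected fields while keeping the remaining
/// defaults (the values below are illustrative, not a preset from this module):
///
/// ```ignore
/// let config = RetryConfig {
///     max_retries: 5,
///     operation_timeout_ms: Some(5_000),
///     ..RetryConfig::default()
/// };
/// ```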
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Maximum number of retry attempts (total attempts = 1 initial + `max_retries`).
    pub max_retries: u32,
    /// Initial delay between retries in milliseconds.
    pub initial_delay_ms: u64,
    /// Maximum delay between retries in milliseconds.
    pub max_delay_ms: u64,
    /// Backoff multiplier factor.
    pub backoff_factor: f64,
    /// Maximum jitter in milliseconds to add to delays.
    pub jitter_ms: u64,
    /// Optional timeout for individual operations in milliseconds.
    /// If None, no timeout is applied.
    pub operation_timeout_ms: Option<u64>,
    /// Whether the first retry should happen immediately without delay.
    /// Should be false for HTTP/order operations, true for connection operations.
    pub immediate_first: bool,
    /// Optional maximum total elapsed time for all retries in milliseconds.
    /// If exceeded, retries stop even if `max_retries` hasn't been reached.
    pub max_elapsed_ms: Option<u64>,
}

impl Default for RetryConfig {
    fn default() -> Self {
        Self {
            max_retries: 3,
            initial_delay_ms: 1_000,
            max_delay_ms: 10_000,
            backoff_factor: 2.0,
            jitter_ms: 100,
            operation_timeout_ms: Some(30_000),
            immediate_first: false,
            max_elapsed_ms: None,
        }
    }
}

/// Generic retry manager for network operations.
///
/// Stateless and thread-safe - each operation maintains its own backoff state.
#[derive(Clone, Debug)]
pub struct RetryManager<E> {
    config: RetryConfig,
    _phantom: PhantomData<E>,
}

impl<E> RetryManager<E>
where
    E: std::error::Error,
{
    /// Creates a new retry manager with the given configuration.
    pub const fn new(config: RetryConfig) -> Self {
        Self {
            config,
            _phantom: PhantomData,
        }
    }

    /// Formats a retry budget exceeded error message with attempt context.
    #[inline(always)]
    fn budget_exceeded_msg(&self, attempt: u32) -> String {
        format!(
            "Retry budget exceeded ({}/{})",
            attempt.saturating_add(1),
            self.config.max_retries.saturating_add(1)
        )
    }

    /// Executes an operation with retry logic and optional cancellation.
    ///
    /// Cancellation is checked at three points:
    /// (1) Before each operation attempt.
    /// (2) During operation execution (via `tokio::select!`).
    /// (3) During retry delays.
    ///
    /// Cancellation observed mid-execution aborts the in-flight attempt (its future is dropped);
    /// any side effects the attempt has already performed are not rolled back.
    ///
    /// # Errors
    ///
    /// Returns an error if the operation fails after exhausting all retries,
    /// if the operation times out, if creating the backoff state fails, or if canceled.
    pub async fn execute_with_retry_inner<F, Fut, T>(
        &self,
        operation_name: &str,
        mut operation: F,
        should_retry: impl Fn(&E) -> bool,
        create_error: impl Fn(String) -> E,
        cancel: Option<&CancellationToken>,
    ) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        let mut backoff = ExponentialBackoff::new(
            Duration::from_millis(self.config.initial_delay_ms),
            Duration::from_millis(self.config.max_delay_ms),
            self.config.backoff_factor,
            self.config.jitter_ms,
            self.config.immediate_first,
        )
        .map_err(|e| create_error(format!("Invalid configuration: {e}")))?;

        let mut attempt = 0;
        let start_time = tokio::time::Instant::now();

        loop {
            if let Some(token) = cancel
                && token.is_cancelled()
            {
                tracing::debug!(
                    operation = %operation_name,
                    attempts = attempt,
                    "Operation canceled"
                );
                return Err(create_error("canceled".to_string()));
            }

            if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
                let elapsed = start_time.elapsed();
                if elapsed.as_millis() >= u128::from(max_elapsed_ms) {
                    return Err(create_error(self.budget_exceeded_msg(attempt)));
                }
            }

            let result = match (self.config.operation_timeout_ms, cancel) {
                (Some(timeout_ms), Some(token)) => {
                    tokio::select! {
                        result = tokio::time::timeout(Duration::from_millis(timeout_ms), operation()) => result,
                        () = token.cancelled() => {
                            tracing::debug!(
                                operation = %operation_name,
                                "Operation canceled during execution"
                            );
                            return Err(create_error("canceled".to_string()));
                        }
                    }
                }
                (Some(timeout_ms), None) => {
                    tokio::time::timeout(Duration::from_millis(timeout_ms), operation()).await
                }
                (None, Some(token)) => {
                    tokio::select! {
                        result = operation() => Ok(result),
                        () = token.cancelled() => {
                            tracing::debug!(
                                operation = %operation_name,
                                "Operation canceled during execution"
                            );
                            return Err(create_error("canceled".to_string()));
                        }
                    }
                }
                (None, None) => Ok(operation().await),
            };

            match result {
                Ok(Ok(success)) => {
                    if attempt > 0 {
                        tracing::trace!(
                            operation = %operation_name,
                            attempts = attempt + 1,
                            "Retry succeeded"
                        );
                    }
                    return Ok(success);
                }
                Ok(Err(e)) => {
                    if !should_retry(&e) {
                        tracing::trace!(
                            operation = %operation_name,
                            error = %e,
                            "Non-retryable error"
                        );
                        return Err(e);
                    }

                    if attempt >= self.config.max_retries {
                        tracing::trace!(
                            operation = %operation_name,
                            attempts = attempt + 1,
                            error = %e,
                            "Retries exhausted"
                        );
                        return Err(e);
                    }

                    let mut delay = backoff.next_duration();

                    if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
                        let elapsed = start_time.elapsed();
                        let remaining =
                            Duration::from_millis(max_elapsed_ms).saturating_sub(elapsed);

                        if remaining.is_zero() {
                            return Err(create_error(self.budget_exceeded_msg(attempt)));
                        }

                        delay = delay.min(remaining);
                    }

                    tracing::trace!(
                        operation = %operation_name,
                        attempt = attempt + 1,
                        delay_ms = delay.as_millis() as u64,
                        error = %e,
                        "Retrying after failure"
                    );

                    // Yield even on zero-delay to avoid busy-wait loop
                    if delay.is_zero() {
                        tokio::task::yield_now().await;
                        attempt += 1;
                        continue;
                    }

                    if let Some(token) = cancel {
                        tokio::select! {
                            () = tokio::time::sleep(delay) => {},
                            () = token.cancelled() => {
                                tracing::debug!(
                                    operation = %operation_name,
                                    attempt = attempt + 1,
                                    "Operation canceled during retry delay"
                                );
                                return Err(create_error("canceled".to_string()));
                            }
                        }
                    } else {
                        tokio::time::sleep(delay).await;
                    }
                    attempt += 1;
                }
                Err(_) => {
                    let e = create_error(format!(
                        "Timed out after {}ms",
                        self.config.operation_timeout_ms.unwrap_or(0)
                    ));

                    if !should_retry(&e) {
                        tracing::trace!(
                            operation = %operation_name,
                            error = %e,
                            "Non-retryable timeout"
                        );
                        return Err(e);
                    }

                    if attempt >= self.config.max_retries {
                        tracing::trace!(
                            operation = %operation_name,
                            attempts = attempt + 1,
                            error = %e,
                            "Retries exhausted after timeout"
                        );
                        return Err(e);
                    }

                    let mut delay = backoff.next_duration();

                    if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
                        let elapsed = start_time.elapsed();
                        let remaining =
                            Duration::from_millis(max_elapsed_ms).saturating_sub(elapsed);

                        if remaining.is_zero() {
                            return Err(create_error(self.budget_exceeded_msg(attempt)));
                        }

                        delay = delay.min(remaining);
                    }

                    tracing::trace!(
                        operation = %operation_name,
                        attempt = attempt + 1,
                        delay_ms = delay.as_millis() as u64,
                        error = %e,
                        "Retrying after timeout"
                    );

                    // Yield even on zero-delay to avoid busy-wait loop
                    if delay.is_zero() {
                        tokio::task::yield_now().await;
                        attempt += 1;
                        continue;
                    }

                    if let Some(token) = cancel {
                        tokio::select! {
                            () = tokio::time::sleep(delay) => {},
                            () = token.cancelled() => {
                                tracing::debug!(
                                    operation = %operation_name,
                                    attempt = attempt + 1,
                                    "Operation canceled during retry delay"
                                );
                                return Err(create_error("canceled".to_string()));
                            }
                        }
                    } else {
                        tokio::time::sleep(delay).await;
                    }
                    attempt += 1;
                }
            }
        }
    }

    /// Executes an operation with retry logic.
    ///
    /// # Errors
    ///
    /// Returns an error if the operation fails after exhausting all retries,
    /// if the operation times out, or if creating the backoff state fails.
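    ///
    /// # Examples
    ///
    /// A minimal sketch with an illustrative caller-defined error type (`MyError`
    /// below is a placeholder, not a type provided by this crate):
    ///
    /// ```ignore
    /// #[derive(Debug, thiserror::Error)]
    /// enum MyError {
    ///     #[error("transient: {0}")]
    ///     Transient(String),
    ///     #[error("fatal: {0}")]
    ///     Fatal(String),
    /// }
    ///
    /// let manager = RetryManager::new(RetryConfig::default());
    /// let result = manager
    ///     .execute_with_retry(
    ///         "get_account",
    ///         || async { Ok::<_, MyError>("response") },
    ///         |e| matches!(e, MyError::Transient(_)), // retry only transient failures
    ///         MyError::Transient,                     // wrap timeout/budget messages
    ///     )
    ///     .await;
    /// ```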
    pub async fn execute_with_retry<F, Fut, T>(
        &self,
        operation_name: &str,
        operation: F,
        should_retry: impl Fn(&E) -> bool,
        create_error: impl Fn(String) -> E,
    ) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        self.execute_with_retry_inner(operation_name, operation, should_retry, create_error, None)
            .await
    }

    /// Executes an operation with retry logic and cancellation support.
    ///
    /// # Errors
    ///
    /// Returns an error if the operation fails after exhausting all retries,
    /// if the operation times out, if creating the backoff state fails, or if canceled.
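    ///
    /// # Examples
    ///
    /// A minimal sketch of wiring a shutdown token through the call; `manager` and
    /// `MyError` are the same placeholders used in [`Self::execute_with_retry`]:
    ///
    /// ```ignore
    /// use tokio_util::sync::CancellationToken;
    ///
    /// let token = CancellationToken::new();
    /// let result = manager
    ///     .execute_with_retry_with_cancel(
    ///         "subscribe",
    ///         || async { Err::<(), MyError>(MyError::Transient("disconnected".into())) },
    ///         |e| matches!(e, MyError::Transient(_)),
    ///         MyError::Transient,
    ///         &token,
    ///     )
    ///     .await;
    /// // Calling `token.cancel()` from another task aborts the in-flight attempt
    /// // or the current retry delay and yields a "canceled" error.
    /// ```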
    pub async fn execute_with_retry_with_cancel<F, Fut, T>(
        &self,
        operation_name: &str,
        operation: F,
        should_retry: impl Fn(&E) -> bool,
        create_error: impl Fn(String) -> E,
        cancellation_token: &CancellationToken,
    ) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        self.execute_with_retry_inner(
            operation_name,
            operation,
            should_retry,
            create_error,
            Some(cancellation_token),
        )
        .await
    }
}

/// Convenience function to create a retry manager with default configuration.
pub fn create_default_retry_manager<E>() -> RetryManager<E>
where
    E: std::error::Error,
{
    RetryManager::new(RetryConfig::default())
}

/// Convenience function to create a retry manager for HTTP operations.
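///
/// # Examples
///
/// A minimal sketch; the error type parameter is supplied by the caller and
/// `MyError` is only a placeholder:
///
/// ```ignore
/// let manager = create_http_retry_manager::<MyError>();
/// ```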
392pub const fn create_http_retry_manager<E>() -> RetryManager<E>
393where
394    E: std::error::Error,
395{
396    let config = RetryConfig {
397        max_retries: 3,
398        initial_delay_ms: 1_000,
399        max_delay_ms: 10_000,
400        backoff_factor: 2.0,
401        jitter_ms: 1_000,
402        operation_timeout_ms: Some(60_000), // 60s for HTTP requests
403        immediate_first: false,
404        max_elapsed_ms: Some(180_000), // 3 minutes total budget
405    };
406    RetryManager::new(config)
407}
408
409/// Convenience function to create a retry manager for WebSocket operations.
410pub const fn create_websocket_retry_manager<E>() -> RetryManager<E>
411where
412    E: std::error::Error,
413{
414    let config = RetryConfig {
415        max_retries: 5,
416        initial_delay_ms: 1_000,
417        max_delay_ms: 10_000,
418        backoff_factor: 2.0,
419        jitter_ms: 1_000,
420        operation_timeout_ms: Some(30_000), // 30s for WebSocket operations
421        immediate_first: true,
422        max_elapsed_ms: Some(120_000), // 2 minutes total budget
423    };
424    RetryManager::new(config)
425}
426
427////////////////////////////////////////////////////////////////////////////////
428// Tests
429////////////////////////////////////////////////////////////////////////////////
430
431#[cfg(test)]
432mod test_utils {
433    #[derive(Debug, thiserror::Error)]
434    pub enum TestError {
435        #[error("Retryable error: {0}")]
436        Retryable(String),
437        #[error("Non-retryable error: {0}")]
438        NonRetryable(String),
439        #[error("Timeout error: {0}")]
440        Timeout(String),
441    }
442
443    pub fn should_retry_test_error(error: &TestError) -> bool {
444        matches!(error, TestError::Retryable(_))
445    }
446
447    pub fn create_test_error(msg: String) -> TestError {
448        TestError::Timeout(msg)
449    }
450}
451
452#[cfg(test)]
453mod tests {
454    use std::sync::{
455        Arc,
456        atomic::{AtomicU32, Ordering},
457    };
458
459    use nautilus_core::MUTEX_POISONED;
460    use rstest::rstest;
461
462    use super::{test_utils::*, *};
463
464    const MAX_WAIT_ITERS: usize = 10_000;
465    const MAX_ADVANCE_ITERS: usize = 10_000;
466
467    pub(crate) async fn yield_until<F>(mut condition: F)
468    where
469        F: FnMut() -> bool,
470    {
471        for _ in 0..MAX_WAIT_ITERS {
472            if condition() {
473                return;
474            }
475            tokio::task::yield_now().await;
476        }
477
478        panic!("yield_until timed out waiting for condition");
479    }
480
481    pub(crate) async fn advance_until<F>(mut condition: F)
482    where
483        F: FnMut() -> bool,
484    {
485        for _ in 0..MAX_ADVANCE_ITERS {
486            if condition() {
487                return;
488            }
489            tokio::time::advance(Duration::from_millis(1)).await;
490            tokio::task::yield_now().await;
491        }
492
493        panic!("advance_until timed out waiting for condition");
494    }
495
496    #[rstest]
497    fn test_retry_config_default() {
498        let config = RetryConfig::default();
499        assert_eq!(config.max_retries, 3);
500        assert_eq!(config.initial_delay_ms, 1_000);
501        assert_eq!(config.max_delay_ms, 10_000);
502        assert_eq!(config.backoff_factor, 2.0);
503        assert_eq!(config.jitter_ms, 100);
504        assert_eq!(config.operation_timeout_ms, Some(30_000));
505        assert!(!config.immediate_first);
506        assert_eq!(config.max_elapsed_ms, None);
507    }
508
509    #[tokio::test]
510    async fn test_retry_manager_success_first_attempt() {
511        let manager = RetryManager::new(RetryConfig::default());
512
513        let result = manager
514            .execute_with_retry(
515                "test_operation",
516                || async { Ok::<i32, TestError>(42) },
517                should_retry_test_error,
518                create_test_error,
519            )
520            .await;
521
522        assert_eq!(result.unwrap(), 42);
523    }
524
525    #[tokio::test]
526    async fn test_retry_manager_non_retryable_error() {
527        let manager = RetryManager::new(RetryConfig::default());
528
529        let result = manager
530            .execute_with_retry(
531                "test_operation",
532                || async { Err::<i32, TestError>(TestError::NonRetryable("test".to_string())) },
533                should_retry_test_error,
534                create_test_error,
535            )
536            .await;
537
538        assert!(result.is_err());
539        assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
540    }
541
542    #[tokio::test]
543    async fn test_retry_manager_retryable_error_exhausted() {
544        let config = RetryConfig {
545            max_retries: 2,
546            initial_delay_ms: 10,
547            max_delay_ms: 50,
548            backoff_factor: 2.0,
549            jitter_ms: 0,
550            operation_timeout_ms: None,
551            immediate_first: false,
552            max_elapsed_ms: None,
553        };
554        let manager = RetryManager::new(config);
555
556        let result = manager
557            .execute_with_retry(
558                "test_operation",
559                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
560                should_retry_test_error,
561                create_test_error,
562            )
563            .await;
564
565        assert!(result.is_err());
566        assert!(matches!(result.unwrap_err(), TestError::Retryable(_)));
567    }
568
569    #[tokio::test]
570    async fn test_timeout_path() {
571        let config = RetryConfig {
572            max_retries: 2,
573            initial_delay_ms: 10,
574            max_delay_ms: 50,
575            backoff_factor: 2.0,
576            jitter_ms: 0,
577            operation_timeout_ms: Some(50),
578            immediate_first: false,
579            max_elapsed_ms: None,
580        };
581        let manager = RetryManager::new(config);
582
583        let result = manager
584            .execute_with_retry(
585                "test_timeout",
586                || async {
587                    tokio::time::sleep(Duration::from_millis(100)).await;
588                    Ok::<i32, TestError>(42)
589                },
590                should_retry_test_error,
591                create_test_error,
592            )
593            .await;
594
595        assert!(result.is_err());
596        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
597    }
598
599    #[tokio::test]
600    async fn test_max_elapsed_time_budget() {
601        let config = RetryConfig {
602            max_retries: 10,
603            initial_delay_ms: 50,
604            max_delay_ms: 100,
605            backoff_factor: 2.0,
606            jitter_ms: 0,
607            operation_timeout_ms: None,
608            immediate_first: false,
609            max_elapsed_ms: Some(200),
610        };
611        let manager = RetryManager::new(config);
612
613        let start = tokio::time::Instant::now();
614        let result = manager
615            .execute_with_retry(
616                "test_budget",
617                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
618                should_retry_test_error,
619                create_test_error,
620            )
621            .await;
622
623        let elapsed = start.elapsed();
624        assert!(result.is_err());
625        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
626        assert!(elapsed.as_millis() >= 150);
627        assert!(elapsed.as_millis() < 1000);
628    }
629
630    #[tokio::test]
631    async fn test_budget_exceeded_message_format() {
632        let config = RetryConfig {
633            max_retries: 5,
634            initial_delay_ms: 10,
635            max_delay_ms: 20,
636            backoff_factor: 1.0,
637            jitter_ms: 0,
638            operation_timeout_ms: None,
639            immediate_first: false,
640            max_elapsed_ms: Some(35),
641        };
642        let manager = RetryManager::new(config);
643
644        let result = manager
645            .execute_with_retry(
646                "test_budget_msg",
647                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
648                should_retry_test_error,
649                create_test_error,
650            )
651            .await;
652
653        assert!(result.is_err());
654        let error_msg = result.unwrap_err().to_string();
655
656        assert!(error_msg.contains("Retry budget exceeded"));
657        assert!(error_msg.contains("/6)"));
658
659        if let Some(captures) = error_msg.strip_prefix("Timeout error: Retry budget exceeded (")
660            && let Some(nums) = captures.strip_suffix(")")
661        {
662            let parts: Vec<&str> = nums.split('/').collect();
663            assert_eq!(parts.len(), 2);
664            let current: u32 = parts[0].parse().unwrap();
665            let total: u32 = parts[1].parse().unwrap();
666
667            assert_eq!(total, 6, "Total should be max_retries + 1");
668            assert!(current <= total, "Current attempt should not exceed total");
669            assert!(current >= 1, "Current attempt should be at least 1");
670        }
671    }
672
673    #[tokio::test(start_paused = true)]
674    async fn test_budget_exceeded_edge_cases() {
675        let config = RetryConfig {
676            max_retries: 2,
677            initial_delay_ms: 50,
678            max_delay_ms: 100,
679            backoff_factor: 1.0,
680            jitter_ms: 0,
681            operation_timeout_ms: None,
682            immediate_first: false,
683            max_elapsed_ms: Some(100),
684        };
685        let manager = RetryManager::new(config);
686
687        let attempt_count = Arc::new(AtomicU32::new(0));
688        let count_clone = attempt_count.clone();
689
690        let handle = tokio::spawn(async move {
691            manager
692                .execute_with_retry(
693                    "test_first_attempt",
694                    move || {
695                        let count = count_clone.clone();
696                        async move {
697                            count.fetch_add(1, Ordering::SeqCst);
698                            Err::<i32, TestError>(TestError::Retryable("test".to_string()))
699                        }
700                    },
701                    should_retry_test_error,
702                    create_test_error,
703                )
704                .await
705        });
706
707        // Wait for first attempt
708        yield_until(|| attempt_count.load(Ordering::SeqCst) >= 1).await;
709
710        // Advance past budget to trigger check at loop start before second attempt
711        tokio::time::advance(Duration::from_millis(101)).await;
712        tokio::task::yield_now().await;
713
714        let result = handle.await.unwrap();
715        assert!(result.is_err());
716        let error_msg = result.unwrap_err().to_string();
717
718        // Budget check happens at loop start, so shows (2/3) = "starting 2nd of 3 attempts"
719        assert!(
720            error_msg.contains("(2/3)"),
721            "Expected (2/3) but got: {error_msg}"
722        );
723    }
724
725    #[tokio::test]
726    async fn test_budget_exceeded_no_overflow() {
727        let config = RetryConfig {
728            max_retries: u32::MAX,
729            initial_delay_ms: 10,
730            max_delay_ms: 20,
731            backoff_factor: 1.0,
732            jitter_ms: 0,
733            operation_timeout_ms: None,
734            immediate_first: false,
735            max_elapsed_ms: Some(1),
736        };
737        let manager = RetryManager::new(config);
738
739        let result = manager
740            .execute_with_retry(
741                "test_overflow",
742                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
743                should_retry_test_error,
744                create_test_error,
745            )
746            .await;
747
748        assert!(result.is_err());
749        let error_msg = result.unwrap_err().to_string();
750
751        // Should saturate at u32::MAX instead of wrapping to 0
752        assert!(error_msg.contains("Retry budget exceeded"));
753        assert!(error_msg.contains(&format!("/{}", u32::MAX)));
754    }
755
756    #[rstest]
757    fn test_http_retry_manager_config() {
758        let manager = create_http_retry_manager::<TestError>();
759        assert_eq!(manager.config.max_retries, 3);
760        assert!(!manager.config.immediate_first);
761        assert_eq!(manager.config.max_elapsed_ms, Some(180_000));
762    }
763
764    #[rstest]
765    fn test_websocket_retry_manager_config() {
766        let manager = create_websocket_retry_manager::<TestError>();
767        assert_eq!(manager.config.max_retries, 5);
768        assert!(manager.config.immediate_first);
769        assert_eq!(manager.config.max_elapsed_ms, Some(120_000));
770    }
771
772    #[tokio::test]
773    async fn test_timeout_respects_retry_predicate() {
774        let config = RetryConfig {
775            max_retries: 3,
776            initial_delay_ms: 10,
777            max_delay_ms: 50,
778            backoff_factor: 2.0,
779            jitter_ms: 0,
780            operation_timeout_ms: Some(50),
781            immediate_first: false,
782            max_elapsed_ms: None,
783        };
784        let manager = RetryManager::new(config);
785
786        // Test with retry predicate that rejects timeouts
787        let should_not_retry_timeouts = |error: &TestError| !matches!(error, TestError::Timeout(_));
788
789        let result = manager
790            .execute_with_retry(
791                "test_timeout_non_retryable",
792                || async {
793                    tokio::time::sleep(Duration::from_millis(100)).await;
794                    Ok::<i32, TestError>(42)
795                },
796                should_not_retry_timeouts,
797                create_test_error,
798            )
799            .await;
800
801        // Should fail immediately without retries since timeout is non-retryable
802        assert!(result.is_err());
803        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
804    }
805
806    #[tokio::test]
807    async fn test_timeout_retries_when_predicate_allows() {
808        let config = RetryConfig {
809            max_retries: 2,
810            initial_delay_ms: 10,
811            max_delay_ms: 50,
812            backoff_factor: 2.0,
813            jitter_ms: 0,
814            operation_timeout_ms: Some(50),
815            immediate_first: false,
816            max_elapsed_ms: None,
817        };
818        let manager = RetryManager::new(config);
819
820        // Test with retry predicate that allows timeouts
821        let should_retry_timeouts = |error: &TestError| matches!(error, TestError::Timeout(_));
822
823        let start = tokio::time::Instant::now();
824        let result = manager
825            .execute_with_retry(
826                "test_timeout_retryable",
827                || async {
828                    tokio::time::sleep(Duration::from_millis(100)).await;
829                    Ok::<i32, TestError>(42)
830                },
831                should_retry_timeouts,
832                create_test_error,
833            )
834            .await;
835
836        let elapsed = start.elapsed();
837
838        // Should fail after retries (not immediately)
839        assert!(result.is_err());
840        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
841        // Should have taken time for retries (at least 2 timeouts + delays)
842        assert!(elapsed.as_millis() > 80); // More than just one timeout
843    }
844
845    #[tokio::test]
846    async fn test_successful_retry_after_failures() {
847        let config = RetryConfig {
848            max_retries: 3,
849            initial_delay_ms: 10,
850            max_delay_ms: 50,
851            backoff_factor: 2.0,
852            jitter_ms: 0,
853            operation_timeout_ms: None,
854            immediate_first: false,
855            max_elapsed_ms: None,
856        };
857        let manager = RetryManager::new(config);
858
859        let attempt_counter = Arc::new(AtomicU32::new(0));
860        let counter_clone = attempt_counter.clone();
861
862        let result = manager
863            .execute_with_retry(
864                "test_eventual_success",
865                move || {
866                    let counter = counter_clone.clone();
867                    async move {
868                        let attempts = counter.fetch_add(1, Ordering::SeqCst);
869                        if attempts < 2 {
870                            Err(TestError::Retryable("temporary failure".to_string()))
871                        } else {
872                            Ok(42)
873                        }
874                    }
875                },
876                should_retry_test_error,
877                create_test_error,
878            )
879            .await;
880
881        assert_eq!(result.unwrap(), 42);
882        assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
883    }
884
885    #[tokio::test(start_paused = true)]
886    async fn test_immediate_first_retry() {
887        let config = RetryConfig {
888            max_retries: 2,
889            initial_delay_ms: 100,
890            max_delay_ms: 200,
891            backoff_factor: 2.0,
892            jitter_ms: 0,
893            operation_timeout_ms: None,
894            immediate_first: true,
895            max_elapsed_ms: None,
896        };
897        let manager = RetryManager::new(config);
898
899        let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
900        let times_clone = attempt_times.clone();
901        let start = tokio::time::Instant::now();
902
903        let handle = tokio::spawn({
904            let times_clone = times_clone.clone();
905            async move {
906                let _ = manager
907                    .execute_with_retry(
908                        "test_immediate",
909                        move || {
910                            let times = times_clone.clone();
911                            async move {
912                                times.lock().expect(MUTEX_POISONED).push(start.elapsed());
913                                Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
914                            }
915                        },
916                        should_retry_test_error,
917                        create_test_error,
918                    )
919                    .await;
920            }
921        });
922
923        // Allow initial attempt and immediate retry to run without advancing time
924        yield_until(|| attempt_times.lock().expect(MUTEX_POISONED).len() >= 2).await;
925
926        // Advance time for the next backoff interval
927        tokio::time::advance(Duration::from_millis(100)).await;
928        tokio::task::yield_now().await;
929
930        // Wait for the final retry to be recorded
931        yield_until(|| attempt_times.lock().expect(MUTEX_POISONED).len() >= 3).await;
932
933        handle.await.unwrap();
934
935        let times = attempt_times.lock().expect(MUTEX_POISONED);
936        assert_eq!(times.len(), 3); // Initial + 2 retries
937
938        // First retry should be immediate (within 1ms tolerance)
939        assert!(times[1] <= Duration::from_millis(1));
940        // Second retry should have backoff delay (at least 100ms from start)
941        assert!(times[2] >= Duration::from_millis(100));
942        assert!(times[2] <= Duration::from_millis(110));
943    }
944
945    #[tokio::test]
946    async fn test_operation_without_timeout() {
947        let config = RetryConfig {
948            max_retries: 2,
949            initial_delay_ms: 10,
950            max_delay_ms: 50,
951            backoff_factor: 2.0,
952            jitter_ms: 0,
953            operation_timeout_ms: None, // No timeout
954            immediate_first: false,
955            max_elapsed_ms: None,
956        };
957        let manager = RetryManager::new(config);
958
959        let start = tokio::time::Instant::now();
960        let result = manager
961            .execute_with_retry(
962                "test_no_timeout",
963                || async {
964                    tokio::time::sleep(Duration::from_millis(50)).await;
965                    Ok::<i32, TestError>(42)
966                },
967                should_retry_test_error,
968                create_test_error,
969            )
970            .await;
971
972        let elapsed = start.elapsed();
973        assert_eq!(result.unwrap(), 42);
974        // Should complete without timing out
975        assert!(elapsed.as_millis() >= 30);
976        assert!(elapsed.as_millis() < 200);
977    }
978
979    #[tokio::test]
980    async fn test_zero_retries() {
981        let config = RetryConfig {
982            max_retries: 0,
983            initial_delay_ms: 10,
984            max_delay_ms: 50,
985            backoff_factor: 2.0,
986            jitter_ms: 0,
987            operation_timeout_ms: None,
988            immediate_first: false,
989            max_elapsed_ms: None,
990        };
991        let manager = RetryManager::new(config);
992
993        let attempt_counter = Arc::new(AtomicU32::new(0));
994        let counter_clone = attempt_counter.clone();
995
996        let result = manager
997            .execute_with_retry(
998                "test_no_retries",
999                move || {
1000                    let counter = counter_clone.clone();
1001                    async move {
1002                        counter.fetch_add(1, Ordering::SeqCst);
1003                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1004                    }
1005                },
1006                should_retry_test_error,
1007                create_test_error,
1008            )
1009            .await;
1010
1011        assert!(result.is_err());
1012        // Should only attempt once (no retries)
1013        assert_eq!(attempt_counter.load(Ordering::SeqCst), 1);
1014    }
1015
1016    #[tokio::test(start_paused = true)]
1017    async fn test_jitter_applied() {
1018        let config = RetryConfig {
1019            max_retries: 2,
1020            initial_delay_ms: 50,
1021            max_delay_ms: 100,
1022            backoff_factor: 2.0,
1023            jitter_ms: 50, // Significant jitter
1024            operation_timeout_ms: None,
1025            immediate_first: false,
1026            max_elapsed_ms: None,
1027        };
1028        let manager = RetryManager::new(config);
1029
1030        let delays = Arc::new(std::sync::Mutex::new(Vec::new()));
1031        let delays_clone = delays.clone();
1032        let last_time = Arc::new(std::sync::Mutex::new(tokio::time::Instant::now()));
1033        let last_time_clone = last_time.clone();
1034
1035        let handle = tokio::spawn({
1036            let delays_clone = delays_clone.clone();
1037            async move {
1038                let _ = manager
1039                    .execute_with_retry(
1040                        "test_jitter",
1041                        move || {
1042                            let delays = delays_clone.clone();
1043                            let last_time = last_time_clone.clone();
1044                            async move {
1045                                let now = tokio::time::Instant::now();
1046                                let delay = {
1047                                    let mut last = last_time.lock().expect(MUTEX_POISONED);
1048                                    let d = now.duration_since(*last);
1049                                    *last = now;
1050                                    d
1051                                };
1052                                delays.lock().expect(MUTEX_POISONED).push(delay);
1053                                Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1054                            }
1055                        },
1056                        should_retry_test_error,
1057                        create_test_error,
1058                    )
1059                    .await;
1060            }
1061        });
1062
1063        yield_until(|| !delays.lock().expect(MUTEX_POISONED).is_empty()).await;
1064        advance_until(|| delays.lock().expect(MUTEX_POISONED).len() >= 2).await;
1065        advance_until(|| delays.lock().expect(MUTEX_POISONED).len() >= 3).await;
1066
1067        handle.await.unwrap();
1068
1069        let delays = delays.lock().expect(MUTEX_POISONED);
1070        // Skip the first delay (initial attempt)
1071        for delay in delays.iter().skip(1) {
1072            // Each delay should be at least the base delay (50ms for first retry)
1073            assert!(delay.as_millis() >= 50);
1074            // But no more than base + jitter (allow small tolerance for step advance)
1075            assert!(delay.as_millis() <= 151);
1076        }
1077    }
1078
1079    #[tokio::test]
1080    async fn test_max_elapsed_stops_early() {
1081        let config = RetryConfig {
1082            max_retries: 100, // Very high retry count
1083            initial_delay_ms: 50,
1084            max_delay_ms: 100,
1085            backoff_factor: 1.5,
1086            jitter_ms: 0,
1087            operation_timeout_ms: None,
1088            immediate_first: false,
1089            max_elapsed_ms: Some(150), // Should stop after ~3 attempts
1090        };
1091        let manager = RetryManager::new(config);
1092
1093        let attempt_counter = Arc::new(AtomicU32::new(0));
1094        let counter_clone = attempt_counter.clone();
1095
1096        let start = tokio::time::Instant::now();
1097        let result = manager
1098            .execute_with_retry(
1099                "test_elapsed_limit",
1100                move || {
1101                    let counter = counter_clone.clone();
1102                    async move {
1103                        counter.fetch_add(1, Ordering::SeqCst);
1104                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1105                    }
1106                },
1107                should_retry_test_error,
1108                create_test_error,
1109            )
1110            .await;
1111
1112        let elapsed = start.elapsed();
1113        assert!(result.is_err());
1114        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
1115
1116        // Should have stopped due to time limit, not retry count
1117        let attempts = attempt_counter.load(Ordering::SeqCst);
1118        assert!(attempts < 10); // Much less than max_retries
1119        assert!(elapsed.as_millis() >= 100);
1120    }
1121
1122    #[tokio::test]
1123    async fn test_mixed_errors_retry_behavior() {
1124        let config = RetryConfig {
1125            max_retries: 5,
1126            initial_delay_ms: 10,
1127            max_delay_ms: 50,
1128            backoff_factor: 2.0,
1129            jitter_ms: 0,
1130            operation_timeout_ms: None,
1131            immediate_first: false,
1132            max_elapsed_ms: None,
1133        };
1134        let manager = RetryManager::new(config);
1135
1136        let attempt_counter = Arc::new(AtomicU32::new(0));
1137        let counter_clone = attempt_counter.clone();
1138
1139        let result = manager
1140            .execute_with_retry(
1141                "test_mixed_errors",
1142                move || {
1143                    let counter = counter_clone.clone();
1144                    async move {
1145                        let attempts = counter.fetch_add(1, Ordering::SeqCst);
1146                        match attempts {
1147                            0 => Err(TestError::Retryable("retry 1".to_string())),
1148                            1 => Err(TestError::Retryable("retry 2".to_string())),
1149                            2 => Err(TestError::NonRetryable("stop here".to_string())),
1150                            _ => Ok(42),
1151                        }
1152                    }
1153                },
1154                should_retry_test_error,
1155                create_test_error,
1156            )
1157            .await;
1158
1159        assert!(result.is_err());
1160        assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
1161        // Should stop at the non-retryable error
1162        assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
1163    }
1164
1165    #[tokio::test]
1166    async fn test_cancellation_during_retry_delay() {
1167        use tokio_util::sync::CancellationToken;
1168
1169        let config = RetryConfig {
1170            max_retries: 10,
1171            initial_delay_ms: 500, // Long delay to ensure cancellation happens during sleep
1172            max_delay_ms: 1000,
1173            backoff_factor: 2.0,
1174            jitter_ms: 0,
1175            operation_timeout_ms: None,
1176            immediate_first: false,
1177            max_elapsed_ms: None,
1178        };
1179        let manager = RetryManager::new(config);
1180
1181        let token = CancellationToken::new();
1182        let token_clone = token.clone();
1183
1184        // Cancel after a short delay
1185        tokio::spawn(async move {
1186            tokio::time::sleep(Duration::from_millis(100)).await;
1187            token_clone.cancel();
1188        });
1189
1190        let attempt_counter = Arc::new(AtomicU32::new(0));
1191        let counter_clone = attempt_counter.clone();
1192
1193        let start = tokio::time::Instant::now();
1194        let result = manager
1195            .execute_with_retry_with_cancel(
1196                "test_cancellation",
1197                move || {
1198                    let counter = counter_clone.clone();
1199                    async move {
1200                        counter.fetch_add(1, Ordering::SeqCst);
1201                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1202                    }
1203                },
1204                should_retry_test_error,
1205                create_test_error,
1206                &token,
1207            )
1208            .await;
1209
1210        let elapsed = start.elapsed();
1211
1212        // Should be canceled quickly
1213        assert!(result.is_err());
1214        let error_msg = format!("{}", result.unwrap_err());
1215        assert!(error_msg.contains("canceled"));
1216
1217        // Should not have taken the full delay time
1218        assert!(elapsed.as_millis() < 600);
1219
1220        // Should have made at least one attempt
1221        let attempts = attempt_counter.load(Ordering::SeqCst);
1222        assert!(attempts >= 1);
1223    }
1224
1225    #[tokio::test]
1226    async fn test_cancellation_during_operation_execution() {
1227        use tokio_util::sync::CancellationToken;
1228
1229        let config = RetryConfig {
1230            max_retries: 5,
1231            initial_delay_ms: 50,
1232            max_delay_ms: 100,
1233            backoff_factor: 2.0,
1234            jitter_ms: 0,
1235            operation_timeout_ms: None,
1236            immediate_first: false,
1237            max_elapsed_ms: None,
1238        };
1239        let manager = RetryManager::new(config);
1240
1241        let token = CancellationToken::new();
1242        let token_clone = token.clone();
1243
1244        // Cancel after a short delay
1245        tokio::spawn(async move {
1246            tokio::time::sleep(Duration::from_millis(50)).await;
1247            token_clone.cancel();
1248        });
1249
1250        let start = tokio::time::Instant::now();
1251        let result = manager
1252            .execute_with_retry_with_cancel(
1253                "test_cancellation_during_op",
1254                || async {
1255                    // Long-running operation
1256                    tokio::time::sleep(Duration::from_millis(200)).await;
1257                    Ok::<i32, TestError>(42)
1258                },
1259                should_retry_test_error,
1260                create_test_error,
1261                &token,
1262            )
1263            .await;
1264
1265        let elapsed = start.elapsed();
1266
1267        // Should be canceled during the operation
1268        assert!(result.is_err());
1269        let error_msg = format!("{}", result.unwrap_err());
1270        assert!(error_msg.contains("canceled"));
1271
1272        // Should not have completed the long operation
1273        assert!(elapsed.as_millis() < 250);
1274    }
1275
1276    #[tokio::test]
1277    async fn test_cancellation_error_message() {
1278        use tokio_util::sync::CancellationToken;
1279
1280        let config = RetryConfig::default();
1281        let manager = RetryManager::new(config);
1282
1283        let token = CancellationToken::new();
1284        token.cancel(); // Pre-cancel for immediate cancellation
1285
1286        let result = manager
1287            .execute_with_retry_with_cancel(
1288                "test_operation",
1289                || async { Ok::<i32, TestError>(42) },
1290                should_retry_test_error,
1291                create_test_error,
1292                &token,
1293            )
1294            .await;
1295
1296        assert!(result.is_err());
1297        let error_msg = format!("{}", result.unwrap_err());
1298        assert!(error_msg.contains("canceled"));
1299    }
1300}
1301
1302#[cfg(test)]
1303mod proptest_tests {
1304    use std::sync::{
1305        Arc,
1306        atomic::{AtomicU32, Ordering},
1307    };
1308
1309    use nautilus_core::MUTEX_POISONED;
1310    use proptest::prelude::*;
1311    // Import rstest attribute macro used within proptest! tests
1312    use rstest::rstest;
1313
1314    use super::{
1315        test_utils::*,
1316        tests::{advance_until, yield_until},
1317        *,
1318    };
1319
1320    proptest! {
1321        #[rstest]
1322        fn test_retry_config_valid_ranges(
1323            max_retries in 0u32..100,
1324            initial_delay_ms in 1u64..10_000,
1325            max_delay_ms in 1u64..60_000,
1326            backoff_factor in 1.0f64..10.0,
1327            jitter_ms in 0u64..1_000,
1328            operation_timeout_ms in prop::option::of(1u64..120_000),
1329            immediate_first in any::<bool>(),
1330            max_elapsed_ms in prop::option::of(1u64..300_000)
1331        ) {
1332            // Ensure max_delay >= initial_delay for valid config
1333            let max_delay_ms = max_delay_ms.max(initial_delay_ms);
1334
1335            let config = RetryConfig {
1336                max_retries,
1337                initial_delay_ms,
1338                max_delay_ms,
1339                backoff_factor,
1340                jitter_ms,
1341                operation_timeout_ms,
1342                immediate_first,
1343                max_elapsed_ms,
1344            };
1345
1346            // Should always be able to create a RetryManager with valid config
1347            let _manager = RetryManager::<std::io::Error>::new(config);
1348        }
1349
1350        #[rstest]
1351        fn test_retry_attempts_bounded(
1352            max_retries in 0u32..5,
1353            initial_delay_ms in 1u64..10,
1354            backoff_factor in 1.0f64..2.0,
1355        ) {
1356            let rt = tokio::runtime::Builder::new_current_thread()
1357                .enable_time()
1358                .build()
1359                .unwrap();
1360
1361            let config = RetryConfig {
1362                max_retries,
1363                initial_delay_ms,
1364                max_delay_ms: initial_delay_ms * 2,
1365                backoff_factor,
1366                jitter_ms: 0,
1367                operation_timeout_ms: None,
1368                immediate_first: false,
1369                max_elapsed_ms: None,
1370            };
1371
1372            let manager = RetryManager::new(config);
1373            let attempt_counter = Arc::new(AtomicU32::new(0));
1374            let counter_clone = attempt_counter.clone();
1375
1376            let _result = rt.block_on(manager.execute_with_retry(
1377                "prop_test",
1378                move || {
1379                    let counter = counter_clone.clone();
1380                    async move {
1381                        counter.fetch_add(1, Ordering::SeqCst);
1382                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1383                    }
1384                },
1385                |e: &TestError| matches!(e, TestError::Retryable(_)),
1386                TestError::Timeout,
1387            ));
1388
1389            let attempts = attempt_counter.load(Ordering::SeqCst);
1390            // Total attempts should be 1 (initial) + max_retries
1391            prop_assert_eq!(attempts, max_retries + 1);
1392        }
1393
1394        #[rstest]
1395        fn test_timeout_always_respected(
1396            timeout_ms in 10u64..50,
1397            operation_delay_ms in 60u64..100,
1398        ) {
1399            let rt = tokio::runtime::Builder::new_current_thread()
1400                .enable_time()
1401                .start_paused(true)
1402                .build()
1403                .unwrap();
1404
1405            let config = RetryConfig {
1406                max_retries: 0, // No retries to isolate timeout behavior
1407                initial_delay_ms: 10,
1408                max_delay_ms: 100,
1409                backoff_factor: 2.0,
1410                jitter_ms: 0,
1411                operation_timeout_ms: Some(timeout_ms),
1412                immediate_first: false,
1413                max_elapsed_ms: None,
1414            };
1415
1416            let manager = RetryManager::new(config);
1417
1418            let result = rt.block_on(async {
1419                let operation_future = manager.execute_with_retry(
1420                    "timeout_test",
1421                    move || async move {
1422                        tokio::time::sleep(Duration::from_millis(operation_delay_ms)).await;
1423                        Ok::<i32, TestError>(42)
1424                    },
1425                    |_: &TestError| true,
1426                    TestError::Timeout,
1427                );
1428
1429                // Advance time to trigger timeout
1430                tokio::time::advance(Duration::from_millis(timeout_ms + 10)).await;
1431                operation_future.await
1432            });
1433
1434            // Operation should timeout
1435            prop_assert!(result.is_err());
1436            prop_assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
1437        }
1438
1439        #[rstest]
1440        fn test_max_elapsed_always_respected(
1441            max_elapsed_ms in 20u64..50,
1442            delay_per_retry in 15u64..30,
1443            max_retries in 10u32..20,
1444        ) {
1445            let rt = tokio::runtime::Builder::new_current_thread()
1446                .enable_time()
1447                .start_paused(true)
1448                .build()
1449                .unwrap();
1450
1451            // Set up config where we would exceed max_elapsed_ms before max_retries
1452            let config = RetryConfig {
1453                max_retries,
1454                initial_delay_ms: delay_per_retry,
1455                max_delay_ms: delay_per_retry * 2,
1456                backoff_factor: 1.0, // No backoff to make timing predictable
1457                jitter_ms: 0,
1458                operation_timeout_ms: None,
1459                immediate_first: false,
1460                max_elapsed_ms: Some(max_elapsed_ms),
1461            };

            let manager = RetryManager::new(config);
            let attempt_counter = Arc::new(AtomicU32::new(0));
            let counter_clone = attempt_counter.clone();

            let result = rt.block_on(async {
                let operation_future = manager.execute_with_retry(
                    "elapsed_test",
                    move || {
                        let counter = counter_clone.clone();
                        async move {
                            counter.fetch_add(1, Ordering::SeqCst);
                            Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                        }
                    },
                    |e: &TestError| matches!(e, TestError::Retryable(_)),
                    TestError::Timeout,
                );

                // Advance time past max_elapsed_ms
                tokio::time::advance(Duration::from_millis(max_elapsed_ms + delay_per_retry)).await;
                operation_future.await
            });

            let attempts = attempt_counter.load(Ordering::SeqCst);

            // Should have failed with timeout error
            prop_assert!(result.is_err());
            prop_assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));

            // Should stop once the elapsed budget is hit, never exceeding the total attempt budget
            prop_assert!(attempts <= max_retries + 1);
        }

        #[rstest]
        fn test_jitter_bounds(
            jitter_ms in 0u64..20,
            base_delay_ms in 10u64..30,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries: 2,
                initial_delay_ms: base_delay_ms,
                max_delay_ms: base_delay_ms * 2,
                backoff_factor: 1.0, // No backoff to isolate jitter
                jitter_ms,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };
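            // With `backoff_factor` 1.0 every nominal delay equals `base_delay_ms`, so any spread
            // between consecutive attempts beyond the base must come from jitter alone and is
            // bounded by `jitter_ms`.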

            let manager = RetryManager::new(config);
            let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
            let attempt_times_for_block = attempt_times.clone();

            rt.block_on(async move {
                let attempt_times_for_wait = attempt_times_for_block.clone();
                let handle = tokio::spawn({
                    let attempt_times_for_task = attempt_times_for_block.clone();
                    let manager = manager;
                    async move {
                        let start_time = tokio::time::Instant::now();
                        let _ = manager
                            .execute_with_retry(
                                "jitter_test",
                                move || {
                                    let attempt_times_inner = attempt_times_for_task.clone();
                                    async move {
                                        attempt_times_inner
                                            .lock()
                                            .expect(MUTEX_POISONED)
                                            .push(start_time.elapsed());
                                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                                    }
                                },
                                |e: &TestError| matches!(e, TestError::Retryable(_)),
                                TestError::Timeout,
                            )
                            .await;
                    }
                });

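                // Wait for the initial attempt, then drive the paused clock forward until the
                // second and third attempts have recorded their timestamps (max_retries = 2
                // gives three attempts in total).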
                yield_until(|| !attempt_times_for_wait.lock().expect(MUTEX_POISONED).is_empty()).await;
                advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 2).await;
                advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 3).await;

                handle.await.unwrap();
            });

            let times = attempt_times.lock().expect(MUTEX_POISONED);

            // We expect at least 2 attempts total (initial + at least 1 retry)
            prop_assert!(times.len() >= 2);

            // First attempt should be immediate (no delay)
            prop_assert!(times[0].as_millis() < 5);

            // Check that each retry is delayed relative to the previous attempt
            for i in 1..times.len() {
                let delay_from_previous = times[i] - times[i - 1];

                // The delay should be at least base_delay_ms
                prop_assert!(
                    delay_from_previous.as_millis() >= base_delay_ms as u128,
                    "Retry {} delay {}ms is less than base {}ms",
                    i, delay_from_previous.as_millis(), base_delay_ms
                );

                // Delay should be at most base_delay + jitter (1ms tolerance for timer granularity)
                prop_assert!(
                    delay_from_previous.as_millis() <= (base_delay_ms + jitter_ms + 1) as u128,
                    "Retry {} delay {}ms exceeds base {} + jitter {}",
                    i, delay_from_previous.as_millis(), base_delay_ms, jitter_ms
                );
            }
        }

        #[rstest]
        fn test_immediate_first_property(
            immediate_first in any::<bool>(),
            initial_delay_ms in 10u64..30,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries: 2,
                initial_delay_ms,
                max_delay_ms: initial_delay_ms * 2,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first,
                max_elapsed_ms: None,
            };
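            // Only `immediate_first` varies here; with zero jitter the timing of the first retry
            // cleanly distinguishes the two modes.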

            let manager = RetryManager::new(config);
            let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
            let attempt_times_for_block = attempt_times.clone();

            rt.block_on(async move {
                let attempt_times_for_wait = attempt_times_for_block.clone();
                let handle = tokio::spawn({
                    let attempt_times_for_task = attempt_times_for_block.clone();
                    let manager = manager;
                    async move {
                        let start = tokio::time::Instant::now();
                        let _ = manager
                            .execute_with_retry(
                                "immediate_test",
                                move || {
                                    let attempt_times_inner = attempt_times_for_task.clone();
                                    async move {
                                        let elapsed = start.elapsed();
                                        attempt_times_inner.lock().expect(MUTEX_POISONED).push(elapsed);
                                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                                    }
                                },
                                |e: &TestError| matches!(e, TestError::Retryable(_)),
                                TestError::Timeout,
                            )
                            .await;
                    }
                });

                yield_until(|| !attempt_times_for_wait.lock().expect(MUTEX_POISONED).is_empty()).await;
                advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 2).await;
                advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 3).await;

                handle.await.unwrap();
            });

            let times = attempt_times.lock().expect(MUTEX_POISONED);
            prop_assert!(times.len() >= 2);

            if immediate_first {
                // First retry should fire immediately, with no backoff delay
                prop_assert!(times[1].as_millis() < 20,
                    "With immediate_first=true, first retry took {}ms",
                    times[1].as_millis());
            } else {
                // First retry should wait at least the initial delay (1ms tolerance)
                prop_assert!(times[1].as_millis() >= (initial_delay_ms - 1) as u128,
                    "With immediate_first=false, first retry was too fast: {}ms",
                    times[1].as_millis());
            }
        }

        #[rstest]
        fn test_non_retryable_stops_immediately(
            attempt_before_non_retryable in 0usize..3,
            max_retries in 3u32..5,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries,
                initial_delay_ms: 10,
                max_delay_ms: 100,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config);
            let attempt_counter = Arc::new(AtomicU32::new(0));
            let counter_clone = attempt_counter.clone();

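            // The predicate below treats only `Retryable` as retryable, so the first
            // `NonRetryable` error should be returned as-is with no further attempts.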
            let result: Result<i32, TestError> = rt.block_on(manager.execute_with_retry(
                "non_retryable_test",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        let attempts = counter.fetch_add(1, Ordering::SeqCst) as usize;
                        if attempts == attempt_before_non_retryable {
                            Err(TestError::NonRetryable("stop".to_string()))
                        } else {
                            Err(TestError::Retryable("retry".to_string()))
                        }
                    }
                },
                |e: &TestError| matches!(e, TestError::Retryable(_)),
                TestError::Timeout,
            ));

            let attempts = attempt_counter.load(Ordering::SeqCst) as usize;

            prop_assert!(result.is_err());
            prop_assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
            // Should stop exactly when non-retryable error occurs
            prop_assert_eq!(attempts, attempt_before_non_retryable + 1);
        }

        #[rstest]
        fn test_cancellation_stops_immediately(
            cancel_after_ms in 10u64..100,
            initial_delay_ms in 200u64..500,
        ) {
            use tokio_util::sync::CancellationToken;

            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries: 10,
                initial_delay_ms,
                max_delay_ms: initial_delay_ms * 2,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

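            // `initial_delay_ms` (200-500ms) dwarfs `cancel_after_ms` (10-100ms), so the token is
            // cancelled while the manager is still waiting out the first backoff delay rather
            // than mid-operation.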
            let manager = RetryManager::new(config);
            let token = CancellationToken::new();
            let token_clone = token.clone();

            let result: Result<i32, TestError> = rt.block_on(async {
                // Spawn cancellation task
                tokio::spawn(async move {
                    tokio::time::sleep(Duration::from_millis(cancel_after_ms)).await;
                    token_clone.cancel();
                });

                let operation_future = manager.execute_with_retry_with_cancel(
                    "cancellation_test",
                    || async {
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    },
                    |e: &TestError| matches!(e, TestError::Retryable(_)),
                    create_test_error,
                    &token,
                );

                // Advance time to trigger cancellation
                tokio::time::advance(Duration::from_millis(cancel_after_ms + 10)).await;
                operation_future.await
            });

            // Should be canceled
            prop_assert!(result.is_err());
            let error_msg = format!("{}", result.unwrap_err());
            prop_assert!(error_msg.contains("canceled"));
        }

        #[rstest]
        fn test_budget_clamp_prevents_overshoot(
            max_elapsed_ms in 10u64..30,
            delay_per_retry in 20u64..50,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            // Configure so that the first retry delay typically exceeds the remaining budget
            let config = RetryConfig {
                max_retries: 5,
                initial_delay_ms: delay_per_retry,
                max_delay_ms: delay_per_retry * 2,
                backoff_factor: 1.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: Some(max_elapsed_ms),
            };

            let manager = RetryManager::new(config);

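            // The result itself is ignored; the property is that the run terminates promptly
            // under paused time, i.e. the retry delay is expected to be clamped to the remaining
            // budget rather than sleeping the full `delay_per_retry`.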
            let _result = rt.block_on(async {
                let operation_future = manager.execute_with_retry(
                    "budget_clamp_test",
                    || async {
                        // Fast operation to focus on delay timing
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    },
                    |e: &TestError| matches!(e, TestError::Retryable(_)),
                    create_test_error,
                );

                // Advance time past max_elapsed_ms
                tokio::time::advance(Duration::from_millis(max_elapsed_ms + delay_per_retry)).await;
                operation_future.await
            });

            // With deterministic (paused) time the operation completes without wall-clock delay;
            // the elapsed-time budget is still enforced by the retry manager.
        }

        #[rstest]
        fn test_success_on_kth_attempt(
            k in 1usize..5,
            initial_delay_ms in 5u64..20,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries: 10, // More than k
                initial_delay_ms,
                max_delay_ms: initial_delay_ms * 4,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

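            // Worst case, the delays before the k-th attempt are d, 2d, 4d, 4d, ... (capped at
            // `max_delay_ms` = 4d), so advancing k windows of 4d in the loop below comfortably
            // covers the cumulative backoff.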
            let manager = RetryManager::new(config);
            let attempt_counter = Arc::new(AtomicU32::new(0));
            let counter_clone = attempt_counter.clone();
            let target_k = k;

            let (result, _elapsed) = rt.block_on(async {
                let start = tokio::time::Instant::now();

                let operation_future = manager.execute_with_retry(
                    "kth_attempt_test",
                    move || {
                        let counter = counter_clone.clone();
                        async move {
                            let attempt = counter.fetch_add(1, Ordering::SeqCst) as usize;
                            if attempt + 1 == target_k {
                                Ok(42)
                            } else {
                                Err(TestError::Retryable("retry".to_string()))
                            }
                        }
                    },
                    |e: &TestError| matches!(e, TestError::Retryable(_)),
                    create_test_error,
                );

                // Advance time to allow enough retries
                for _ in 0..k {
                    tokio::time::advance(Duration::from_millis(initial_delay_ms * 4)).await;
                }

                let result = operation_future.await;
                let elapsed = start.elapsed();

                (result, elapsed)
            });

            let attempts = attempt_counter.load(Ordering::SeqCst) as usize;

            // Paused Tokio time (start_paused + advance) makes timing deterministic,
            // so assert on behavior only, not wall-clock elapsed time.
            prop_assert!(result.is_ok());
            prop_assert_eq!(result.unwrap(), 42);
            prop_assert_eq!(attempts, k);
        }
    }
}