nautilus_network/retry.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Generic retry mechanism for network operations.
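//!
//! As a quick orientation, here is a minimal usage sketch (assuming this module
//! is exposed as `nautilus_network::retry`; `Quote`, `MyError`, `fetch_quote`,
//! and `is_transient` are illustrative placeholders, not items defined here):
//!
//! ```ignore
//! use nautilus_network::retry::{RetryConfig, RetryManager};
//!
//! async fn quote_with_retries() -> Result<Quote, MyError> {
//!     let manager: RetryManager<MyError> = RetryManager::new(RetryConfig::default());
//!     manager
//!         .execute_with_retry(
//!             "fetch_quote",                      // operation name used in logs
//!             || async { fetch_quote().await },   // operation returning Result<Quote, MyError>
//!             |e: &MyError| e.is_transient(),     // which errors are worth retrying
//!             |msg| MyError::Message(msg),        // builds timeout/cancellation errors
//!         )
//!         .await
//! }
//! ```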
17
18use std::{future::Future, marker::PhantomData, time::Duration};
19
20use tokio_util::sync::CancellationToken;
21
22use crate::backoff::ExponentialBackoff;
23
24/// Configuration for retry behavior.
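///
/// A hedged sketch of overriding a few fields while keeping the defaults for the
/// rest (the values shown are illustrative, not recommendations):
///
/// ```ignore
/// let config = RetryConfig {
///     max_retries: 5,
///     operation_timeout_ms: Some(10_000),
///     ..RetryConfig::default()
/// };
/// ```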
25#[derive(Debug, Clone)]
26pub struct RetryConfig {
27    /// Maximum number of retry attempts (total attempts = 1 initial + `max_retries`).
28    pub max_retries: u32,
29    /// Initial delay between retries in milliseconds.
30    pub initial_delay_ms: u64,
31    /// Maximum delay between retries in milliseconds.
32    pub max_delay_ms: u64,
33    /// Backoff multiplier factor.
34    pub backoff_factor: f64,
35    /// Maximum jitter in milliseconds to add to delays.
36    pub jitter_ms: u64,
37    /// Optional timeout for individual operations in milliseconds.
38    /// If None, no timeout is applied.
39    pub operation_timeout_ms: Option<u64>,
40    /// Whether the first retry should happen immediately without delay.
41    /// Should be false for HTTP/order operations, true for connection operations.
42    pub immediate_first: bool,
43    /// Optional maximum total elapsed time for all retries in milliseconds.
44    /// If exceeded, retries stop even if `max_retries` hasn't been reached.
45    pub max_elapsed_ms: Option<u64>,
46}
47
48impl Default for RetryConfig {
49    fn default() -> Self {
50        Self {
51            max_retries: 3,
52            initial_delay_ms: 1_000,
53            max_delay_ms: 10_000,
54            backoff_factor: 2.0,
55            jitter_ms: 100,
56            operation_timeout_ms: Some(30_000),
57            immediate_first: false,
58            max_elapsed_ms: None,
59        }
60    }
61}
62
63/// Generic retry manager for network operations.
64///
65/// Stateless and thread-safe; each operation maintains its own backoff state.
66#[derive(Clone, Debug)]
67pub struct RetryManager<E> {
68    config: RetryConfig,
69    _phantom: PhantomData<E>,
70}
71
72impl<E> RetryManager<E>
73where
74    E: std::error::Error,
75{
76    /// Creates a new retry manager with the given configuration.
77    pub const fn new(config: RetryConfig) -> Self {
78        Self {
79            config,
80            _phantom: PhantomData,
81        }
82    }
83
84    /// Formats a retry budget exceeded error message with attempt context.
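    ///
    /// For example, with the default `max_retries` of 3, exceeding the budget on the
    /// final attempt yields `"Retry budget exceeded (4/4)"`.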
85    #[inline(always)]
86    fn budget_exceeded_msg(&self, attempt: u32) -> String {
87        format!(
88            "Retry budget exceeded ({}/{})",
89            attempt.saturating_add(1),
90            self.config.max_retries.saturating_add(1)
91        )
92    }
93
94    /// Executes an operation with retry logic and optional cancellation.
95    ///
96    /// Cancellation is checked at three points:
97    /// (1) Before each operation attempt.
98    /// (2) During operation execution (via `tokio::select!`).
99    /// (3) During retry delays.
100    ///
101    /// Because each attempt races the cancellation token via `tokio::select!`, cancellation mid-execution drops the in-flight attempt promptly.
102    ///
103    /// # Errors
104    ///
105    /// Returns an error if the operation fails after exhausting all retries,
106    /// if the operation times out, if creating the backoff state fails, or if canceled.
107    pub async fn execute_with_retry_inner<F, Fut, T>(
108        &self,
109        operation_name: &str,
110        mut operation: F,
111        should_retry: impl Fn(&E) -> bool,
112        create_error: impl Fn(String) -> E,
113        cancel: Option<&CancellationToken>,
114    ) -> Result<T, E>
115    where
116        F: FnMut() -> Fut,
117        Fut: Future<Output = Result<T, E>>,
118    {
119        let mut backoff = ExponentialBackoff::new(
120            Duration::from_millis(self.config.initial_delay_ms),
121            Duration::from_millis(self.config.max_delay_ms),
122            self.config.backoff_factor,
123            self.config.jitter_ms,
124            self.config.immediate_first,
125        )
126        .map_err(|e| create_error(format!("Invalid configuration: {e}")))?;
127
128        let mut attempt = 0;
129        let start_time = tokio::time::Instant::now();
130
131        loop {
132            if let Some(token) = cancel
133                && token.is_cancelled()
134            {
135                log::debug!("Operation '{operation_name}' canceled after {attempt} attempts");
136                return Err(create_error("canceled".to_string()));
137            }
138
139            if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
140                let elapsed = start_time.elapsed();
141                if elapsed.as_millis() >= u128::from(max_elapsed_ms) {
142                    return Err(create_error(self.budget_exceeded_msg(attempt)));
143                }
144            }
145
146            let result = match (self.config.operation_timeout_ms, cancel) {
147                (Some(timeout_ms), Some(token)) => {
148                    tokio::select! {
149                        result = tokio::time::timeout(Duration::from_millis(timeout_ms), operation()) => result,
150                        () = token.cancelled() => {
151                            log::debug!("Operation '{operation_name}' canceled during execution");
152                            return Err(create_error("canceled".to_string()));
153                        }
154                    }
155                }
156                (Some(timeout_ms), None) => {
157                    tokio::time::timeout(Duration::from_millis(timeout_ms), operation()).await
158                }
159                (None, Some(token)) => {
160                    tokio::select! {
161                        result = operation() => Ok(result),
162                        () = token.cancelled() => {
163                            log::debug!("Operation '{operation_name}' canceled during execution");
164                            return Err(create_error("canceled".to_string()));
165                        }
166                    }
167                }
168                (None, None) => Ok(operation().await),
169            };
170
171            match result {
172                Ok(Ok(success)) => {
173                    if attempt > 0 {
174                        log::trace!(
175                            "Operation '{operation_name}' succeeded after {} attempts",
176                            attempt + 1
177                        );
178                    }
179                    return Ok(success);
180                }
181                Ok(Err(e)) => {
182                    if !should_retry(&e) {
183                        log::trace!("Operation '{operation_name}' non-retryable error: {e}");
184                        return Err(e);
185                    }
186
187                    if attempt >= self.config.max_retries {
188                        log::trace!(
189                            "Operation '{operation_name}' retries exhausted after {} attempts: {e}",
190                            attempt + 1
191                        );
192                        return Err(e);
193                    }
194
195                    let mut delay = backoff.next_duration();
196
197                    if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
198                        let elapsed = start_time.elapsed();
199                        let remaining =
200                            Duration::from_millis(max_elapsed_ms).saturating_sub(elapsed);
201
202                        if remaining.is_zero() {
203                            return Err(create_error(self.budget_exceeded_msg(attempt)));
204                        }
205
206                        delay = delay.min(remaining);
207                    }
208
209                    log::trace!(
210                        "Operation '{operation_name}' attempt {} failed, retrying in {}ms: {e}",
211                        attempt + 1,
212                        delay.as_millis()
213                    );
214
215                    // Yield even on zero-delay to avoid busy-wait loop
216                    if delay.is_zero() {
217                        tokio::task::yield_now().await;
218                        attempt += 1;
219                        continue;
220                    }
221
222                    if let Some(token) = cancel {
223                        tokio::select! {
224                            () = tokio::time::sleep(delay) => {},
225                            () = token.cancelled() => {
226                                log::debug!("Operation '{operation_name}' canceled during retry delay (attempt {})", attempt + 1);
227                                return Err(create_error("canceled".to_string()));
228                            }
229                        }
230                    } else {
231                        tokio::time::sleep(delay).await;
232                    }
233                    attempt += 1;
234                }
235                Err(_) => {
236                    let e = create_error(format!(
237                        "Timed out after {}ms",
238                        self.config.operation_timeout_ms.unwrap_or(0)
239                    ));
240
241                    if !should_retry(&e) {
242                        log::trace!("Operation '{operation_name}' non-retryable timeout: {e}");
243                        return Err(e);
244                    }
245
246                    if attempt >= self.config.max_retries {
247                        log::trace!(
248                            "Operation '{operation_name}' retries exhausted after timeout ({} attempts): {e}",
249                            attempt + 1
250                        );
251                        return Err(e);
252                    }
253
254                    let mut delay = backoff.next_duration();
255
256                    if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
257                        let elapsed = start_time.elapsed();
258                        let remaining =
259                            Duration::from_millis(max_elapsed_ms).saturating_sub(elapsed);
260
261                        if remaining.is_zero() {
262                            return Err(create_error(self.budget_exceeded_msg(attempt)));
263                        }
264
265                        delay = delay.min(remaining);
266                    }
267
268                    log::trace!(
269                        "Operation '{operation_name}' attempt {} timed out, retrying in {}ms: {e}",
270                        attempt + 1,
271                        delay.as_millis()
272                    );
273
274                    // Yield even on zero-delay to avoid busy-wait loop
275                    if delay.is_zero() {
276                        tokio::task::yield_now().await;
277                        attempt += 1;
278                        continue;
279                    }
280
281                    if let Some(token) = cancel {
282                        tokio::select! {
283                            () = tokio::time::sleep(delay) => {},
284                            () = token.cancelled() => {
285                                log::debug!("Operation '{operation_name}' canceled during retry delay (attempt {})", attempt + 1);
286                                return Err(create_error("canceled".to_string()));
287                            }
288                        }
289                    } else {
290                        tokio::time::sleep(delay).await;
291                    }
292                    attempt += 1;
293                }
294            }
295        }
296    }
297
298    /// Executes an operation with retry logic.
299    ///
300    /// # Errors
301    ///
302    /// Returns an error if the operation fails after exhausting all retries,
303    /// if the operation times out, or if creating the backoff state fails.
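    ///
    /// # Examples
    ///
    /// A sketch of an operation closure that captures shared state across attempts
    /// (this mirrors the patterns used in the tests below; it is illustrative, not a
    /// prescribed usage):
    ///
    /// ```ignore
    /// let counter = Arc::new(AtomicU32::new(0));
    /// let counter_for_op = counter.clone();
    /// let result = manager
    ///     .execute_with_retry(
    ///         "eventually_succeeds",
    ///         move || {
    ///             let counter = counter_for_op.clone();
    ///             async move {
    ///                 if counter.fetch_add(1, Ordering::SeqCst) < 2 {
    ///                     Err(TestError::Retryable("transient".to_string()))
    ///                 } else {
    ///                     Ok(42)
    ///                 }
    ///             }
    ///         },
    ///         |e| matches!(e, TestError::Retryable(_)),
    ///         TestError::Timeout,
    ///     )
    ///     .await;
    /// ```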
304    pub async fn execute_with_retry<F, Fut, T>(
305        &self,
306        operation_name: &str,
307        operation: F,
308        should_retry: impl Fn(&E) -> bool,
309        create_error: impl Fn(String) -> E,
310    ) -> Result<T, E>
311    where
312        F: FnMut() -> Fut,
313        Fut: Future<Output = Result<T, E>>,
314    {
315        self.execute_with_retry_inner(operation_name, operation, should_retry, create_error, None)
316            .await
317    }
318
319    /// Executes an operation with retry logic and cancellation support.
320    ///
321    /// # Errors
322    ///
323    /// Returns an error if the operation fails after exhausting all retries,
324    /// if the operation times out, if creating the backoff state fails, or if canceled.
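    ///
    /// # Examples
    ///
    /// A hedged sketch of wiring a shutdown token through the call (`connect`,
    /// `MyError`, and the retry predicate are placeholders):
    ///
    /// ```ignore
    /// let token = CancellationToken::new();
    /// let result = manager
    ///     .execute_with_retry_with_cancel(
    ///         "connect",
    ///         || async { connect().await },
    ///         |e: &MyError| e.is_transient(),
    ///         |msg| MyError::Message(msg),
    ///         &token,
    ///     )
    ///     .await;
    /// // Calling `token.cancel()` elsewhere aborts the in-flight attempt or retry delay.
    /// ```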
325    pub async fn execute_with_retry_with_cancel<F, Fut, T>(
326        &self,
327        operation_name: &str,
328        operation: F,
329        should_retry: impl Fn(&E) -> bool,
330        create_error: impl Fn(String) -> E,
331        cancellation_token: &CancellationToken,
332    ) -> Result<T, E>
333    where
334        F: FnMut() -> Fut,
335        Fut: Future<Output = Result<T, E>>,
336    {
337        self.execute_with_retry_inner(
338            operation_name,
339            operation,
340            should_retry,
341            create_error,
342            Some(cancellation_token),
343        )
344        .await
345    }
346}
347
348/// Convenience function to create a retry manager with default configuration.
349pub fn create_default_retry_manager<E>() -> RetryManager<E>
350where
351    E: std::error::Error,
352{
353    RetryManager::new(RetryConfig::default())
354}
355
356/// Convenience function to create a retry manager for HTTP operations.
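///
/// A minimal sketch of pairing the returned manager with an HTTP call (the error
/// type and `send_request` are placeholders for the caller's HTTP layer):
///
/// ```ignore
/// let manager = create_http_retry_manager::<MyHttpError>();
/// let response = manager
///     .execute_with_retry(
///         "get_instruments",
///         || async { send_request().await },
///         |e: &MyHttpError| e.is_retryable(),
///         |msg| MyHttpError::Message(msg),
///     )
///     .await;
/// ```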
357pub const fn create_http_retry_manager<E>() -> RetryManager<E>
358where
359    E: std::error::Error,
360{
361    let config = RetryConfig {
362        max_retries: 3,
363        initial_delay_ms: 1_000,
364        max_delay_ms: 10_000,
365        backoff_factor: 2.0,
366        jitter_ms: 1_000,
367        operation_timeout_ms: Some(60_000), // 60s for HTTP requests
368        immediate_first: false,
369        max_elapsed_ms: Some(180_000), // 3 minutes total budget
370    };
371    RetryManager::new(config)
372}
373
374/// Convenience function to create a retry manager for WebSocket operations.
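///
/// Because `immediate_first` is `true`, the first retry after a failed connection
/// attempt fires without delay. A hedged reconnect sketch (`reconnect`, `MyWsError`,
/// and `shutdown_token` are placeholders):
///
/// ```ignore
/// let manager = create_websocket_retry_manager::<MyWsError>();
/// let stream = manager
///     .execute_with_retry_with_cancel(
///         "ws_reconnect",
///         || async { reconnect().await },
///         |e: &MyWsError| e.is_transient(),
///         |msg| MyWsError::Message(msg),
///         &shutdown_token,
///     )
///     .await;
/// ```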
375pub const fn create_websocket_retry_manager<E>() -> RetryManager<E>
376where
377    E: std::error::Error,
378{
379    let config = RetryConfig {
380        max_retries: 5,
381        initial_delay_ms: 1_000,
382        max_delay_ms: 10_000,
383        backoff_factor: 2.0,
384        jitter_ms: 1_000,
385        operation_timeout_ms: Some(30_000), // 30s for WebSocket operations
386        immediate_first: true,
387        max_elapsed_ms: Some(120_000), // 2 minutes total budget
388    };
389    RetryManager::new(config)
390}
391
392#[cfg(test)]
393mod test_utils {
394    #[derive(Debug, thiserror::Error)]
395    pub enum TestError {
396        #[error("Retryable error: {0}")]
397        Retryable(String),
398        #[error("Non-retryable error: {0}")]
399        NonRetryable(String),
400        #[error("Timeout error: {0}")]
401        Timeout(String),
402    }
403
404    pub fn should_retry_test_error(error: &TestError) -> bool {
405        matches!(error, TestError::Retryable(_))
406    }
407
408    pub fn create_test_error(msg: String) -> TestError {
409        TestError::Timeout(msg)
410    }
411}
412
413#[cfg(test)]
414mod tests {
415    use std::sync::{
416        Arc,
417        atomic::{AtomicU32, Ordering},
418    };
419
420    use nautilus_core::MUTEX_POISONED;
421    use rstest::rstest;
422
423    use super::{test_utils::*, *};
424
425    const MAX_WAIT_ITERS: usize = 10_000;
426    const MAX_ADVANCE_ITERS: usize = 10_000;
427
428    pub(crate) async fn yield_until<F>(mut condition: F)
429    where
430        F: FnMut() -> bool,
431    {
432        for _ in 0..MAX_WAIT_ITERS {
433            if condition() {
434                return;
435            }
436            tokio::task::yield_now().await;
437        }
438
439        panic!("yield_until timed out waiting for condition");
440    }
441
442    pub(crate) async fn advance_until<F>(mut condition: F)
443    where
444        F: FnMut() -> bool,
445    {
446        for _ in 0..MAX_ADVANCE_ITERS {
447            if condition() {
448                return;
449            }
450            tokio::time::advance(Duration::from_millis(1)).await;
451            tokio::task::yield_now().await;
452        }
453
454        panic!("advance_until timed out waiting for condition");
455    }
456
457    #[rstest]
458    fn test_retry_config_default() {
459        let config = RetryConfig::default();
460        assert_eq!(config.max_retries, 3);
461        assert_eq!(config.initial_delay_ms, 1_000);
462        assert_eq!(config.max_delay_ms, 10_000);
463        assert_eq!(config.backoff_factor, 2.0);
464        assert_eq!(config.jitter_ms, 100);
465        assert_eq!(config.operation_timeout_ms, Some(30_000));
466        assert!(!config.immediate_first);
467        assert_eq!(config.max_elapsed_ms, None);
468    }
469
470    #[tokio::test]
471    async fn test_retry_manager_success_first_attempt() {
472        let manager = RetryManager::new(RetryConfig::default());
473
474        let result = manager
475            .execute_with_retry(
476                "test_operation",
477                || async { Ok::<i32, TestError>(42) },
478                should_retry_test_error,
479                create_test_error,
480            )
481            .await;
482
483        assert_eq!(result.unwrap(), 42);
484    }
485
486    #[tokio::test]
487    async fn test_retry_manager_non_retryable_error() {
488        let manager = RetryManager::new(RetryConfig::default());
489
490        let result = manager
491            .execute_with_retry(
492                "test_operation",
493                || async { Err::<i32, TestError>(TestError::NonRetryable("test".to_string())) },
494                should_retry_test_error,
495                create_test_error,
496            )
497            .await;
498
499        assert!(result.is_err());
500        assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
501    }
502
503    #[tokio::test]
504    async fn test_retry_manager_retryable_error_exhausted() {
505        let config = RetryConfig {
506            max_retries: 2,
507            initial_delay_ms: 10,
508            max_delay_ms: 50,
509            backoff_factor: 2.0,
510            jitter_ms: 0,
511            operation_timeout_ms: None,
512            immediate_first: false,
513            max_elapsed_ms: None,
514        };
515        let manager = RetryManager::new(config);
516
517        let result = manager
518            .execute_with_retry(
519                "test_operation",
520                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
521                should_retry_test_error,
522                create_test_error,
523            )
524            .await;
525
526        assert!(result.is_err());
527        assert!(matches!(result.unwrap_err(), TestError::Retryable(_)));
528    }
529
530    #[tokio::test]
531    async fn test_timeout_path() {
532        let config = RetryConfig {
533            max_retries: 2,
534            initial_delay_ms: 10,
535            max_delay_ms: 50,
536            backoff_factor: 2.0,
537            jitter_ms: 0,
538            operation_timeout_ms: Some(50),
539            immediate_first: false,
540            max_elapsed_ms: None,
541        };
542        let manager = RetryManager::new(config);
543
544        let result = manager
545            .execute_with_retry(
546                "test_timeout",
547                || async {
548                    tokio::time::sleep(Duration::from_millis(100)).await;
549                    Ok::<i32, TestError>(42)
550                },
551                should_retry_test_error,
552                create_test_error,
553            )
554            .await;
555
556        assert!(result.is_err());
557        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
558    }
559
560    #[tokio::test]
561    async fn test_max_elapsed_time_budget() {
562        let config = RetryConfig {
563            max_retries: 10,
564            initial_delay_ms: 50,
565            max_delay_ms: 100,
566            backoff_factor: 2.0,
567            jitter_ms: 0,
568            operation_timeout_ms: None,
569            immediate_first: false,
570            max_elapsed_ms: Some(200),
571        };
572        let manager = RetryManager::new(config);
573
574        let start = tokio::time::Instant::now();
575        let result = manager
576            .execute_with_retry(
577                "test_budget",
578                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
579                should_retry_test_error,
580                create_test_error,
581            )
582            .await;
583
584        let elapsed = start.elapsed();
585        assert!(result.is_err());
586        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
587        assert!(elapsed.as_millis() >= 150);
588        assert!(elapsed.as_millis() < 1000);
589    }
590
591    #[tokio::test]
592    async fn test_budget_exceeded_message_format() {
593        let config = RetryConfig {
594            max_retries: 5,
595            initial_delay_ms: 10,
596            max_delay_ms: 20,
597            backoff_factor: 1.0,
598            jitter_ms: 0,
599            operation_timeout_ms: None,
600            immediate_first: false,
601            max_elapsed_ms: Some(35),
602        };
603        let manager = RetryManager::new(config);
604
605        let result = manager
606            .execute_with_retry(
607                "test_budget_msg",
608                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
609                should_retry_test_error,
610                create_test_error,
611            )
612            .await;
613
614        assert!(result.is_err());
615        let error_msg = result.unwrap_err().to_string();
616
617        assert!(error_msg.contains("Retry budget exceeded"));
618        assert!(error_msg.contains("/6)"));
619
620        if let Some(captures) = error_msg.strip_prefix("Timeout error: Retry budget exceeded (")
621            && let Some(nums) = captures.strip_suffix(")")
622        {
623            let parts: Vec<&str> = nums.split('/').collect();
624            assert_eq!(parts.len(), 2);
625            let current: u32 = parts[0].parse().unwrap();
626            let total: u32 = parts[1].parse().unwrap();
627
628            assert_eq!(total, 6, "Total should be max_retries + 1");
629            assert!(current <= total, "Current attempt should not exceed total");
630            assert!(current >= 1, "Current attempt should be at least 1");
631        }
632    }
633
634    #[tokio::test(start_paused = true)]
635    async fn test_budget_exceeded_edge_cases() {
636        let config = RetryConfig {
637            max_retries: 2,
638            initial_delay_ms: 50,
639            max_delay_ms: 100,
640            backoff_factor: 1.0,
641            jitter_ms: 0,
642            operation_timeout_ms: None,
643            immediate_first: false,
644            max_elapsed_ms: Some(100),
645        };
646        let manager = RetryManager::new(config);
647
648        let attempt_count = Arc::new(AtomicU32::new(0));
649        let count_clone = attempt_count.clone();
650
651        let handle = tokio::spawn(async move {
652            manager
653                .execute_with_retry(
654                    "test_first_attempt",
655                    move || {
656                        let count = count_clone.clone();
657                        async move {
658                            count.fetch_add(1, Ordering::SeqCst);
659                            Err::<i32, TestError>(TestError::Retryable("test".to_string()))
660                        }
661                    },
662                    should_retry_test_error,
663                    create_test_error,
664                )
665                .await
666        });
667
668        // Wait for first attempt
669        yield_until(|| attempt_count.load(Ordering::SeqCst) >= 1).await;
670
671        // Advance past budget to trigger check at loop start before second attempt
672        tokio::time::advance(Duration::from_millis(101)).await;
673        tokio::task::yield_now().await;
674
675        let result = handle.await.unwrap();
676        assert!(result.is_err());
677        let error_msg = result.unwrap_err().to_string();
678
679        // The budget check happens at loop start, so the message reads (2/3): the budget was exceeded while starting the 2nd of 3 total attempts
680        assert!(
681            error_msg.contains("(2/3)"),
682            "Expected (2/3) but got: {error_msg}"
683        );
684    }
685
686    #[tokio::test]
687    async fn test_budget_exceeded_no_overflow() {
688        let config = RetryConfig {
689            max_retries: u32::MAX,
690            initial_delay_ms: 10,
691            max_delay_ms: 20,
692            backoff_factor: 1.0,
693            jitter_ms: 0,
694            operation_timeout_ms: None,
695            immediate_first: false,
696            max_elapsed_ms: Some(1),
697        };
698        let manager = RetryManager::new(config);
699
700        let result = manager
701            .execute_with_retry(
702                "test_overflow",
703                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
704                should_retry_test_error,
705                create_test_error,
706            )
707            .await;
708
709        assert!(result.is_err());
710        let error_msg = result.unwrap_err().to_string();
711
712        // Should saturate at u32::MAX instead of wrapping to 0
713        assert!(error_msg.contains("Retry budget exceeded"));
714        assert!(error_msg.contains(&format!("/{}", u32::MAX)));
715    }
716
717    #[rstest]
718    fn test_http_retry_manager_config() {
719        let manager = create_http_retry_manager::<TestError>();
720        assert_eq!(manager.config.max_retries, 3);
721        assert!(!manager.config.immediate_first);
722        assert_eq!(manager.config.max_elapsed_ms, Some(180_000));
723    }
724
725    #[rstest]
726    fn test_websocket_retry_manager_config() {
727        let manager = create_websocket_retry_manager::<TestError>();
728        assert_eq!(manager.config.max_retries, 5);
729        assert!(manager.config.immediate_first);
730        assert_eq!(manager.config.max_elapsed_ms, Some(120_000));
731    }
732
733    #[tokio::test]
734    async fn test_timeout_respects_retry_predicate() {
735        let config = RetryConfig {
736            max_retries: 3,
737            initial_delay_ms: 10,
738            max_delay_ms: 50,
739            backoff_factor: 2.0,
740            jitter_ms: 0,
741            operation_timeout_ms: Some(50),
742            immediate_first: false,
743            max_elapsed_ms: None,
744        };
745        let manager = RetryManager::new(config);
746
747        // Test with retry predicate that rejects timeouts
748        let should_not_retry_timeouts = |error: &TestError| !matches!(error, TestError::Timeout(_));
749
750        let result = manager
751            .execute_with_retry(
752                "test_timeout_non_retryable",
753                || async {
754                    tokio::time::sleep(Duration::from_millis(100)).await;
755                    Ok::<i32, TestError>(42)
756                },
757                should_not_retry_timeouts,
758                create_test_error,
759            )
760            .await;
761
762        // Should fail immediately without retries since timeout is non-retryable
763        assert!(result.is_err());
764        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
765    }
766
767    #[tokio::test]
768    async fn test_timeout_retries_when_predicate_allows() {
769        let config = RetryConfig {
770            max_retries: 2,
771            initial_delay_ms: 10,
772            max_delay_ms: 50,
773            backoff_factor: 2.0,
774            jitter_ms: 0,
775            operation_timeout_ms: Some(50),
776            immediate_first: false,
777            max_elapsed_ms: None,
778        };
779        let manager = RetryManager::new(config);
780
781        // Test with retry predicate that allows timeouts
782        let should_retry_timeouts = |error: &TestError| matches!(error, TestError::Timeout(_));
783
784        let start = tokio::time::Instant::now();
785        let result = manager
786            .execute_with_retry(
787                "test_timeout_retryable",
788                || async {
789                    tokio::time::sleep(Duration::from_millis(100)).await;
790                    Ok::<i32, TestError>(42)
791                },
792                should_retry_timeouts,
793                create_test_error,
794            )
795            .await;
796
797        let elapsed = start.elapsed();
798
799        // Should fail after retries (not immediately)
800        assert!(result.is_err());
801        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
802        // Should have taken time for retries (at least 2 timeouts + delays)
803        assert!(elapsed.as_millis() > 80); // More than just one timeout
804    }
805
806    #[tokio::test]
807    async fn test_successful_retry_after_failures() {
808        let config = RetryConfig {
809            max_retries: 3,
810            initial_delay_ms: 10,
811            max_delay_ms: 50,
812            backoff_factor: 2.0,
813            jitter_ms: 0,
814            operation_timeout_ms: None,
815            immediate_first: false,
816            max_elapsed_ms: None,
817        };
818        let manager = RetryManager::new(config);
819
820        let attempt_counter = Arc::new(AtomicU32::new(0));
821        let counter_clone = attempt_counter.clone();
822
823        let result = manager
824            .execute_with_retry(
825                "test_eventual_success",
826                move || {
827                    let counter = counter_clone.clone();
828                    async move {
829                        let attempts = counter.fetch_add(1, Ordering::SeqCst);
830                        if attempts < 2 {
831                            Err(TestError::Retryable("temporary failure".to_string()))
832                        } else {
833                            Ok(42)
834                        }
835                    }
836                },
837                should_retry_test_error,
838                create_test_error,
839            )
840            .await;
841
842        assert_eq!(result.unwrap(), 42);
843        assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
844    }
845
846    #[tokio::test(start_paused = true)]
847    async fn test_immediate_first_retry() {
848        let config = RetryConfig {
849            max_retries: 2,
850            initial_delay_ms: 100,
851            max_delay_ms: 200,
852            backoff_factor: 2.0,
853            jitter_ms: 0,
854            operation_timeout_ms: None,
855            immediate_first: true,
856            max_elapsed_ms: None,
857        };
858        let manager = RetryManager::new(config);
859
860        let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
861        let times_clone = attempt_times.clone();
862        let start = tokio::time::Instant::now();
863
864        let handle = tokio::spawn({
865            let times_clone = times_clone.clone();
866            async move {
867                let _ = manager
868                    .execute_with_retry(
869                        "test_immediate",
870                        move || {
871                            let times = times_clone.clone();
872                            async move {
873                                times.lock().expect(MUTEX_POISONED).push(start.elapsed());
874                                Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
875                            }
876                        },
877                        should_retry_test_error,
878                        create_test_error,
879                    )
880                    .await;
881            }
882        });
883
884        // Allow initial attempt and immediate retry to run without advancing time
885        yield_until(|| attempt_times.lock().expect(MUTEX_POISONED).len() >= 2).await;
886
887        // Advance time for the next backoff interval
888        tokio::time::advance(Duration::from_millis(100)).await;
889        tokio::task::yield_now().await;
890
891        // Wait for the final retry to be recorded
892        yield_until(|| attempt_times.lock().expect(MUTEX_POISONED).len() >= 3).await;
893
894        handle.await.unwrap();
895
896        let times = attempt_times.lock().expect(MUTEX_POISONED);
897        assert_eq!(times.len(), 3); // Initial + 2 retries
898
899        // First retry should be immediate (within 1ms tolerance)
900        assert!(times[1] <= Duration::from_millis(1));
901        // Second retry should have backoff delay (at least 100ms from start)
902        assert!(times[2] >= Duration::from_millis(100));
903        assert!(times[2] <= Duration::from_millis(110));
904    }
905
906    #[tokio::test]
907    async fn test_operation_without_timeout() {
908        let config = RetryConfig {
909            max_retries: 2,
910            initial_delay_ms: 10,
911            max_delay_ms: 50,
912            backoff_factor: 2.0,
913            jitter_ms: 0,
914            operation_timeout_ms: None, // No timeout
915            immediate_first: false,
916            max_elapsed_ms: None,
917        };
918        let manager = RetryManager::new(config);
919
920        let start = tokio::time::Instant::now();
921        let result = manager
922            .execute_with_retry(
923                "test_no_timeout",
924                || async {
925                    tokio::time::sleep(Duration::from_millis(50)).await;
926                    Ok::<i32, TestError>(42)
927                },
928                should_retry_test_error,
929                create_test_error,
930            )
931            .await;
932
933        let elapsed = start.elapsed();
934        assert_eq!(result.unwrap(), 42);
935        // Should complete without timing out
936        assert!(elapsed.as_millis() >= 30);
937        assert!(elapsed.as_millis() < 200);
938    }
939
940    #[tokio::test]
941    async fn test_zero_retries() {
942        let config = RetryConfig {
943            max_retries: 0,
944            initial_delay_ms: 10,
945            max_delay_ms: 50,
946            backoff_factor: 2.0,
947            jitter_ms: 0,
948            operation_timeout_ms: None,
949            immediate_first: false,
950            max_elapsed_ms: None,
951        };
952        let manager = RetryManager::new(config);
953
954        let attempt_counter = Arc::new(AtomicU32::new(0));
955        let counter_clone = attempt_counter.clone();
956
957        let result = manager
958            .execute_with_retry(
959                "test_no_retries",
960                move || {
961                    let counter = counter_clone.clone();
962                    async move {
963                        counter.fetch_add(1, Ordering::SeqCst);
964                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
965                    }
966                },
967                should_retry_test_error,
968                create_test_error,
969            )
970            .await;
971
972        assert!(result.is_err());
973        // Should only attempt once (no retries)
974        assert_eq!(attempt_counter.load(Ordering::SeqCst), 1);
975    }
976
977    #[tokio::test(start_paused = true)]
978    async fn test_jitter_applied() {
979        let config = RetryConfig {
980            max_retries: 2,
981            initial_delay_ms: 50,
982            max_delay_ms: 100,
983            backoff_factor: 2.0,
984            jitter_ms: 50, // Significant jitter
985            operation_timeout_ms: None,
986            immediate_first: false,
987            max_elapsed_ms: None,
988        };
989        let manager = RetryManager::new(config);
990
991        let delays = Arc::new(std::sync::Mutex::new(Vec::new()));
992        let delays_clone = delays.clone();
993        let last_time = Arc::new(std::sync::Mutex::new(tokio::time::Instant::now()));
994        let last_time_clone = last_time.clone();
995
996        let handle = tokio::spawn({
997            let delays_clone = delays_clone.clone();
998            async move {
999                let _ = manager
1000                    .execute_with_retry(
1001                        "test_jitter",
1002                        move || {
1003                            let delays = delays_clone.clone();
1004                            let last_time = last_time_clone.clone();
1005                            async move {
1006                                let now = tokio::time::Instant::now();
1007                                let delay = {
1008                                    let mut last = last_time.lock().expect(MUTEX_POISONED);
1009                                    let d = now.duration_since(*last);
1010                                    *last = now;
1011                                    d
1012                                };
1013                                delays.lock().expect(MUTEX_POISONED).push(delay);
1014                                Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1015                            }
1016                        },
1017                        should_retry_test_error,
1018                        create_test_error,
1019                    )
1020                    .await;
1021            }
1022        });
1023
1024        yield_until(|| !delays.lock().expect(MUTEX_POISONED).is_empty()).await;
1025        advance_until(|| delays.lock().expect(MUTEX_POISONED).len() >= 2).await;
1026        advance_until(|| delays.lock().expect(MUTEX_POISONED).len() >= 3).await;
1027
1028        handle.await.unwrap();
1029
1030        let delays = delays.lock().expect(MUTEX_POISONED);
1031        // Skip the first delay (initial attempt)
1032        for delay in delays.iter().skip(1) {
1033            // Each delay should be at least the base delay (50ms for first retry)
1034            assert!(delay.as_millis() >= 50);
1035            // But no more than base + jitter (allow small tolerance for step advance)
1036            assert!(delay.as_millis() <= 151);
1037        }
1038    }
1039
1040    #[tokio::test]
1041    async fn test_max_elapsed_stops_early() {
1042        let config = RetryConfig {
1043            max_retries: 100, // Very high retry count
1044            initial_delay_ms: 50,
1045            max_delay_ms: 100,
1046            backoff_factor: 1.5,
1047            jitter_ms: 0,
1048            operation_timeout_ms: None,
1049            immediate_first: false,
1050            max_elapsed_ms: Some(150), // Should stop after ~3 attempts
1051        };
1052        let manager = RetryManager::new(config);
1053
1054        let attempt_counter = Arc::new(AtomicU32::new(0));
1055        let counter_clone = attempt_counter.clone();
1056
1057        let start = tokio::time::Instant::now();
1058        let result = manager
1059            .execute_with_retry(
1060                "test_elapsed_limit",
1061                move || {
1062                    let counter = counter_clone.clone();
1063                    async move {
1064                        counter.fetch_add(1, Ordering::SeqCst);
1065                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1066                    }
1067                },
1068                should_retry_test_error,
1069                create_test_error,
1070            )
1071            .await;
1072
1073        let elapsed = start.elapsed();
1074        assert!(result.is_err());
1075        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
1076
1077        // Should have stopped due to time limit, not retry count
1078        let attempts = attempt_counter.load(Ordering::SeqCst);
1079        assert!(attempts < 10); // Much less than max_retries
1080        assert!(elapsed.as_millis() >= 100);
1081    }
1082
1083    #[tokio::test]
1084    async fn test_mixed_errors_retry_behavior() {
1085        let config = RetryConfig {
1086            max_retries: 5,
1087            initial_delay_ms: 10,
1088            max_delay_ms: 50,
1089            backoff_factor: 2.0,
1090            jitter_ms: 0,
1091            operation_timeout_ms: None,
1092            immediate_first: false,
1093            max_elapsed_ms: None,
1094        };
1095        let manager = RetryManager::new(config);
1096
1097        let attempt_counter = Arc::new(AtomicU32::new(0));
1098        let counter_clone = attempt_counter.clone();
1099
1100        let result = manager
1101            .execute_with_retry(
1102                "test_mixed_errors",
1103                move || {
1104                    let counter = counter_clone.clone();
1105                    async move {
1106                        let attempts = counter.fetch_add(1, Ordering::SeqCst);
1107                        match attempts {
1108                            0 => Err(TestError::Retryable("retry 1".to_string())),
1109                            1 => Err(TestError::Retryable("retry 2".to_string())),
1110                            2 => Err(TestError::NonRetryable("stop here".to_string())),
1111                            _ => Ok(42),
1112                        }
1113                    }
1114                },
1115                should_retry_test_error,
1116                create_test_error,
1117            )
1118            .await;
1119
1120        assert!(result.is_err());
1121        assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
1122        // Should stop at the non-retryable error
1123        assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
1124    }
1125
1126    #[tokio::test]
1127    async fn test_cancellation_during_retry_delay() {
1128        use tokio_util::sync::CancellationToken;
1129
1130        let config = RetryConfig {
1131            max_retries: 10,
1132            initial_delay_ms: 500, // Long delay to ensure cancellation happens during sleep
1133            max_delay_ms: 1000,
1134            backoff_factor: 2.0,
1135            jitter_ms: 0,
1136            operation_timeout_ms: None,
1137            immediate_first: false,
1138            max_elapsed_ms: None,
1139        };
1140        let manager = RetryManager::new(config);
1141
1142        let token = CancellationToken::new();
1143        let token_clone = token.clone();
1144
1145        // Cancel after a short delay
1146        tokio::spawn(async move {
1147            tokio::time::sleep(Duration::from_millis(100)).await;
1148            token_clone.cancel();
1149        });
1150
1151        let attempt_counter = Arc::new(AtomicU32::new(0));
1152        let counter_clone = attempt_counter.clone();
1153
1154        let start = tokio::time::Instant::now();
1155        let result = manager
1156            .execute_with_retry_with_cancel(
1157                "test_cancellation",
1158                move || {
1159                    let counter = counter_clone.clone();
1160                    async move {
1161                        counter.fetch_add(1, Ordering::SeqCst);
1162                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1163                    }
1164                },
1165                should_retry_test_error,
1166                create_test_error,
1167                &token,
1168            )
1169            .await;
1170
1171        let elapsed = start.elapsed();
1172
1173        // Should be canceled quickly
1174        assert!(result.is_err());
1175        let error_msg = format!("{}", result.unwrap_err());
1176        assert!(error_msg.contains("canceled"));
1177
1178        // Should not have taken the full delay time
1179        assert!(elapsed.as_millis() < 600);
1180
1181        // Should have made at least one attempt
1182        let attempts = attempt_counter.load(Ordering::SeqCst);
1183        assert!(attempts >= 1);
1184    }
1185
1186    #[tokio::test]
1187    async fn test_cancellation_during_operation_execution() {
1188        use tokio_util::sync::CancellationToken;
1189
1190        let config = RetryConfig {
1191            max_retries: 5,
1192            initial_delay_ms: 50,
1193            max_delay_ms: 100,
1194            backoff_factor: 2.0,
1195            jitter_ms: 0,
1196            operation_timeout_ms: None,
1197            immediate_first: false,
1198            max_elapsed_ms: None,
1199        };
1200        let manager = RetryManager::new(config);
1201
1202        let token = CancellationToken::new();
1203        let token_clone = token.clone();
1204
1205        // Cancel after a short delay
1206        tokio::spawn(async move {
1207            tokio::time::sleep(Duration::from_millis(50)).await;
1208            token_clone.cancel();
1209        });
1210
1211        let start = tokio::time::Instant::now();
1212        let result = manager
1213            .execute_with_retry_with_cancel(
1214                "test_cancellation_during_op",
1215                || async {
1216                    // Long-running operation
1217                    tokio::time::sleep(Duration::from_millis(200)).await;
1218                    Ok::<i32, TestError>(42)
1219                },
1220                should_retry_test_error,
1221                create_test_error,
1222                &token,
1223            )
1224            .await;
1225
1226        let elapsed = start.elapsed();
1227
1228        // Should be canceled during the operation
1229        assert!(result.is_err());
1230        let error_msg = format!("{}", result.unwrap_err());
1231        assert!(error_msg.contains("canceled"));
1232
1233        // Should not have completed the long operation
1234        assert!(elapsed.as_millis() < 250);
1235    }
1236
1237    #[tokio::test]
1238    async fn test_cancellation_error_message() {
1239        use tokio_util::sync::CancellationToken;
1240
1241        let config = RetryConfig::default();
1242        let manager = RetryManager::new(config);
1243
1244        let token = CancellationToken::new();
1245        token.cancel(); // Pre-cancel for immediate cancellation
1246
1247        let result = manager
1248            .execute_with_retry_with_cancel(
1249                "test_operation",
1250                || async { Ok::<i32, TestError>(42) },
1251                should_retry_test_error,
1252                create_test_error,
1253                &token,
1254            )
1255            .await;
1256
1257        assert!(result.is_err());
1258        let error_msg = format!("{}", result.unwrap_err());
1259        assert!(error_msg.contains("canceled"));
1260    }
1261}
1262
1263#[cfg(test)]
1264mod proptest_tests {
1265    use std::sync::{
1266        Arc,
1267        atomic::{AtomicU32, Ordering},
1268    };
1269
1270    use nautilus_core::MUTEX_POISONED;
1271    use proptest::prelude::*;
1272    // Import the `rstest` attribute macro used within the `proptest!` tests below
1273    use rstest::rstest;
1274
1275    use super::{
1276        test_utils::*,
1277        tests::{advance_until, yield_until},
1278        *,
1279    };
1280
1281    proptest! {
1282        #[rstest]
1283        fn test_retry_config_valid_ranges(
1284            max_retries in 0u32..100,
1285            initial_delay_ms in 1u64..10_000,
1286            max_delay_ms in 1u64..60_000,
1287            backoff_factor in 1.0f64..10.0,
1288            jitter_ms in 0u64..1_000,
1289            operation_timeout_ms in prop::option::of(1u64..120_000),
1290            immediate_first in any::<bool>(),
1291            max_elapsed_ms in prop::option::of(1u64..300_000)
1292        ) {
1293            // Ensure max_delay >= initial_delay for valid config
1294            let max_delay_ms = max_delay_ms.max(initial_delay_ms);
1295
1296            let config = RetryConfig {
1297                max_retries,
1298                initial_delay_ms,
1299                max_delay_ms,
1300                backoff_factor,
1301                jitter_ms,
1302                operation_timeout_ms,
1303                immediate_first,
1304                max_elapsed_ms,
1305            };
1306
1307            // Should always be able to create a RetryManager with valid config
1308            let _manager = RetryManager::<std::io::Error>::new(config);
1309        }
1310
1311        #[rstest]
1312        fn test_retry_attempts_bounded(
1313            max_retries in 0u32..5,
1314            initial_delay_ms in 1u64..10,
1315            backoff_factor in 1.0f64..2.0,
1316        ) {
1317            let rt = tokio::runtime::Builder::new_current_thread()
1318                .enable_time()
1319                .build()
1320                .unwrap();
1321
1322            let config = RetryConfig {
1323                max_retries,
1324                initial_delay_ms,
1325                max_delay_ms: initial_delay_ms * 2,
1326                backoff_factor,
1327                jitter_ms: 0,
1328                operation_timeout_ms: None,
1329                immediate_first: false,
1330                max_elapsed_ms: None,
1331            };
1332
1333            let manager = RetryManager::new(config);
1334            let attempt_counter = Arc::new(AtomicU32::new(0));
1335            let counter_clone = attempt_counter.clone();
1336
1337            let _result = rt.block_on(manager.execute_with_retry(
1338                "prop_test",
1339                move || {
1340                    let counter = counter_clone.clone();
1341                    async move {
1342                        counter.fetch_add(1, Ordering::SeqCst);
1343                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1344                    }
1345                },
1346                |e: &TestError| matches!(e, TestError::Retryable(_)),
1347                TestError::Timeout,
1348            ));
1349
1350            let attempts = attempt_counter.load(Ordering::SeqCst);
1351            // Total attempts should be 1 (initial) + max_retries
1352            prop_assert_eq!(attempts, max_retries + 1);
1353        }
1354
1355        #[rstest]
1356        fn test_timeout_always_respected(
1357            timeout_ms in 10u64..50,
1358            operation_delay_ms in 60u64..100,
1359        ) {
1360            let rt = tokio::runtime::Builder::new_current_thread()
1361                .enable_time()
1362                .start_paused(true)
1363                .build()
1364                .unwrap();
1365
1366            let config = RetryConfig {
1367                max_retries: 0, // No retries to isolate timeout behavior
1368                initial_delay_ms: 10,
1369                max_delay_ms: 100,
1370                backoff_factor: 2.0,
1371                jitter_ms: 0,
1372                operation_timeout_ms: Some(timeout_ms),
1373                immediate_first: false,
1374                max_elapsed_ms: None,
1375            };
1376
1377            let manager = RetryManager::new(config);
1378
1379            let result = rt.block_on(async {
1380                let operation_future = manager.execute_with_retry(
1381                    "timeout_test",
1382                    move || async move {
1383                        tokio::time::sleep(Duration::from_millis(operation_delay_ms)).await;
1384                        Ok::<i32, TestError>(42)
1385                    },
1386                    |_: &TestError| true,
1387                    TestError::Timeout,
1388                );
1389
1390                // Advance time to trigger timeout
1391                tokio::time::advance(Duration::from_millis(timeout_ms + 10)).await;
1392                operation_future.await
1393            });
1394
1395            // Operation should timeout
1396            prop_assert!(result.is_err());
1397            prop_assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
1398        }
1399
1400        #[rstest]
1401        fn test_max_elapsed_always_respected(
1402            max_elapsed_ms in 20u64..50,
1403            delay_per_retry in 15u64..30,
1404            max_retries in 10u32..20,
1405        ) {
1406            let rt = tokio::runtime::Builder::new_current_thread()
1407                .enable_time()
1408                .start_paused(true)
1409                .build()
1410                .unwrap();
1411
1412            // Set up config where we would exceed max_elapsed_ms before max_retries
1413            let config = RetryConfig {
1414                max_retries,
1415                initial_delay_ms: delay_per_retry,
1416                max_delay_ms: delay_per_retry * 2,
1417                backoff_factor: 1.0, // No backoff to make timing predictable
1418                jitter_ms: 0,
1419                operation_timeout_ms: None,
1420                immediate_first: false,
1421                max_elapsed_ms: Some(max_elapsed_ms),
1422            };
1423
1424            let manager = RetryManager::new(config);
1425            let attempt_counter = Arc::new(AtomicU32::new(0));
1426            let counter_clone = attempt_counter.clone();
1427
1428            let result = rt.block_on(async {
1429                let operation_future = manager.execute_with_retry(
1430                    "elapsed_test",
1431                    move || {
1432                        let counter = counter_clone.clone();
1433                        async move {
1434                            counter.fetch_add(1, Ordering::SeqCst);
1435                            Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1436                        }
1437                    },
1438                    |e: &TestError| matches!(e, TestError::Retryable(_)),
1439                    TestError::Timeout,
1440                );
1441
1442                // Advance time past max_elapsed_ms
1443                tokio::time::advance(Duration::from_millis(max_elapsed_ms + delay_per_retry)).await;
1444                operation_future.await
1445            });
1446
1447            let attempts = attempt_counter.load(Ordering::SeqCst);
1448
1449            // Should have failed with timeout error
1450            prop_assert!(result.is_err());
1451            prop_assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
1452
1453            // Should have stopped before exhausting all retries
1454            prop_assert!(attempts <= max_retries + 1);
1455        }
1456
1457        #[rstest]
1458        fn test_jitter_bounds(
1459            jitter_ms in 0u64..20,
1460            base_delay_ms in 10u64..30,
1461        ) {
1462            let rt = tokio::runtime::Builder::new_current_thread()
1463                .enable_time()
1464                .start_paused(true)
1465                .build()
1466                .unwrap();
1467
1468            let config = RetryConfig {
1469                max_retries: 2,
1470                initial_delay_ms: base_delay_ms,
1471                max_delay_ms: base_delay_ms * 2,
1472                backoff_factor: 1.0, // No backoff to isolate jitter
1473                jitter_ms,
1474                operation_timeout_ms: None,
1475                immediate_first: false,
1476                max_elapsed_ms: None,
1477            };
1478
1479            let manager = RetryManager::new(config);
1480            let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
1481            let attempt_times_for_block = attempt_times.clone();
1482
1483            rt.block_on(async move {
1484                let attempt_times_for_wait = attempt_times_for_block.clone();
1485                let handle = tokio::spawn({
1486                    let attempt_times_for_task = attempt_times_for_block.clone();
1487                    let manager = manager;
1488                    async move {
1489                        let start_time = tokio::time::Instant::now();
1490                        let _ = manager
1491                            .execute_with_retry(
1492                                "jitter_test",
1493                                move || {
1494                                    let attempt_times_inner = attempt_times_for_task.clone();
1495                                    async move {
1496                                        attempt_times_inner
1497                                            .lock()
1498                                            .unwrap()
1499                                            .push(start_time.elapsed());
1500                                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1501                                    }
1502                                },
1503                                |e: &TestError| matches!(e, TestError::Retryable(_)),
1504                                TestError::Timeout,
1505                            )
1506                            .await;
1507                    }
1508                });
1509
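                // Drive the paused clock: wait for the first attempt, then advance time
                // until the second and third attempts have been recorded.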
1510                yield_until(|| !attempt_times_for_wait.lock().expect(MUTEX_POISONED).is_empty()).await;
1511                advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 2).await;
1512                advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 3).await;
1513
1514                handle.await.unwrap();
1515            });
1516
1517            let times = attempt_times.lock().expect(MUTEX_POISONED);
1518
1519            // We expect at least 2 attempts total (initial + at least 1 retry)
1520            prop_assert!(times.len() >= 2);
1521
1522            // First attempt should be immediate (no delay)
1523            prop_assert!(times[0].as_millis() < 5);
1524
1525            // Check subsequent retries have appropriate delays
1526            for i in 1..times.len() {
1527                // Delay relative to the previous attempt (times[0] is the initial attempt)
1528                let delay_from_previous = times[i] - times[i - 1];
1532
1533                // The delay should be at least base_delay_ms
1534                prop_assert!(
1535                    delay_from_previous.as_millis() >= base_delay_ms as u128,
1536                    "Retry {} delay {}ms is less than base {}ms",
1537                    i, delay_from_previous.as_millis(), base_delay_ms
1538                );
1539
1540                // Delay should be at most base_delay + jitter
1541                prop_assert!(
1542                    delay_from_previous.as_millis() <= (base_delay_ms + jitter_ms + 1) as u128,
1543                    "Retry {} delay {}ms exceeds base {} + jitter {}",
1544                    i, delay_from_previous.as_millis(), base_delay_ms, jitter_ms
1545                );
1546            }
1547        }
1548
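        // Property: immediate_first = true fires the first retry without delay, while
        // immediate_first = false delays it by at least initial_delay_ms.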
1549        #[rstest]
1550        fn test_immediate_first_property(
1551            immediate_first in any::<bool>(),
1552            initial_delay_ms in 10u64..30,
1553        ) {
1554            let rt = tokio::runtime::Builder::new_current_thread()
1555                .enable_time()
1556                .start_paused(true)
1557                .build()
1558                .unwrap();
1559
1560            let config = RetryConfig {
1561                max_retries: 2,
1562                initial_delay_ms,
1563                max_delay_ms: initial_delay_ms * 2,
1564                backoff_factor: 2.0,
1565                jitter_ms: 0,
1566                operation_timeout_ms: None,
1567                immediate_first,
1568                max_elapsed_ms: None,
1569            };
1570
1571            let manager = RetryManager::new(config);
1572            let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
1573            let attempt_times_for_block = attempt_times.clone();
1574
1575            rt.block_on(async move {
1576                let attempt_times_for_wait = attempt_times_for_block.clone();
1577                let handle = tokio::spawn({
1578                    let attempt_times_for_task = attempt_times_for_block.clone();
1579                    let manager = manager;
1580                    async move {
1581                        let start = tokio::time::Instant::now();
1582                        let _ = manager
1583                            .execute_with_retry(
1584                                "immediate_test",
1585                                move || {
1586                                    let attempt_times_inner = attempt_times_for_task.clone();
1587                                    async move {
1588                                        let elapsed = start.elapsed();
1589                                        attempt_times_inner.lock().expect(MUTEX_POISONED).push(elapsed);
1590                                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1591                                    }
1592                                },
1593                                |e: &TestError| matches!(e, TestError::Retryable(_)),
1594                                TestError::Timeout,
1595                            )
1596                            .await;
1597                    }
1598                });
1599
1600                yield_until(|| !attempt_times_for_wait.lock().expect(MUTEX_POISONED).is_empty()).await;
1601                advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 2).await;
1602                advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 3).await;
1603
1604                handle.await.unwrap();
1605            });
1606
1607            let times = attempt_times.lock().expect(MUTEX_POISONED);
1608            prop_assert!(times.len() >= 2);
1609
1610            if immediate_first {
1611                // First retry should be immediate
1612                prop_assert!(times[1].as_millis() < 20,
1613                    "With immediate_first=true, first retry took {}ms",
1614                    times[1].as_millis());
1615            } else {
1616                // First retry should be delayed by at least initial_delay_ms
1617                prop_assert!(times[1].as_millis() >= (initial_delay_ms - 1) as u128,
1618                    "With immediate_first=false, first retry was too fast: {}ms",
1619                    times[1].as_millis());
1620            }
1621        }
1622
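        // Property: a non-retryable error ends the retry loop on the attempt that
        // produced it, regardless of the remaining retry budget.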
1623        #[rstest]
1624        fn test_non_retryable_stops_immediately(
1625            attempt_before_non_retryable in 0usize..3,
1626            max_retries in 3u32..5,
1627        ) {
1628            let rt = tokio::runtime::Builder::new_current_thread()
1629                .enable_time()
1630                .build()
1631                .unwrap();
1632
1633            let config = RetryConfig {
1634                max_retries,
1635                initial_delay_ms: 10,
1636                max_delay_ms: 100,
1637                backoff_factor: 2.0,
1638                jitter_ms: 0,
1639                operation_timeout_ms: None,
1640                immediate_first: false,
1641                max_elapsed_ms: None,
1642            };
1643
1644            let manager = RetryManager::new(config);
1645            let attempt_counter = Arc::new(AtomicU32::new(0));
1646            let counter_clone = attempt_counter.clone();
1647
1648            let result: Result<i32, TestError> = rt.block_on(manager.execute_with_retry(
1649                "non_retryable_test",
1650                move || {
1651                    let counter = counter_clone.clone();
1652                    async move {
1653                        let attempts = counter.fetch_add(1, Ordering::SeqCst) as usize;
1654                        if attempts == attempt_before_non_retryable {
1655                            Err(TestError::NonRetryable("stop".to_string()))
1656                        } else {
1657                            Err(TestError::Retryable("retry".to_string()))
1658                        }
1659                    }
1660                },
1661                |e: &TestError| matches!(e, TestError::Retryable(_)),
1662                TestError::Timeout,
1663            ));
1664
1665            let attempts = attempt_counter.load(Ordering::SeqCst) as usize;
1666
1667            prop_assert!(result.is_err());
1668            prop_assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
1669            // Should stop exactly on the attempt that produced the non-retryable error
1670            prop_assert_eq!(attempts, attempt_before_non_retryable + 1);
1671        }
1672
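        // Property: cancelling the token while retries are pending aborts the operation
        // promptly with a cancellation error.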
1673        #[rstest]
1674        fn test_cancellation_stops_immediately(
1675            cancel_after_ms in 10u64..100,
1676            initial_delay_ms in 200u64..500,
1677        ) {
1678            use tokio_util::sync::CancellationToken;
1679
1680            let rt = tokio::runtime::Builder::new_current_thread()
1681                .enable_time()
1682                .start_paused(true)
1683                .build()
1684                .unwrap();
1685
1686            let config = RetryConfig {
1687                max_retries: 10,
1688                initial_delay_ms,
1689                max_delay_ms: initial_delay_ms * 2,
1690                backoff_factor: 2.0,
1691                jitter_ms: 0,
1692                operation_timeout_ms: None,
1693                immediate_first: false,
1694                max_elapsed_ms: None,
1695            };
1696
1697            let manager = RetryManager::new(config);
1698            let token = CancellationToken::new();
1699            let token_clone = token.clone();
1700
1701            let result: Result<i32, TestError> = rt.block_on(async {
1702                // Spawn a task that cancels the token after cancel_after_ms
1703                tokio::spawn(async move {
1704                    tokio::time::sleep(Duration::from_millis(cancel_after_ms)).await;
1705                    token_clone.cancel();
1706                });
1707
1708                let operation_future = manager.execute_with_retry_with_cancel(
1709                    "cancellation_test",
1710                    || async {
1711                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1712                    },
1713                    |e: &TestError| matches!(e, TestError::Retryable(_)),
1714                    create_test_error,
1715                    &token,
1716                );
1717
1718                // Advance time to trigger cancellation
1719                tokio::time::advance(Duration::from_millis(cancel_after_ms + 10)).await;
1720                operation_future.await
1721            });
1722
1723            // Should be canceled
1724            prop_assert!(result.is_err());
1725            let error_msg = format!("{}", result.unwrap_err());
1726            prop_assert!(error_msg.contains("canceled"));
1727        }
1728
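        // Exercises the elapsed-budget clamp: the first retry delay alone would exceed
        // max_elapsed_ms, so the retry loop should give up quickly rather than sleeping
        // past the budget.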
1729        #[rstest]
1730        fn test_budget_clamp_prevents_overshoot(
1731            max_elapsed_ms in 10u64..30,
1732            delay_per_retry in 20u64..50,
1733        ) {
1734            let rt = tokio::runtime::Builder::new_current_thread()
1735                .enable_time()
1736                .start_paused(true)
1737                .build()
1738                .unwrap();
1739
1740            // Configure so that the first retry delay alone would exceed the elapsed budget
1741            let config = RetryConfig {
1742                max_retries: 5,
1743                initial_delay_ms: delay_per_retry,
1744                max_delay_ms: delay_per_retry * 2,
1745                backoff_factor: 1.0,
1746                jitter_ms: 0,
1747                operation_timeout_ms: None,
1748                immediate_first: false,
1749                max_elapsed_ms: Some(max_elapsed_ms),
1750            };
1751
1752            let manager = RetryManager::new(config);
1753
1754            let _result = rt.block_on(async {
1755                let operation_future = manager.execute_with_retry(
1756                    "budget_clamp_test",
1757                    || async {
1758                        // Fast operation to focus on delay timing
1759                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1760                    },
1761                    |e: &TestError| matches!(e, TestError::Retryable(_)),
1762                    create_test_error,
1763                );
1764
1765                // Advance time past max_elapsed_ms
1766                tokio::time::advance(Duration::from_millis(max_elapsed_ms + delay_per_retry)).await;
1767                operation_future.await
1768            });
1769
1770            // With paused time the operation completes without wall-clock delay; the elapsed
1771            // budget is still enforced, guarding against hangs when a single delay exceeds it.
1772        }
1773
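        // Property: an operation that succeeds on its k-th attempt returns that success
        // and makes exactly k attempts (no extra retries afterwards).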
1774        #[rstest]
1775        fn test_success_on_kth_attempt(
1776            k in 1usize..5,
1777            initial_delay_ms in 5u64..20,
1778        ) {
1779            let rt = tokio::runtime::Builder::new_current_thread()
1780                .enable_time()
1781                .start_paused(true)
1782                .build()
1783                .unwrap();
1784
1785            let config = RetryConfig {
1786                max_retries: 10, // More than k
1787                initial_delay_ms,
1788                max_delay_ms: initial_delay_ms * 4,
1789                backoff_factor: 2.0,
1790                jitter_ms: 0,
1791                operation_timeout_ms: None,
1792                immediate_first: false,
1793                max_elapsed_ms: None,
1794            };
1795
1796            let manager = RetryManager::new(config);
1797            let attempt_counter = Arc::new(AtomicU32::new(0));
1798            let counter_clone = attempt_counter.clone();
1799            let target_k = k;
1800
1801            let (result, _elapsed) = rt.block_on(async {
1802                let start = tokio::time::Instant::now();
1803
1804                let operation_future = manager.execute_with_retry(
1805                    "kth_attempt_test",
1806                    move || {
1807                        let counter = counter_clone.clone();
1808                        async move {
1809                            let attempt = counter.fetch_add(1, Ordering::SeqCst) as usize;
1810                            if attempt + 1 == target_k {
1811                                Ok(42)
1812                            } else {
1813                                Err(TestError::Retryable("retry".to_string()))
1814                            }
1815                        }
1816                    },
1817                    |e: &TestError| matches!(e, TestError::Retryable(_)),
1818                    create_test_error,
1819                );
1820
1821                // Advance time enough for k attempts (each retry delay is capped at max_delay_ms = 4 * initial_delay_ms)
1822                for _ in 0..k {
1823                    tokio::time::advance(Duration::from_millis(initial_delay_ms * 4)).await;
1824                }
1825
1826                let result = operation_future.await;
1827                let elapsed = start.elapsed();
1828
1829                (result, elapsed)
1830            });
1831
1832            let attempts = attempt_counter.load(Ordering::SeqCst) as usize;
1833
1834            // Paused Tokio time (start_paused + advance) makes wall-clock timing meaningless, so only behavior is asserted
1835            prop_assert!(result.is_ok());
1836            prop_assert_eq!(result.unwrap(), 42);
1837            prop_assert_eq!(attempts, k);
1838        }
1839    }
1840}