nautilus_network/retry.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Generic retry mechanism for network operations.
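//!
//! The retry flow: run the operation (optionally under a per-attempt timeout),
//! classify any failure with a caller-supplied predicate, then sleep an
//! exponentially backed-off, jittered delay before the next attempt, all
//! subject to an optional total time budget and an optional cancellation token.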

use std::{future::Future, marker::PhantomData, time::Duration};

use tokio_util::sync::CancellationToken;

use crate::backoff::ExponentialBackoff;

/// Configuration for retry behavior.
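///
/// A minimal construction sketch; the field values are illustrative and the
/// import path assumes this module is exposed as `nautilus_network::retry`:
///
/// ```ignore
/// use nautilus_network::retry::RetryConfig;
///
/// let config = RetryConfig {
///     max_retries: 5,
///     initial_delay_ms: 250,
///     ..Default::default()
/// };
/// ```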
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Maximum number of retry attempts (total attempts = 1 initial + `max_retries`).
    pub max_retries: u32,
    /// Initial delay between retries in milliseconds.
    pub initial_delay_ms: u64,
    /// Maximum delay between retries in milliseconds.
    pub max_delay_ms: u64,
    /// Backoff multiplier factor.
    pub backoff_factor: f64,
    /// Maximum jitter in milliseconds to add to delays.
    pub jitter_ms: u64,
    /// Optional timeout for individual operations in milliseconds.
    /// If `None`, no timeout is applied.
    pub operation_timeout_ms: Option<u64>,
    /// Whether the first retry should happen immediately without delay.
    /// Should be `false` for HTTP/order operations and `true` for connection operations.
    pub immediate_first: bool,
    /// Optional maximum total elapsed time for all retries in milliseconds.
    /// If exceeded, retries stop even if `max_retries` hasn't been reached.
    pub max_elapsed_ms: Option<u64>,
}

impl Default for RetryConfig {
    fn default() -> Self {
        Self {
            max_retries: 3,
            initial_delay_ms: 1_000,
            max_delay_ms: 10_000,
            backoff_factor: 2.0,
            jitter_ms: 100,
            operation_timeout_ms: Some(30_000),
            immediate_first: false,
            max_elapsed_ms: None,
        }
    }
}

/// Generic retry manager for network operations.
///
/// Stateless and thread-safe; each operation maintains its own backoff state.
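///
/// A construction sketch; `std::io::Error` stands in for any error type
/// implementing `std::error::Error`:
///
/// ```ignore
/// use nautilus_network::retry::{RetryConfig, RetryManager};
///
/// let manager: RetryManager<std::io::Error> =
///     RetryManager::new(RetryConfig::default()).expect("default config is valid");
/// ```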
#[derive(Clone, Debug)]
pub struct RetryManager<E> {
    config: RetryConfig,
    _phantom: PhantomData<E>,
}

impl<E> RetryManager<E>
where
    E: std::error::Error,
{
    /// Creates a new retry manager with the given configuration.
    ///
    /// # Errors
    ///
    /// Currently infallible; the `Result` return type is kept so that
    /// configuration validation can be added without breaking the API.
    pub const fn new(config: RetryConfig) -> anyhow::Result<Self> {
        Ok(Self {
            config,
            _phantom: PhantomData,
        })
    }

    /// Executes an operation with retry logic and optional cancellation.
    ///
    /// Cancellation is checked at three points:
    /// (1) Before each operation attempt.
    /// (2) During operation execution (via `tokio::select!`).
    /// (3) During retry delays.
    ///
    /// This means cancellation may be delayed by up to one operation timeout if it occurs mid-execution.
    ///
    /// # Errors
    ///
    /// Returns an error if the operation fails after exhausting all retries,
    /// if the operation times out, if creating the backoff state fails, or if canceled.
    pub async fn execute_with_retry_inner<F, Fut, T>(
        &self,
        operation_name: &str,
        mut operation: F,
        should_retry: impl Fn(&E) -> bool,
        create_error: impl Fn(String) -> E,
        cancel: Option<&CancellationToken>,
    ) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        let mut backoff = ExponentialBackoff::new(
            Duration::from_millis(self.config.initial_delay_ms),
            Duration::from_millis(self.config.max_delay_ms),
            self.config.backoff_factor,
            self.config.jitter_ms,
            self.config.immediate_first,
        )
        .map_err(|e| create_error(format!("Invalid configuration: {e}")))?;

        let mut attempt = 0;
        let start_time = tokio::time::Instant::now();

        loop {
            if let Some(token) = cancel
                && token.is_cancelled()
            {
                tracing::debug!(
                    operation = %operation_name,
                    attempts = attempt,
                    "Operation canceled"
                );
                return Err(create_error("canceled".to_string()));
            }

            if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
                let elapsed = start_time.elapsed();
                if elapsed.as_millis() >= u128::from(max_elapsed_ms) {
                    let e = create_error("Budget exceeded".to_string());
                    tracing::trace!(
                        operation = %operation_name,
                        attempts = attempt + 1,
                        budget_ms = max_elapsed_ms,
                        "Retry budget exceeded"
                    );
                    return Err(e);
                }
            }

            // Run a single attempt, combining the optional per-attempt timeout
            // with the optional cancellation token (four cases).
            let result = match (self.config.operation_timeout_ms, cancel) {
                (Some(timeout_ms), Some(token)) => {
                    tokio::select! {
                        result = tokio::time::timeout(Duration::from_millis(timeout_ms), operation()) => result,
                        () = token.cancelled() => {
                            tracing::debug!(
                                operation = %operation_name,
                                "Operation canceled during execution"
                            );
                            return Err(create_error("canceled".to_string()));
                        }
                    }
                }
                (Some(timeout_ms), None) => {
                    tokio::time::timeout(Duration::from_millis(timeout_ms), operation()).await
                }
                (None, Some(token)) => {
                    tokio::select! {
                        result = operation() => Ok(result),
                        () = token.cancelled() => {
                            tracing::debug!(
                                operation = %operation_name,
                                "Operation canceled during execution"
                            );
                            return Err(create_error("canceled".to_string()));
                        }
                    }
                }
                (None, None) => Ok(operation().await),
            };

            // The outer `Result` is the timeout outcome; the inner `Result`
            // is the operation's own outcome.
            match result {
                Ok(Ok(success)) => {
                    if attempt > 0 {
                        tracing::trace!(
                            operation = %operation_name,
                            attempts = attempt + 1,
                            "Retry succeeded"
                        );
                    }
                    return Ok(success);
                }
                Ok(Err(e)) => {
                    if !should_retry(&e) {
                        tracing::trace!(
                            operation = %operation_name,
                            error = %e,
                            "Non-retryable error"
                        );
                        return Err(e);
                    }

                    if attempt >= self.config.max_retries {
                        tracing::trace!(
                            operation = %operation_name,
                            attempts = attempt + 1,
                            error = %e,
                            "Retries exhausted"
                        );
                        return Err(e);
                    }

                    let mut delay = backoff.next_duration();

                    if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
                        let elapsed = start_time.elapsed();
                        let remaining =
                            Duration::from_millis(max_elapsed_ms).saturating_sub(elapsed);

                        if remaining.is_zero() {
                            let e = create_error("Budget exceeded".to_string());
                            tracing::trace!(
                                operation = %operation_name,
                                attempts = attempt + 1,
                                budget_ms = max_elapsed_ms,
                                "Retry budget exceeded"
                            );
                            return Err(e);
                        }

                        delay = delay.min(remaining);
                    }

                    tracing::trace!(
                        operation = %operation_name,
                        attempt = attempt + 1,
                        delay_ms = delay.as_millis() as u64,
                        error = %e,
                        "Retrying after failure"
                    );

                    // Yield even on zero delay to avoid a busy-wait loop
                    if delay.is_zero() {
                        tokio::task::yield_now().await;
                        attempt += 1;
                        continue;
                    }

                    if let Some(token) = cancel {
                        tokio::select! {
                            () = tokio::time::sleep(delay) => {},
                            () = token.cancelled() => {
                                tracing::debug!(
                                    operation = %operation_name,
                                    attempt = attempt + 1,
                                    "Operation canceled during retry delay"
                                );
                                return Err(create_error("canceled".to_string()));
                            }
                        }
                    } else {
                        tokio::time::sleep(delay).await;
                    }
                    attempt += 1;
                }
                // The per-attempt timeout elapsed; synthesize a timeout error
                // and apply the same retry policy as for operation errors.
                Err(_) => {
                    let e = create_error(format!(
                        "Timed out after {}ms",
                        self.config.operation_timeout_ms.unwrap_or(0)
                    ));

                    if !should_retry(&e) {
                        tracing::trace!(
                            operation = %operation_name,
                            error = %e,
                            "Non-retryable timeout"
                        );
                        return Err(e);
                    }

                    if attempt >= self.config.max_retries {
                        tracing::trace!(
                            operation = %operation_name,
                            attempts = attempt + 1,
                            error = %e,
                            "Retries exhausted after timeout"
                        );
                        return Err(e);
                    }

                    let mut delay = backoff.next_duration();

                    if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
                        let elapsed = start_time.elapsed();
                        let remaining =
                            Duration::from_millis(max_elapsed_ms).saturating_sub(elapsed);

                        if remaining.is_zero() {
                            let e = create_error("Budget exceeded".to_string());
                            tracing::trace!(
                                operation = %operation_name,
                                attempts = attempt + 1,
                                budget_ms = max_elapsed_ms,
                                "Retry budget exceeded"
                            );
                            return Err(e);
                        }

                        delay = delay.min(remaining);
                    }

                    tracing::trace!(
                        operation = %operation_name,
                        attempt = attempt + 1,
                        delay_ms = delay.as_millis() as u64,
                        error = %e,
                        "Retrying after timeout"
                    );

                    // Yield even on zero delay to avoid a busy-wait loop
                    if delay.is_zero() {
                        tokio::task::yield_now().await;
                        attempt += 1;
                        continue;
                    }

                    if let Some(token) = cancel {
                        tokio::select! {
                            () = tokio::time::sleep(delay) => {},
                            () = token.cancelled() => {
                                tracing::debug!(
                                    operation = %operation_name,
                                    attempt = attempt + 1,
                                    "Operation canceled during retry delay"
                                );
                                return Err(create_error("canceled".to_string()));
                            }
                        }
                    } else {
                        tokio::time::sleep(delay).await;
                    }
                    attempt += 1;
                }
            }
        }
    }

    /// Executes an operation with retry logic.
    ///
    /// # Errors
    ///
    /// Returns an error if the operation fails after exhausting all retries,
    /// if the operation times out, or if creating the backoff state fails.
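    ///
    /// # Example
    ///
    /// A usage sketch; the operation, retry predicate, and error constructor
    /// shown here are hypothetical:
    ///
    /// ```ignore
    /// let value = manager
    ///     .execute_with_retry(
    ///         "fetch_data",
    ///         || async { fetch().await },   // hypothetical fallible async operation
    ///         |e| e.is_transient(),         // hypothetical retry predicate
    ///         MyError::Timeout,             // hypothetical error constructor
    ///     )
    ///     .await?;
    /// ```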
    pub async fn execute_with_retry<F, Fut, T>(
        &self,
        operation_name: &str,
        operation: F,
        should_retry: impl Fn(&E) -> bool,
        create_error: impl Fn(String) -> E,
    ) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        self.execute_with_retry_inner(operation_name, operation, should_retry, create_error, None)
            .await
    }

    /// Executes an operation with retry logic and cancellation support.
    ///
    /// # Errors
    ///
    /// Returns an error if the operation fails after exhausting all retries,
    /// if the operation times out, if creating the backoff state fails, or if canceled.
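    ///
    /// # Example
    ///
    /// A cancellation sketch (the operation and predicate are hypothetical);
    /// cancelling the token aborts the in-flight attempt or the pending delay:
    ///
    /// ```ignore
    /// use tokio_util::sync::CancellationToken;
    ///
    /// let token = CancellationToken::new();
    /// let result = manager
    ///     .execute_with_retry_with_cancel(
    ///         "connect",
    ///         || async { connect().await },   // hypothetical fallible async operation
    ///         |e| e.is_transient(),           // hypothetical retry predicate
    ///         MyError::Timeout,               // hypothetical error constructor
    ///         &token,
    ///     )
    ///     .await;
    /// ```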
    pub async fn execute_with_retry_with_cancel<F, Fut, T>(
        &self,
        operation_name: &str,
        operation: F,
        should_retry: impl Fn(&E) -> bool,
        create_error: impl Fn(String) -> E,
        cancellation_token: &CancellationToken,
    ) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        self.execute_with_retry_inner(
            operation_name,
            operation,
            should_retry,
            create_error,
            Some(cancellation_token),
        )
        .await
    }
}

/// Convenience function to create a retry manager with default configuration.
///
/// # Errors
///
/// Returns an error if the default configuration is invalid.
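///
/// A usage sketch; `MyError` is a placeholder for any type implementing
/// `std::error::Error`:
///
/// ```ignore
/// let manager = create_default_retry_manager::<MyError>()?;
/// ```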
pub fn create_default_retry_manager<E>() -> anyhow::Result<RetryManager<E>>
where
    E: std::error::Error,
{
    RetryManager::new(RetryConfig::default())
}

/// Convenience function to create a retry manager for HTTP operations.
///
/// # Errors
///
/// Returns an error if the HTTP configuration is invalid.
pub const fn create_http_retry_manager<E>() -> anyhow::Result<RetryManager<E>>
where
    E: std::error::Error,
{
    let config = RetryConfig {
        max_retries: 3,
        initial_delay_ms: 1_000,
        max_delay_ms: 10_000,
        backoff_factor: 2.0,
        jitter_ms: 1_000,
        operation_timeout_ms: Some(60_000), // 60s for HTTP requests
        immediate_first: false,
        max_elapsed_ms: Some(180_000), // 3 minutes total budget
    };
    RetryManager::new(config)
}

/// Convenience function to create a retry manager for WebSocket operations.
///
/// # Errors
///
/// Returns an error if the WebSocket configuration is invalid.
pub const fn create_websocket_retry_manager<E>() -> anyhow::Result<RetryManager<E>>
where
    E: std::error::Error,
{
    let config = RetryConfig {
        max_retries: 5,
        initial_delay_ms: 1_000,
        max_delay_ms: 10_000,
        backoff_factor: 2.0,
        jitter_ms: 1_000,
        operation_timeout_ms: Some(30_000), // 30s for WebSocket operations
        immediate_first: true,
        max_elapsed_ms: Some(120_000), // 2 minutes total budget
    };
    RetryManager::new(config)
}

////////////////////////////////////////////////////////////////////////////////
// Tests
////////////////////////////////////////////////////////////////////////////////

#[cfg(test)]
mod test_utils {
    #[derive(Debug, thiserror::Error)]
    pub enum TestError {
        #[error("Retryable error: {0}")]
        Retryable(String),
        #[error("Non-retryable error: {0}")]
        NonRetryable(String),
        #[error("Timeout error: {0}")]
        Timeout(String),
    }

    pub fn should_retry_test_error(error: &TestError) -> bool {
        matches!(error, TestError::Retryable(_))
    }

    pub fn create_test_error(msg: String) -> TestError {
        TestError::Timeout(msg)
    }
}

#[cfg(test)]
mod tests {
    use std::sync::{
        Arc,
        atomic::{AtomicU32, Ordering},
    };

    use nautilus_core::MUTEX_POISONED;
    use rstest::rstest;

    use super::{test_utils::*, *};

    const MAX_WAIT_ITERS: usize = 10_000;
    const MAX_ADVANCE_ITERS: usize = 10_000;

    pub(crate) async fn yield_until<F>(mut condition: F)
    where
        F: FnMut() -> bool,
    {
        for _ in 0..MAX_WAIT_ITERS {
            if condition() {
                return;
            }
            tokio::task::yield_now().await;
        }

        panic!("yield_until timed out waiting for condition");
    }

    pub(crate) async fn advance_until<F>(mut condition: F)
    where
        F: FnMut() -> bool,
    {
        for _ in 0..MAX_ADVANCE_ITERS {
            if condition() {
                return;
            }
            tokio::time::advance(Duration::from_millis(1)).await;
            tokio::task::yield_now().await;
        }

        panic!("advance_until timed out waiting for condition");
    }

    #[rstest]
    fn test_retry_config_default() {
        let config = RetryConfig::default();
        assert_eq!(config.max_retries, 3);
        assert_eq!(config.initial_delay_ms, 1_000);
        assert_eq!(config.max_delay_ms, 10_000);
        assert_eq!(config.backoff_factor, 2.0);
        assert_eq!(config.jitter_ms, 100);
        assert_eq!(config.operation_timeout_ms, Some(30_000));
        assert!(!config.immediate_first);
        assert_eq!(config.max_elapsed_ms, None);
    }

    #[tokio::test]
    async fn test_retry_manager_success_first_attempt() {
        let manager = RetryManager::new(RetryConfig::default()).unwrap();

        let result = manager
            .execute_with_retry(
                "test_operation",
                || async { Ok::<i32, TestError>(42) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert_eq!(result.unwrap(), 42);
    }

    #[tokio::test]
    async fn test_retry_manager_non_retryable_error() {
        let manager = RetryManager::new(RetryConfig::default()).unwrap();

        let result = manager
            .execute_with_retry(
                "test_operation",
                || async { Err::<i32, TestError>(TestError::NonRetryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
    }

    #[tokio::test]
    async fn test_retry_manager_retryable_error_exhausted() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let result = manager
            .execute_with_retry(
                "test_operation",
                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Retryable(_)));
    }

    #[tokio::test]
    async fn test_timeout_path() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: Some(50),
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let result = manager
            .execute_with_retry(
                "test_timeout",
                || async {
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
    }

    #[tokio::test]
    async fn test_max_elapsed_time_budget() {
        let config = RetryConfig {
            max_retries: 10,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: Some(200),
        };
        let manager = RetryManager::new(config).unwrap();

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_budget",
                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        let elapsed = start.elapsed();
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
        assert!(elapsed.as_millis() >= 150);
        assert!(elapsed.as_millis() < 1000);
    }

    #[rstest]
    fn test_http_retry_manager_config() {
        let manager = create_http_retry_manager::<TestError>().unwrap();
        assert_eq!(manager.config.max_retries, 3);
        assert!(!manager.config.immediate_first);
        assert_eq!(manager.config.max_elapsed_ms, Some(180_000));
    }

    #[rstest]
    fn test_websocket_retry_manager_config() {
        let manager = create_websocket_retry_manager::<TestError>().unwrap();
        assert_eq!(manager.config.max_retries, 5);
        assert!(manager.config.immediate_first);
        assert_eq!(manager.config.max_elapsed_ms, Some(120_000));
    }

    #[tokio::test]
    async fn test_timeout_respects_retry_predicate() {
        let config = RetryConfig {
            max_retries: 3,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: Some(50),
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        // Test with retry predicate that rejects timeouts
        let should_not_retry_timeouts = |error: &TestError| !matches!(error, TestError::Timeout(_));

        let result = manager
            .execute_with_retry(
                "test_timeout_non_retryable",
                || async {
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    Ok::<i32, TestError>(42)
                },
                should_not_retry_timeouts,
                create_test_error,
            )
            .await;

        // Should fail immediately without retries since timeout is non-retryable
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
    }

    #[tokio::test]
    async fn test_timeout_retries_when_predicate_allows() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: Some(50),
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        // Test with retry predicate that allows timeouts
        let should_retry_timeouts = |error: &TestError| matches!(error, TestError::Timeout(_));

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_timeout_retryable",
                || async {
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_timeouts,
                create_test_error,
            )
            .await;

        let elapsed = start.elapsed();

        // Should fail after retries (not immediately)
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
        // Should have taken time for retries (at least 2 timeouts + delays)
        assert!(elapsed.as_millis() > 80); // More than just one timeout
    }

    #[tokio::test]
    async fn test_successful_retry_after_failures() {
        let config = RetryConfig {
            max_retries: 3,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let result = manager
            .execute_with_retry(
                "test_eventual_success",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        let attempts = counter.fetch_add(1, Ordering::SeqCst);
                        if attempts < 2 {
                            Err(TestError::Retryable("temporary failure".to_string()))
                        } else {
                            Ok(42)
                        }
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert_eq!(result.unwrap(), 42);
        assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
    }

    #[tokio::test(start_paused = true)]
    async fn test_immediate_first_retry() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 100,
            max_delay_ms: 200,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: true,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
        let times_clone = attempt_times.clone();
        let start = tokio::time::Instant::now();

        let handle = tokio::spawn({
            let times_clone = times_clone.clone();
            async move {
                let _ = manager
                    .execute_with_retry(
                        "test_immediate",
                        move || {
                            let times = times_clone.clone();
                            async move {
                                times.lock().expect(MUTEX_POISONED).push(start.elapsed());
                                Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                            }
                        },
                        should_retry_test_error,
                        create_test_error,
                    )
                    .await;
            }
        });

        // Allow initial attempt and immediate retry to run without advancing time
        yield_until(|| attempt_times.lock().expect(MUTEX_POISONED).len() >= 2).await;

        // Advance time for the next backoff interval
        tokio::time::advance(Duration::from_millis(100)).await;
        tokio::task::yield_now().await;

        // Wait for the final retry to be recorded
        yield_until(|| attempt_times.lock().expect(MUTEX_POISONED).len() >= 3).await;

        handle.await.unwrap();

        let times = attempt_times.lock().expect(MUTEX_POISONED);
        assert_eq!(times.len(), 3); // Initial + 2 retries

        // First retry should be immediate (within 1ms tolerance)
        assert!(times[1] <= Duration::from_millis(1));
        // Second retry should have backoff delay (at least 100ms from start)
        assert!(times[2] >= Duration::from_millis(100));
        assert!(times[2] <= Duration::from_millis(110));
    }

    #[tokio::test]
    async fn test_operation_without_timeout() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None, // No timeout
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_no_timeout",
                || async {
                    tokio::time::sleep(Duration::from_millis(50)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        let elapsed = start.elapsed();
        assert_eq!(result.unwrap(), 42);
        // Should complete without timing out
        assert!(elapsed.as_millis() >= 30);
        assert!(elapsed.as_millis() < 200);
    }

    #[tokio::test]
    async fn test_zero_retries() {
        let config = RetryConfig {
            max_retries: 0,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let result = manager
            .execute_with_retry(
                "test_no_retries",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        // Should only attempt once (no retries)
        assert_eq!(attempt_counter.load(Ordering::SeqCst), 1);
    }

    #[tokio::test(start_paused = true)]
    async fn test_jitter_applied() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 2.0,
            jitter_ms: 50, // Significant jitter
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let delays = Arc::new(std::sync::Mutex::new(Vec::new()));
        let delays_clone = delays.clone();
        let last_time = Arc::new(std::sync::Mutex::new(tokio::time::Instant::now()));
        let last_time_clone = last_time.clone();

        let handle = tokio::spawn({
            let delays_clone = delays_clone.clone();
            async move {
                let _ = manager
                    .execute_with_retry(
                        "test_jitter",
                        move || {
                            let delays = delays_clone.clone();
                            let last_time = last_time_clone.clone();
                            async move {
                                let now = tokio::time::Instant::now();
                                let delay = {
                                    let mut last = last_time.lock().expect(MUTEX_POISONED);
                                    let d = now.duration_since(*last);
                                    *last = now;
                                    d
                                };
                                delays.lock().expect(MUTEX_POISONED).push(delay);
                                Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                            }
                        },
                        should_retry_test_error,
                        create_test_error,
                    )
                    .await;
            }
        });

        yield_until(|| !delays.lock().expect(MUTEX_POISONED).is_empty()).await;
        advance_until(|| delays.lock().expect(MUTEX_POISONED).len() >= 2).await;
        advance_until(|| delays.lock().expect(MUTEX_POISONED).len() >= 3).await;

        handle.await.unwrap();

        let delays = delays.lock().expect(MUTEX_POISONED);
        // Skip the first delay (initial attempt)
        for delay in delays.iter().skip(1) {
            // Each delay should be at least the base delay (50ms for first retry)
            assert!(delay.as_millis() >= 50);
            // But no more than base + jitter (allow small tolerance for step advance)
            assert!(delay.as_millis() <= 151);
        }
    }

    #[tokio::test]
    async fn test_max_elapsed_stops_early() {
        let config = RetryConfig {
            max_retries: 100, // Very high retry count
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 1.5,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: Some(150), // Should stop after ~3 attempts
        };
        let manager = RetryManager::new(config).unwrap();

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_elapsed_limit",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        let elapsed = start.elapsed();
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));

        // Should have stopped due to time limit, not retry count
        let attempts = attempt_counter.load(Ordering::SeqCst);
        assert!(attempts < 10); // Much less than max_retries
        assert!(elapsed.as_millis() >= 100);
    }

    #[tokio::test]
    async fn test_mixed_errors_retry_behavior() {
        let config = RetryConfig {
            max_retries: 5,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let result = manager
            .execute_with_retry(
                "test_mixed_errors",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        let attempts = counter.fetch_add(1, Ordering::SeqCst);
                        match attempts {
                            0 => Err(TestError::Retryable("retry 1".to_string())),
                            1 => Err(TestError::Retryable("retry 2".to_string())),
                            2 => Err(TestError::NonRetryable("stop here".to_string())),
                            _ => Ok(42),
                        }
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
        // Should stop at the non-retryable error
        assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
    }

    #[tokio::test]
    async fn test_cancellation_during_retry_delay() {
        use tokio_util::sync::CancellationToken;

        let config = RetryConfig {
            max_retries: 10,
            initial_delay_ms: 500, // Long delay to ensure cancellation happens during sleep
            max_delay_ms: 1000,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let token = CancellationToken::new();
        let token_clone = token.clone();

        // Cancel after a short delay
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            token_clone.cancel();
        });

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry_with_cancel(
                "test_cancellation",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
                &token,
            )
            .await;

        let elapsed = start.elapsed();

        // Should be canceled quickly
        assert!(result.is_err());
        let error_msg = format!("{}", result.unwrap_err());
        assert!(error_msg.contains("canceled"));

        // Should not have taken the full delay time
        assert!(elapsed.as_millis() < 600);

        // Should have made at least one attempt
        let attempts = attempt_counter.load(Ordering::SeqCst);
        assert!(attempts >= 1);
    }

    #[tokio::test]
    async fn test_cancellation_during_operation_execution() {
        use tokio_util::sync::CancellationToken;

        let config = RetryConfig {
            max_retries: 5,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config).unwrap();

        let token = CancellationToken::new();
        let token_clone = token.clone();

        // Cancel after a short delay
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            token_clone.cancel();
        });

        let start = tokio::time::Instant::now();
        let result = manager
            .execute_with_retry_with_cancel(
                "test_cancellation_during_op",
                || async {
                    // Long-running operation
                    tokio::time::sleep(Duration::from_millis(200)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_test_error,
                create_test_error,
                &token,
            )
            .await;

        let elapsed = start.elapsed();

        // Should be canceled during the operation
        assert!(result.is_err());
        let error_msg = format!("{}", result.unwrap_err());
        assert!(error_msg.contains("canceled"));

        // Should not have completed the long operation
        assert!(elapsed.as_millis() < 250);
    }

    #[tokio::test]
    async fn test_cancellation_error_message() {
        use tokio_util::sync::CancellationToken;

        let config = RetryConfig::default();
        let manager = RetryManager::new(config).unwrap();

        let token = CancellationToken::new();
        token.cancel(); // Pre-cancel for immediate cancellation

        let result = manager
            .execute_with_retry_with_cancel(
                "test_operation",
                || async { Ok::<i32, TestError>(42) },
                should_retry_test_error,
                create_test_error,
                &token,
            )
            .await;

        assert!(result.is_err());
        let error_msg = format!("{}", result.unwrap_err());
        assert!(error_msg.contains("canceled"));
    }
}
1202
1203#[cfg(test)]
1204mod proptest_tests {
1205    use std::sync::{
1206        Arc,
1207        atomic::{AtomicU32, Ordering},
1208    };
1209
1210    use nautilus_core::MUTEX_POISONED;
1211    use proptest::prelude::*;
1212    // Import rstest attribute macro used within proptest! tests
1213    use rstest::rstest;
1214
1215    use super::{
1216        test_utils::*,
1217        tests::{advance_until, yield_until},
1218        *,
1219    };
1220
1221    proptest! {
1222        #[rstest]
1223        fn test_retry_config_valid_ranges(
1224            max_retries in 0u32..100,
1225            initial_delay_ms in 1u64..10_000,
1226            max_delay_ms in 1u64..60_000,
1227            backoff_factor in 1.0f64..10.0,
1228            jitter_ms in 0u64..1_000,
1229            operation_timeout_ms in prop::option::of(1u64..120_000),
1230            immediate_first in any::<bool>(),
1231            max_elapsed_ms in prop::option::of(1u64..300_000)
1232        ) {
1233            // Ensure max_delay >= initial_delay for valid config
1234            let max_delay_ms = max_delay_ms.max(initial_delay_ms);
1235
1236            let config = RetryConfig {
1237                max_retries,
1238                initial_delay_ms,
1239                max_delay_ms,
1240                backoff_factor,
1241                jitter_ms,
1242                operation_timeout_ms,
1243                immediate_first,
1244                max_elapsed_ms,
1245            };
1246
1247            // Should always be able to create a RetryManager with valid config
1248            let manager = RetryManager::<std::io::Error>::new(config);
1249            prop_assert!(manager.is_ok());
1250        }
1251
1252        #[rstest]
1253        fn test_retry_attempts_bounded(
1254            max_retries in 0u32..5,
1255            initial_delay_ms in 1u64..10,
1256            backoff_factor in 1.0f64..2.0,
1257        ) {
1258            let rt = tokio::runtime::Builder::new_current_thread()
1259                .enable_time()
1260                .build()
1261                .unwrap();
1262
1263            let config = RetryConfig {
1264                max_retries,
1265                initial_delay_ms,
1266                max_delay_ms: initial_delay_ms * 2,
1267                backoff_factor,
1268                jitter_ms: 0,
1269                operation_timeout_ms: None,
1270                immediate_first: false,
1271                max_elapsed_ms: None,
1272            };
1273
1274            let manager = RetryManager::new(config).unwrap();
1275            let attempt_counter = Arc::new(AtomicU32::new(0));
1276            let counter_clone = attempt_counter.clone();
1277
1278            let _result = rt.block_on(manager.execute_with_retry(
1279                "prop_test",
1280                move || {
1281                    let counter = counter_clone.clone();
1282                    async move {
1283                        counter.fetch_add(1, Ordering::SeqCst);
1284                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1285                    }
1286                },
1287                |e: &TestError| matches!(e, TestError::Retryable(_)),
1288                TestError::Timeout,
1289            ));
1290
1291            let attempts = attempt_counter.load(Ordering::SeqCst);
1292            // Total attempts should be 1 (initial) + max_retries
1293            prop_assert_eq!(attempts, max_retries + 1);
1294        }
1295
1296        #[rstest]
1297        fn test_timeout_always_respected(
1298            timeout_ms in 10u64..50,
1299            operation_delay_ms in 60u64..100,
1300        ) {
1301            let rt = tokio::runtime::Builder::new_current_thread()
1302                .enable_time()
1303                .start_paused(true)
1304                .build()
1305                .unwrap();
1306
1307            let config = RetryConfig {
1308                max_retries: 0, // No retries to isolate timeout behavior
1309                initial_delay_ms: 10,
1310                max_delay_ms: 100,
1311                backoff_factor: 2.0,
1312                jitter_ms: 0,
1313                operation_timeout_ms: Some(timeout_ms),
1314                immediate_first: false,
1315                max_elapsed_ms: None,
1316            };
1317
1318            let manager = RetryManager::new(config).unwrap();
1319
1320            let result = rt.block_on(async {
1321                let operation_future = manager.execute_with_retry(
1322                    "timeout_test",
1323                    move || async move {
1324                        tokio::time::sleep(Duration::from_millis(operation_delay_ms)).await;
1325                        Ok::<i32, TestError>(42)
1326                    },
1327                    |_: &TestError| true,
1328                    TestError::Timeout,
1329                );
1330
1331                // Advance time to trigger timeout
1332                tokio::time::advance(Duration::from_millis(timeout_ms + 10)).await;
1333                operation_future.await
1334            });
1335
1336            // Operation should timeout
1337            prop_assert!(result.is_err());
1338            prop_assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
1339        }
1340
1341        #[rstest]
1342        fn test_max_elapsed_always_respected(
1343            max_elapsed_ms in 20u64..50,
1344            delay_per_retry in 15u64..30,
1345            max_retries in 10u32..20,
1346        ) {
1347            let rt = tokio::runtime::Builder::new_current_thread()
1348                .enable_time()
1349                .start_paused(true)
1350                .build()
1351                .unwrap();
1352
1353            // Set up config where we would exceed max_elapsed_ms before max_retries
1354            let config = RetryConfig {
1355                max_retries,
1356                initial_delay_ms: delay_per_retry,
1357                max_delay_ms: delay_per_retry * 2,
1358                backoff_factor: 1.0, // No backoff to make timing predictable
1359                jitter_ms: 0,
1360                operation_timeout_ms: None,
1361                immediate_first: false,
1362                max_elapsed_ms: Some(max_elapsed_ms),
1363            };
1364
1365            let manager = RetryManager::new(config).unwrap();
1366            let attempt_counter = Arc::new(AtomicU32::new(0));
1367            let counter_clone = attempt_counter.clone();
1368
1369            let result = rt.block_on(async {
1370                let operation_future = manager.execute_with_retry(
1371                    "elapsed_test",
1372                    move || {
1373                        let counter = counter_clone.clone();
1374                        async move {
1375                            counter.fetch_add(1, Ordering::SeqCst);
1376                            Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1377                        }
1378                    },
1379                    |e: &TestError| matches!(e, TestError::Retryable(_)),
1380                    TestError::Timeout,
1381                );
1382
1383                // Advance time past max_elapsed_ms
1384                tokio::time::advance(Duration::from_millis(max_elapsed_ms + delay_per_retry)).await;
1385                operation_future.await
1386            });
1387
1388            let attempts = attempt_counter.load(Ordering::SeqCst);
1389
1390            // Should have failed with timeout error
1391            prop_assert!(result.is_err());
1392            prop_assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
1393
1394            // Should have stopped before exhausting all retries
1395            prop_assert!(attempts <= max_retries + 1);
1396        }

        #[rstest]
        fn test_jitter_bounds(
            jitter_ms in 0u64..20,
            base_delay_ms in 10u64..30,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries: 2,
                initial_delay_ms: base_delay_ms,
                max_delay_ms: base_delay_ms * 2,
                backoff_factor: 1.0, // No backoff to isolate jitter
                jitter_ms,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config).unwrap();
            let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
            let attempt_times_for_block = attempt_times.clone();

            rt.block_on(async move {
                let attempt_times_for_wait = attempt_times_for_block.clone();
                let handle = tokio::spawn({
                    let attempt_times_for_task = attempt_times_for_block.clone();
                    let manager = manager;
                    async move {
                        let start_time = tokio::time::Instant::now();
                        let _ = manager
                            .execute_with_retry(
                                "jitter_test",
                                move || {
                                    let attempt_times_inner = attempt_times_for_task.clone();
                                    async move {
                                        attempt_times_inner
                                            .lock()
                                            .expect(MUTEX_POISONED)
                                            .push(start_time.elapsed());
                                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                                    }
                                },
                                |e: &TestError| matches!(e, TestError::Retryable(_)),
                                TestError::Timeout,
                            )
                            .await;
                    }
                });

                yield_until(|| !attempt_times_for_wait.lock().expect(MUTEX_POISONED).is_empty()).await;
                advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 2).await;
                advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 3).await;

                handle.await.unwrap();
            });

            let times = attempt_times.lock().expect(MUTEX_POISONED);

            // We expect at least 2 attempts total (initial + at least 1 retry)
            prop_assert!(times.len() >= 2);

            // First attempt should be immediate (no delay)
            prop_assert!(times[0].as_millis() < 5);

            // Check subsequent retries have appropriate delays
            for i in 1..times.len() {
                let delay_from_previous = times[i] - times[i - 1];

                // The delay should be at least base_delay_ms
                prop_assert!(
                    delay_from_previous.as_millis() >= base_delay_ms as u128,
                    "Retry {} delay {}ms is less than base {}ms",
                    i, delay_from_previous.as_millis(), base_delay_ms
                );

                // Delay should be at most base_delay + jitter
                prop_assert!(
                    delay_from_previous.as_millis() <= (base_delay_ms + jitter_ms + 1) as u128,
                    "Retry {} delay {}ms exceeds base {} + jitter {}",
                    i, delay_from_previous.as_millis(), base_delay_ms, jitter_ms
                );
            }
        }
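
        // Worked example of the bounds above: with `base_delay_ms = 10`,
        // `jitter_ms = 5`, and `backoff_factor = 1.0`, every retry delay
        // should land in [10, 15] ms (the assertion allows 1 ms of slack).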

        #[rstest]
        fn test_immediate_first_property(
            immediate_first in any::<bool>(),
            initial_delay_ms in 10u64..30,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries: 2,
                initial_delay_ms,
                max_delay_ms: initial_delay_ms * 2,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config).unwrap();
            let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
            let attempt_times_for_block = attempt_times.clone();

            rt.block_on(async move {
                let attempt_times_for_wait = attempt_times_for_block.clone();
                let handle = tokio::spawn({
                    let attempt_times_for_task = attempt_times_for_block.clone();
                    let manager = manager;
                    async move {
                        let start = tokio::time::Instant::now();
                        let _ = manager
                            .execute_with_retry(
                                "immediate_test",
                                move || {
                                    let attempt_times_inner = attempt_times_for_task.clone();
                                    async move {
                                        let elapsed = start.elapsed();
                                        attempt_times_inner.lock().expect(MUTEX_POISONED).push(elapsed);
                                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                                    }
                                },
                                |e: &TestError| matches!(e, TestError::Retryable(_)),
                                TestError::Timeout,
                            )
                            .await;
                    }
                });

                yield_until(|| !attempt_times_for_wait.lock().expect(MUTEX_POISONED).is_empty()).await;
                advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 2).await;
                advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 3).await;

                handle.await.unwrap();
            });

            let times = attempt_times.lock().expect(MUTEX_POISONED);
            prop_assert!(times.len() >= 2);

            if immediate_first {
                // First retry should fire immediately; with paused time its
                // offset is ~0 ms, and 5 ms stays safely below the smallest
                // possible initial_delay_ms (10 ms)
                prop_assert!(times[1].as_millis() < 5,
                    "With immediate_first=true, first retry took {}ms",
                    times[1].as_millis());
            } else {
                // First retry should wait roughly the configured initial delay
                prop_assert!(times[1].as_millis() >= (initial_delay_ms - 1) as u128,
                    "With immediate_first=false, first retry was too fast: {}ms",
                    times[1].as_millis());
            }
        }
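
        // Plausible delay sequences with no jitter, assuming the immediate
        // retry simply skips the first backoff sleep (only the first retry is
        // asserted above):
        //   immediate_first = true:  0, d, d * f, ...
        //   immediate_first = false: d, d * f, d * f^2, ...
        // where d = initial_delay_ms, f = backoff_factor, capped at max_delay_ms.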

        #[rstest]
        fn test_non_retryable_stops_immediately(
            attempt_before_non_retryable in 0usize..3,
            max_retries in 3u32..5,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries,
                initial_delay_ms: 10,
                max_delay_ms: 100,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config).unwrap();
            let attempt_counter = Arc::new(AtomicU32::new(0));
            let counter_clone = attempt_counter.clone();

            let result: Result<i32, TestError> = rt.block_on(manager.execute_with_retry(
                "non_retryable_test",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        let attempts = counter.fetch_add(1, Ordering::SeqCst) as usize;
                        if attempts == attempt_before_non_retryable {
                            Err(TestError::NonRetryable("stop".to_string()))
                        } else {
                            Err(TestError::Retryable("retry".to_string()))
                        }
                    }
                },
                |e: &TestError| matches!(e, TestError::Retryable(_)),
                TestError::Timeout,
            ));

            let attempts = attempt_counter.load(Ordering::SeqCst) as usize;

            prop_assert!(result.is_err());
            prop_assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
            // Should stop exactly when non-retryable error occurs
            prop_assert_eq!(attempts, attempt_before_non_retryable + 1);
        }
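
        // A minimal sketch of a caller-side `should_retry` predicate;
        // `ClientError` and its variants are hypothetical, not part of this
        // crate:
        //
        // fn should_retry(e: &ClientError) -> bool {
        //     matches!(e, ClientError::Timeout | ClientError::ConnectionReset)
        // }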

        #[rstest]
        fn test_cancellation_stops_immediately(
            cancel_after_ms in 10u64..100,
            initial_delay_ms in 200u64..500,
        ) {
            use tokio_util::sync::CancellationToken;

            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries: 10,
                initial_delay_ms,
                max_delay_ms: initial_delay_ms * 2,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config).unwrap();
            let token = CancellationToken::new();
            let token_clone = token.clone();

            let result: Result<i32, TestError> = rt.block_on(async {
                // Spawn cancellation task
                tokio::spawn(async move {
                    tokio::time::sleep(Duration::from_millis(cancel_after_ms)).await;
                    token_clone.cancel();
                });

                let operation_future = manager.execute_with_retry_with_cancel(
                    "cancellation_test",
                    || async {
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    },
                    |e: &TestError| matches!(e, TestError::Retryable(_)),
                    create_test_error,
                    &token,
                );

                // Advance time to trigger cancellation
                tokio::time::advance(Duration::from_millis(cancel_after_ms + 10)).await;
                operation_future.await
            });

            // Should be canceled
            prop_assert!(result.is_err());
            let error_msg = format!("{}", result.unwrap_err());
            prop_assert!(error_msg.contains("canceled"));
        }
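
        // A minimal sketch of wiring a `CancellationToken` into a shutdown
        // path; `shutdown_signal` is a hypothetical future, not part of this
        // crate:
        //
        // let token = CancellationToken::new();
        // let child = token.child_token();
        // tokio::spawn(async move {
        //     shutdown_signal().await;
        //     token.cancel();
        // });
        // // `child` is then passed to `execute_with_retry_with_cancel`.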

        #[rstest]
        fn test_budget_clamp_prevents_overshoot(
            max_elapsed_ms in 10u64..30,
            delay_per_retry in 20u64..50,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            // Configure so that first retry delay would exceed budget
            let config = RetryConfig {
                max_retries: 5,
                initial_delay_ms: delay_per_retry,
                max_delay_ms: delay_per_retry * 2,
                backoff_factor: 1.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: Some(max_elapsed_ms),
            };

            let manager = RetryManager::new(config).unwrap();

            let result = rt.block_on(async {
                let operation_future = manager.execute_with_retry(
                    "budget_clamp_test",
                    || async {
                        // Fast operation to focus on delay timing
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    },
                    |e: &TestError| matches!(e, TestError::Retryable(_)),
                    create_test_error,
                );

                // Pre-advance paused time; the paused runtime also auto-advances
                // through any remaining (clamped) delay once the future is awaited
                tokio::time::advance(Duration::from_millis(max_elapsed_ms + delay_per_retry)).await;
                operation_future.await
            });

            // Every attempt fails and the elapsed budget is exceeded, so the
            // manager must stop early (per `max_elapsed_ms`) with an error
            prop_assert!(result.is_err());
        }
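
        // Worked example of the clamp: with `max_elapsed_ms = 20` and a
        // per-retry delay of 30 ms, the first backoff sleep alone would
        // overshoot the budget, so retries stop early (per `max_elapsed_ms`)
        // even though `max_retries = 5` is nowhere near exhausted.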

        #[rstest]
        fn test_success_on_kth_attempt(
            k in 1usize..5,
            initial_delay_ms in 5u64..20,
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .start_paused(true)
                .build()
                .unwrap();

            let config = RetryConfig {
                max_retries: 10, // More than k
                initial_delay_ms,
                max_delay_ms: initial_delay_ms * 4,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config).unwrap();
            let attempt_counter = Arc::new(AtomicU32::new(0));
            let counter_clone = attempt_counter.clone();
            let target_k = k;

            let (result, _elapsed) = rt.block_on(async {
                let start = tokio::time::Instant::now();

                let operation_future = manager.execute_with_retry(
                    "kth_attempt_test",
                    move || {
                        let counter = counter_clone.clone();
                        async move {
                            let attempt = counter.fetch_add(1, Ordering::SeqCst) as usize;
                            if attempt + 1 == target_k {
                                Ok(42)
                            } else {
                                Err(TestError::Retryable("retry".to_string()))
                            }
                        }
                    },
                    |e: &TestError| matches!(e, TestError::Retryable(_)),
                    create_test_error,
                );

                // Pre-advance paused time; the paused runtime also auto-advances
                // through the remaining backoff delays once the future is awaited
                for _ in 0..k {
                    tokio::time::advance(Duration::from_millis(initial_delay_ms * 4)).await;
                }

                let result = operation_future.await;
                let elapsed = start.elapsed();

                (result, elapsed)
            });

            let attempts = attempt_counter.load(Ordering::SeqCst) as usize;

            // Using paused Tokio time (start_paused + advance); assert behavior only (no wall-clock timing)
            prop_assert!(result.is_ok());
            prop_assert_eq!(result.unwrap(), 42);
            prop_assert_eq!(attempts, k);
        }
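
        // Worked backoff example for this config with no jitter: writing
        // d = initial_delay_ms, the sleeps before attempts 2, 3, 4, ... are
        // d, 2d, 4d, 4d, ... (factor 2.0, capped at max_delay_ms = 4 * d).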
    }
}