aws_smithy_runtime/client/retries/strategy/
standard.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6use std::sync::Mutex;
7use std::time::{Duration, SystemTime};
8
9use tokio::sync::OwnedSemaphorePermit;
10use tracing::{debug, trace};
11
12use aws_smithy_runtime_api::box_error::BoxError;
13use aws_smithy_runtime_api::client::interceptors::context::{
14    BeforeTransmitInterceptorContextMut, InterceptorContext,
15};
16use aws_smithy_runtime_api::client::interceptors::{dyn_dispatch_hint, Intercept};
17use aws_smithy_runtime_api::client::retries::classifiers::{RetryAction, RetryReason};
18use aws_smithy_runtime_api::client::retries::{RequestAttempts, RetryStrategy, ShouldAttempt};
19use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
20use aws_smithy_types::config_bag::{ConfigBag, Layer, Storable, StoreReplace};
21use aws_smithy_types::retry::{ErrorKind, RetryConfig, RetryMode, RetrySpec};
22
23use crate::client::retries::classifiers::run_classifiers_on_ctx;
24use crate::client::retries::client_rate_limiter::{ClientRateLimiter, RequestReason};
25use crate::client::retries::strategy::standard::ReleaseResult::{
26    APermitWasReleased, NoPermitWasReleased,
27};
28use crate::client::retries::token_bucket::TokenBucket;
29use crate::client::retries::{
30    ClientRateLimiterPartition, LongPollingBackoff, RetryPartition, RetryPartitionInner,
31};
32use crate::static_partition_map::StaticPartitionMap;
33
/// Map of client rate limiters keyed by [`ClientRateLimiterPartition`].
///
/// `StandardRetryStrategy::adaptive_retry_rate_limiter` looks up (or lazily creates) the
/// limiter here when the retry partition is a default one; custom partitions carry their own
/// rate limiter and do not go through this map.
static CLIENT_RATE_LIMITER: StaticPartitionMap<ClientRateLimiterPartition, ClientRateLimiter> =
    StaticPartitionMap::new();

/// Used by token bucket interceptor to ensure a TokenBucket always exists in config bag
///
/// Keyed by [`RetryPartition`]; `TokenBucketProvider` reads from and initializes entries here.
static TOKEN_BUCKET: StaticPartitionMap<RetryPartition, TokenBucket> = StaticPartitionMap::new();
39
/// Retry strategy with exponential backoff, max attempts, and a token bucket.
#[derive(Debug, Default)]
pub struct StandardRetryStrategy {
    /// Token bucket permit obtained for the most recent retry, if any. It is stored by
    /// `set_retry_permit` and taken out again by `release_retry_permit`.
    retry_permit: Mutex<Option<OwnedSemaphorePermit>>,
}
45
// Lets a `StandardRetryStrategy` be placed in a config bag using the `StoreReplace` storer.
impl Storable for StandardRetryStrategy {
    type Storer = StoreReplace<Self>;
}
49
50impl StandardRetryStrategy {
51    /// Create a new standard retry strategy with the given config.
52    pub fn new() -> Self {
53        Default::default()
54    }
55
56    fn release_retry_permit(&self, token_bucket: &TokenBucket) -> ReleaseResult {
57        let mut retry_permit = self.retry_permit.lock().unwrap();
58        match retry_permit.take() {
59            Some(p) => {
60                // Retry succeeded: reward success and forget permit if configured, otherwise release permit back
61                if token_bucket.success_reward() > 0.0 {
62                    token_bucket.reward_success();
63                    p.forget();
64                } else {
65                    drop(p); // Original behavior - release back to bucket
66                }
67                APermitWasReleased
68            }
69            None => {
70                // First-attempt success: reward success or regenerate token
71                if token_bucket.success_reward() > 0.0 {
72                    token_bucket.reward_success();
73                } else {
74                    token_bucket.regenerate_a_token();
75                }
76                NoPermitWasReleased
77            }
78        }
79    }
80
81    fn set_retry_permit(&self, new_retry_permit: OwnedSemaphorePermit) {
82        let mut old_retry_permit = self.retry_permit.lock().unwrap();
83        if let Some(p) = old_retry_permit.replace(new_retry_permit) {
84            // Whenever we set a new retry permit, and it replaces the old one, we need to "forget"
85            // the old permit, removing it from the bucket forever.
86            p.forget()
87        }
88    }
89
90    /// Returns a [`ClientRateLimiter`] if adaptive retry is configured.
91    fn adaptive_retry_rate_limiter(
92        runtime_components: &RuntimeComponents,
93        cfg: &ConfigBag,
94    ) -> Option<ClientRateLimiter> {
95        let retry_config = cfg.load::<RetryConfig>().expect("retry config is required");
96        if retry_config.mode() == RetryMode::Adaptive {
97            if let Some(time_source) = runtime_components.time_source() {
98                let retry_partition = cfg.load::<RetryPartition>().expect("set in default config");
99                let seconds_since_unix_epoch = time_source
100                    .now()
101                    .duration_since(SystemTime::UNIX_EPOCH)
102                    .expect("the present takes place after the UNIX_EPOCH")
103                    .as_secs_f64();
104                let client_rate_limiter = match &retry_partition.inner {
105                    RetryPartitionInner::Default(_) => {
106                        let client_rate_limiter_partition =
107                            ClientRateLimiterPartition::new(retry_partition.clone());
108                        CLIENT_RATE_LIMITER.get_or_init(client_rate_limiter_partition, || {
109                            ClientRateLimiter::new(seconds_since_unix_epoch)
110                        })
111                    }
112                    RetryPartitionInner::Custom {
113                        client_rate_limiter,
114                        ..
115                    } => client_rate_limiter.clone(),
116                };
117                return Some(client_rate_limiter);
118            }
119        }
120        None
121    }
122
123    fn calculate_backoff(
124        &self,
125        runtime_components: &RuntimeComponents,
126        cfg: &ConfigBag,
127        retry_cfg: &RetryConfig,
128        retry_reason: &RetryAction,
129    ) -> Result<Duration, ShouldAttempt> {
130        let request_attempts = cfg
131            .load::<RequestAttempts>()
132            .expect("at least one request attempt is made before any retry is attempted")
133            .attempts();
134
135        match retry_reason {
136            RetryAction::RetryIndicated(RetryReason::RetryableError { kind, retry_after }) => {
137                let initial_backoff = if *kind != ErrorKind::ThrottlingError {
138                    retry_cfg
139                        .retry_spec()
140                        .map(|s| s.non_throttling_initial_backoff())
141                        .unwrap_or(retry_cfg.initial_backoff())
142                        .as_secs_f64()
143                } else {
144                    retry_cfg.initial_backoff().as_secs_f64()
145                };
146
147                if let Some(delay) = check_rate_limiter_for_delay(runtime_components, cfg, *kind) {
148                    let delay = delay.min(retry_cfg.max_backoff());
149                    debug!("rate limiter has requested a {delay:?} delay before retrying");
150                    Ok(delay)
151                } else {
152                    let base = if retry_cfg.use_static_exponential_base() {
153                        1.0
154                    } else {
155                        fastrand::f64()
156                    };
157                    let t_i = calculate_exponential_backoff(
158                        base,
159                        initial_backoff,
160                        request_attempts - 1,
161                        retry_cfg.max_backoff(),
162                    );
163
164                    if let Some(retry_after) = *retry_after {
165                        if retry_cfg
166                            .retry_spec()
167                            .is_some_and(|s| s.is_at_least(RetrySpec::V2_1))
168                        {
169                            let delay = retry_after.clamp(t_i, t_i + Duration::from_secs(5));
170                            debug!("x-amz-retry-after bounded to {delay:?} (t_i={t_i:?})");
171                            Ok(delay)
172                        } else {
173                            let delay = retry_after.min(retry_cfg.max_backoff());
174                            debug!(
175                                "explicit request from server to delay {delay:?} before retrying"
176                            );
177                            Ok(delay)
178                        }
179                    } else {
180                        Ok(t_i)
181                    }
182                }
183            }
184            RetryAction::RetryForbidden | RetryAction::NoActionIndicated => {
185                debug!(
186                    attempts = request_attempts,
187                    max_attempts = retry_cfg.max_attempts(),
188                    "encountered un-retryable error"
189                );
190                Err(ShouldAttempt::No)
191            }
192            _ => unreachable!("RetryAction is non-exhaustive"),
193        }
194    }
195}
196
/// Outcome reported by `StandardRetryStrategy::release_retry_permit`.
enum ReleaseResult {
    /// The strategy was holding a retry permit, and it has been taken out of the strategy.
    APermitWasReleased,
    /// The strategy was not holding a retry permit.
    NoPermitWasReleased,
}
201
202impl RetryStrategy for StandardRetryStrategy {
203    fn should_attempt_initial_request(
204        &self,
205        runtime_components: &RuntimeComponents,
206        cfg: &ConfigBag,
207    ) -> Result<ShouldAttempt, BoxError> {
208        if let Some(crl) = Self::adaptive_retry_rate_limiter(runtime_components, cfg) {
209            let seconds_since_unix_epoch = get_seconds_since_unix_epoch(runtime_components);
210            if let Err(delay) = crl.acquire_permission_to_send_a_request(
211                seconds_since_unix_epoch,
212                RequestReason::InitialRequest,
213            ) {
214                return Ok(ShouldAttempt::YesAfterDelay(delay));
215            }
216        } else {
217            debug!("no client rate limiter configured, so no token is required for the initial request.");
218        }
219
220        Ok(ShouldAttempt::Yes)
221    }
222
223    fn should_attempt_retry(
224        &self,
225        ctx: &InterceptorContext,
226        runtime_components: &RuntimeComponents,
227        cfg: &ConfigBag,
228    ) -> Result<ShouldAttempt, BoxError> {
229        let retry_cfg = cfg.load::<RetryConfig>().expect("retry config is required");
230
231        // bookkeeping
232        let token_bucket = cfg.load::<TokenBucket>().expect("token bucket is required");
233        // run the classifier against the context to determine if we should retry
234        let retry_classifiers = runtime_components.retry_classifiers();
235        let classifier_result = run_classifiers_on_ctx(retry_classifiers, ctx);
236
237        // (adaptive only): update fill rate
238        // NOTE: SEP indicates doing bookkeeping before asking if we should retry. We need to know if
239        // the error was a throttling error though to do adaptive retry bookkeeping so we take
240        // advantage of that information being available via the classifier result
241        let error_kind = error_kind(&classifier_result);
242        let is_throttling_error = error_kind
243            .map(|kind| kind == ErrorKind::ThrottlingError)
244            .unwrap_or(false);
245        update_rate_limiter_if_exists(runtime_components, cfg, is_throttling_error);
246
247        // on success release any retry quota held by previous attempts, reward success when indicated
248        if !ctx.is_failed() {
249            self.release_retry_permit(token_bucket);
250        }
251        // end bookkeeping
252
253        let request_attempts = cfg
254            .load::<RequestAttempts>()
255            .expect("at least one request attempt is made before any retry is attempted")
256            .attempts();
257
258        // check if retry should be attempted
259        if !classifier_result.should_retry() {
260            debug!(
261                "attempt #{request_attempts} classified as {:?}, not retrying",
262                classifier_result
263            );
264            return Ok(ShouldAttempt::No);
265        }
266
267        // check if we're out of attempts
268        if request_attempts >= retry_cfg.max_attempts() {
269            debug!(
270                attempts = request_attempts,
271                max_attempts = retry_cfg.max_attempts(),
272                "not retrying because we are out of attempts"
273            );
274            return Ok(ShouldAttempt::No);
275        }
276
277        //  acquire permit for retry
278        let error_kind = error_kind.expect("result was classified retryable");
279        let is_long_polling = retry_cfg.retry_spec().is_some_and(|s| s.long_polling());
280
281        // Calculate backoff before token check. For long-polling services, this ensures
282        // the caller's loop is slowed down even when the token bucket is empty.
283        let backoff =
284            match self.calculate_backoff(runtime_components, cfg, retry_cfg, &classifier_result) {
285                Ok(value) => value,
286                Err(value) => return Ok(value),
287            };
288
289        //  acquire permit for retry
290        match token_bucket.acquire(
291            &error_kind,
292            &runtime_components.time_source().unwrap_or_default(),
293        ) {
294            Some(permit) => self.set_retry_permit(permit),
295            None => {
296                debug!("attempt #{request_attempts} failed with {error_kind:?}; not enough retry quota.");
297                if is_long_polling {
298                    if let Some(hint) = cfg.load::<LongPollingBackoff>() {
299                        hint.set(backoff);
300                    }
301                }
302                return Ok(ShouldAttempt::No);
303            }
304        }
305
306        debug!(
307            "attempt #{request_attempts} failed with {:?}; retrying after {:?}",
308            classifier_result, backoff
309        );
310        Ok(ShouldAttempt::YesAfterDelay(backoff))
311    }
312}
313
314/// extract the error kind from the classifier result if available
315fn error_kind(classifier_result: &RetryAction) -> Option<ErrorKind> {
316    match classifier_result {
317        RetryAction::RetryIndicated(RetryReason::RetryableError { kind, .. }) => Some(*kind),
318        _ => None,
319    }
320}
321
322fn update_rate_limiter_if_exists(
323    runtime_components: &RuntimeComponents,
324    cfg: &ConfigBag,
325    is_throttling_error: bool,
326) {
327    if let Some(crl) = StandardRetryStrategy::adaptive_retry_rate_limiter(runtime_components, cfg) {
328        let seconds_since_unix_epoch = get_seconds_since_unix_epoch(runtime_components);
329        crl.update_rate_limiter(seconds_since_unix_epoch, is_throttling_error);
330    }
331}
332
333fn check_rate_limiter_for_delay(
334    runtime_components: &RuntimeComponents,
335    cfg: &ConfigBag,
336    kind: ErrorKind,
337) -> Option<Duration> {
338    if let Some(crl) = StandardRetryStrategy::adaptive_retry_rate_limiter(runtime_components, cfg) {
339        let retry_reason = if kind == ErrorKind::ThrottlingError {
340            RequestReason::RetryTimeout
341        } else {
342            RequestReason::Retry
343        };
344        if let Err(delay) = crl.acquire_permission_to_send_a_request(
345            get_seconds_since_unix_epoch(runtime_components),
346            retry_reason,
347        ) {
348            return Some(delay);
349        }
350    }
351
352    None
353}
354
355pub(super) fn calculate_exponential_backoff(
356    base: f64,
357    initial_backoff: f64,
358    retry_attempts: u32,
359    max_backoff: Duration,
360) -> Duration {
361    let result = match 2_u32
362        .checked_pow(retry_attempts)
363        .map(|power| (power as f64) * initial_backoff)
364    {
365        Some(backoff) => match Duration::try_from_secs_f64(backoff) {
366            Ok(result) => result.min(max_backoff),
367            Err(e) => {
368                tracing::warn!("falling back to {max_backoff:?} as `Duration` could not be created for exponential backoff: {e}");
369                max_backoff
370            }
371        },
372        None => max_backoff,
373    };
374
375    // Apply jitter to `result`, and note that it can be applied to `max_backoff`.
376    // Won't panic because `base` is either in range 0..1 or a constant 1 in testing (if configured).
377    result.mul_f64(base)
378}
379
380pub(super) fn get_seconds_since_unix_epoch(runtime_components: &RuntimeComponents) -> f64 {
381    let request_time = runtime_components
382        .time_source()
383        .expect("time source required for retries");
384    request_time
385        .now()
386        .duration_since(SystemTime::UNIX_EPOCH)
387        .unwrap()
388        .as_secs_f64()
389}
390
/// Interceptor registered in default retry plugin that ensures a token bucket exists in config
/// bag for every operation. Token bucket provided is partitioned by the retry partition **in the
/// config bag** at the time an operation is executed.
#[derive(Debug)]
pub(crate) struct TokenBucketProvider {
    /// Retry partition this provider was created with; its name is compared against the
    /// partition found in the config bag for each operation.
    default_partition: RetryPartition,
    /// Token bucket obtained from `TOKEN_BUCKET` for `default_partition` at construction time,
    /// handed out directly when the operation uses that same default partition.
    token_bucket: TokenBucket,
}
399
400impl TokenBucketProvider {
401    /// Create a new token bucket provider with the given default retry partition.
402    ///
403    /// NOTE: This partition should be the one used for every operation on a client
404    /// unless config is overridden.
405    pub(crate) fn new(
406        default_partition: RetryPartition,
407        init: impl FnOnce() -> TokenBucket,
408    ) -> Self {
409        let token_bucket = TOKEN_BUCKET.get_or_init(default_partition.clone(), init);
410        Self {
411            default_partition,
412            token_bucket,
413        }
414    }
415}
416
417#[dyn_dispatch_hint]
418impl Intercept for TokenBucketProvider {
419    fn name(&self) -> &'static str {
420        "TokenBucketProvider"
421    }
422
423    fn modify_before_retry_loop(
424        &self,
425        _context: &mut BeforeTransmitInterceptorContextMut<'_>,
426        _runtime_components: &RuntimeComponents,
427        cfg: &mut ConfigBag,
428    ) -> Result<(), BoxError> {
429        let retry_partition = cfg.load::<RetryPartition>().expect("set in default config");
430
431        let tb = match &retry_partition.inner {
432            RetryPartitionInner::Default(name) => {
433                // we store the original retry partition configured and associated token bucket
434                // for the client when created so that we can avoid locking on _every_ request
435                // from _every_ client
436                if name == self.default_partition.name() {
437                    // avoid contention on the global lock
438                    self.token_bucket.clone()
439                } else {
440                    TOKEN_BUCKET.get_or_init_default(retry_partition.clone())
441                }
442            }
443            RetryPartitionInner::Custom { token_bucket, .. } => token_bucket.clone(),
444        };
445
446        trace!("token bucket for {retry_partition:?} added to config bag");
447        let mut layer = Layer::new("token_bucket_partition");
448        layer.store_put(tb);
449        cfg.push_layer(layer);
450        Ok(())
451    }
452}
453
454#[cfg(test)]
455mod tests {
456    #[allow(unused_imports)] // will be unused with `--no-default-features --features client`
457    use std::fmt;
458    use std::sync::Mutex;
459    use std::time::Duration;
460
461    use aws_smithy_async::time::SystemTimeSource;
462    use aws_smithy_runtime_api::client::interceptors::context::{
463        Input, InterceptorContext, Output,
464    };
465    use aws_smithy_runtime_api::client::orchestrator::OrchestratorError;
466    use aws_smithy_runtime_api::client::retries::classifiers::{
467        ClassifyRetry, RetryAction, SharedRetryClassifier,
468    };
469    use aws_smithy_runtime_api::client::retries::{
470        AlwaysRetry, RequestAttempts, RetryStrategy, ShouldAttempt,
471    };
472    use aws_smithy_runtime_api::client::runtime_components::{
473        RuntimeComponents, RuntimeComponentsBuilder,
474    };
475    use aws_smithy_types::config_bag::{ConfigBag, Layer};
476    use aws_smithy_types::retry::{ErrorKind, RetryConfig};
477
478    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
479    use aws_smithy_types::retry::RetrySpec;
480
481    use super::{calculate_exponential_backoff, StandardRetryStrategy};
482    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
483    use crate::client::retries::token_bucket::{
484        DEFAULT_CAPACITY, DEFAULT_RETRY_COST, DEFAULT_RETRY_TIMEOUT_COST, THROTTLING_RETRY_COST,
485    };
486    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
487    use crate::client::retries::LongPollingBackoff;
488    use crate::client::retries::{ClientRateLimiter, RetryPartition, TokenBucket};
489
490    #[test]
491    fn no_retry_necessary_for_ok_result() {
492        let cfg = ConfigBag::of_layers(vec![{
493            let mut layer = Layer::new("test");
494            layer.store_put(RetryConfig::standard());
495            layer.store_put(RequestAttempts::new(1));
496            layer.store_put(TokenBucket::default());
497            layer
498        }]);
499        let rc = RuntimeComponentsBuilder::for_tests().build().unwrap();
500        let mut ctx = InterceptorContext::new(Input::doesnt_matter());
501        let strategy = StandardRetryStrategy::default();
502        ctx.set_output_or_error(Ok(Output::doesnt_matter()));
503
504        let actual = strategy
505            .should_attempt_retry(&ctx, &rc, &cfg)
506            .expect("method is infallible for this use");
507        assert_eq!(ShouldAttempt::No, actual);
508    }
509
510    fn set_up_cfg_and_context(
511        error_kind: ErrorKind,
512        current_request_attempts: u32,
513        retry_config: RetryConfig,
514    ) -> (InterceptorContext, RuntimeComponents, ConfigBag) {
515        let mut ctx = InterceptorContext::new(Input::doesnt_matter());
516        ctx.set_output_or_error(Err(OrchestratorError::other("doesn't matter")));
517        let rc = RuntimeComponentsBuilder::for_tests()
518            .with_retry_classifier(SharedRetryClassifier::new(AlwaysRetry(error_kind)))
519            .build()
520            .unwrap();
521        let mut layer = Layer::new("test");
522        layer.store_put(RequestAttempts::new(current_request_attempts));
523        layer.store_put(retry_config);
524        layer.store_put(TokenBucket::default());
525        let cfg = ConfigBag::of_layers(vec![layer]);
526
527        (ctx, rc, cfg)
528    }
529
530    // Test that error kinds produce the correct "retry after X seconds" output.
531    // All error kinds are handled in the same way for the standard strategy.
532    fn test_should_retry_error_kind(error_kind: ErrorKind) {
533        let (ctx, rc, cfg) = set_up_cfg_and_context(
534            error_kind,
535            3,
536            RetryConfig::standard()
537                .with_use_static_exponential_base(true)
538                .with_max_attempts(4),
539        );
540        let strategy = StandardRetryStrategy::new();
541        let actual = strategy
542            .should_attempt_retry(&ctx, &rc, &cfg)
543            .expect("method is infallible for this use");
544        assert_eq!(ShouldAttempt::YesAfterDelay(Duration::from_secs(4)), actual);
545    }
546
    // NOTE: despite the `_after_2s` suffix in the names below, the shared helper
    // `test_should_retry_error_kind` asserts a 4 second delay.

    /// Transient errors are retried with the delay asserted by the shared helper.
    #[test]
    fn should_retry_transient_error_result_after_2s() {
        test_should_retry_error_kind(ErrorKind::TransientError);
    }

    /// Client errors are retried with the delay asserted by the shared helper.
    #[test]
    fn should_retry_client_error_result_after_2s() {
        test_should_retry_error_kind(ErrorKind::ClientError);
    }

    /// Server errors are retried with the delay asserted by the shared helper.
    #[test]
    fn should_retry_server_error_result_after_2s() {
        test_should_retry_error_kind(ErrorKind::ServerError);
    }

    /// Throttling errors are retried with the delay asserted by the shared helper.
    #[test]
    fn should_retry_throttling_error_result_after_2s() {
        test_should_retry_error_kind(ErrorKind::ThrottlingError);
    }
566
567    #[test]
568    fn dont_retry_when_out_of_attempts() {
569        let current_attempts = 4;
570        let max_attempts = current_attempts;
571        let (ctx, rc, cfg) = set_up_cfg_and_context(
572            ErrorKind::TransientError,
573            current_attempts,
574            RetryConfig::standard()
575                .with_use_static_exponential_base(true)
576                .with_max_attempts(max_attempts),
577        );
578        let strategy = StandardRetryStrategy::new();
579        let actual = strategy
580            .should_attempt_retry(&ctx, &rc, &cfg)
581            .expect("method is infallible for this use");
582        assert_eq!(ShouldAttempt::No, actual);
583    }
584
585    #[test]
586    fn should_not_panic_when_exponential_backoff_duration_could_not_be_created() {
587        let (ctx, rc, cfg) = set_up_cfg_and_context(
588            ErrorKind::TransientError,
589            // Greater than 32 when subtracted by 1 in `calculate_backoff`, causing overflow in `calculate_exponential_backoff`
590            33,
591            RetryConfig::standard()
592                .with_use_static_exponential_base(true)
593                .with_max_attempts(100), // Any value greater than 33 will do
594        );
595        let strategy = StandardRetryStrategy::new();
596        let actual = strategy
597            .should_attempt_retry(&ctx, &rc, &cfg)
598            .expect("method is infallible for this use");
599        assert_eq!(ShouldAttempt::YesAfterDelay(MAX_BACKOFF), actual);
600    }
601
602    #[test]
603    fn should_yield_client_rate_limiter_from_custom_partition() {
604        let expected = ClientRateLimiter::builder().token_refill_rate(3.14).build();
605        let cfg = ConfigBag::of_layers(vec![
606            // Emulate default config layer overriden by a user config layer
607            {
608                let mut layer = Layer::new("default");
609                layer.store_put(RetryPartition::new("default"));
610                layer
611            },
612            {
613                let mut layer = Layer::new("user");
614                layer.store_put(RetryConfig::adaptive());
615                layer.store_put(
616                    RetryPartition::custom("user")
617                        .client_rate_limiter(expected.clone())
618                        .build(),
619                );
620                layer
621            },
622        ]);
623        let rc = RuntimeComponentsBuilder::for_tests()
624            .with_time_source(Some(SystemTimeSource::new()))
625            .build()
626            .unwrap();
627        let actual = StandardRetryStrategy::adaptive_retry_rate_limiter(&rc, &cfg)
628            .expect("should yield client rate limiter from custom partition");
629        assert!(std::sync::Arc::ptr_eq(&expected.inner, &actual.inner));
630    }
631
    /// Test classifier that replays a preset list of retry actions for failed attempts.
    #[allow(dead_code)] // will be unused with `--no-default-features --features client`
    #[derive(Debug)]
    struct PresetReasonRetryClassifier {
        /// Remaining actions, stored so that the next one to return is at the back of the
        /// vector (the constructor reverses its input and `classify_retry` pops from the end).
        retry_actions: Mutex<Vec<RetryAction>>,
    }
637
638    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
639    impl PresetReasonRetryClassifier {
640        fn new(mut retry_reasons: Vec<RetryAction>) -> Self {
641            // We'll pop the retry_reasons in reverse order, so we reverse the list to fix that.
642            retry_reasons.reverse();
643            Self {
644                retry_actions: Mutex::new(retry_reasons),
645            }
646        }
647    }
648
649    impl ClassifyRetry for PresetReasonRetryClassifier {
650        fn classify_retry(&self, ctx: &InterceptorContext) -> RetryAction {
651            // Check for a result
652            let output_or_error = ctx.output_or_error();
653            // Check for an error
654            match output_or_error {
655                Some(Ok(_)) | None => return RetryAction::NoActionIndicated,
656                _ => (),
657            };
658
659            let mut retry_actions = self.retry_actions.lock().unwrap();
660            if retry_actions.len() == 1 {
661                retry_actions.first().unwrap().clone()
662            } else {
663                retry_actions.pop().unwrap()
664            }
665        }
666
667        fn name(&self) -> &'static str {
668            "Always returns a preset retry reason"
669        }
670    }
671
672    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
673    fn setup_test(
674        retry_reasons: Vec<RetryAction>,
675        retry_config: RetryConfig,
676    ) -> (ConfigBag, RuntimeComponents, InterceptorContext) {
677        let rc = RuntimeComponentsBuilder::for_tests()
678            .with_retry_classifier(SharedRetryClassifier::new(
679                PresetReasonRetryClassifier::new(retry_reasons),
680            ))
681            .build()
682            .unwrap();
683        let mut layer = Layer::new("test");
684        layer.store_put(retry_config);
685        let cfg = ConfigBag::of_layers(vec![layer]);
686        let mut ctx = InterceptorContext::new(Input::doesnt_matter());
687        // This type doesn't matter b/c the classifier will just return whatever we tell it to.
688        ctx.set_output_or_error(Err(OrchestratorError::other("doesn't matter")));
689
690        (cfg, rc, ctx)
691    }
692
693    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
694    #[test]
695    fn eventual_success() {
696        let (mut cfg, rc, mut ctx) = setup_test(
697            vec![RetryAction::server_error()],
698            RetryConfig::standard()
699                .with_use_static_exponential_base(true)
700                .with_max_attempts(5),
701        );
702        let strategy = StandardRetryStrategy::new();
703        cfg.interceptor_state().store_put(TokenBucket::default());
704        let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();
705
706        cfg.interceptor_state().store_put(RequestAttempts::new(1));
707        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
708        let dur = should_retry.expect_delay();
709        assert_eq!(dur, Duration::from_secs(1));
710        assert_eq!(token_bucket.available_permits(), 495);
711
712        cfg.interceptor_state().store_put(RequestAttempts::new(2));
713        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
714        let dur = should_retry.expect_delay();
715        assert_eq!(dur, Duration::from_secs(2));
716        assert_eq!(token_bucket.available_permits(), 490);
717
718        ctx.set_output_or_error(Ok(Output::doesnt_matter()));
719
720        cfg.interceptor_state().store_put(RequestAttempts::new(3));
721        let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
722        assert_eq!(no_retry, ShouldAttempt::No);
723        assert_eq!(token_bucket.available_permits(), 495);
724    }
725
726    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
727    #[test]
728    fn no_more_attempts() {
729        let (mut cfg, rc, ctx) = setup_test(
730            vec![RetryAction::server_error()],
731            RetryConfig::standard()
732                .with_use_static_exponential_base(true)
733                .with_max_attempts(3),
734        );
735        let strategy = StandardRetryStrategy::new();
736        cfg.interceptor_state().store_put(TokenBucket::default());
737        let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();
738
739        cfg.interceptor_state().store_put(RequestAttempts::new(1));
740        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
741        let dur = should_retry.expect_delay();
742        assert_eq!(dur, Duration::from_secs(1));
743        assert_eq!(token_bucket.available_permits(), 495);
744
745        cfg.interceptor_state().store_put(RequestAttempts::new(2));
746        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
747        let dur = should_retry.expect_delay();
748        assert_eq!(dur, Duration::from_secs(2));
749        assert_eq!(token_bucket.available_permits(), 490);
750
751        cfg.interceptor_state().store_put(RequestAttempts::new(3));
752        let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
753        assert_eq!(no_retry, ShouldAttempt::No);
754        assert_eq!(token_bucket.available_permits(), 490);
755    }
756
757    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
758    #[test]
759    fn successful_request_and_deser_should_be_retryable() {
760        #[derive(Clone, Copy, Debug)]
761        enum LongRunningOperationStatus {
762            Running,
763            Complete,
764        }
765
766        #[derive(Debug)]
767        struct LongRunningOperationOutput {
768            status: Option<LongRunningOperationStatus>,
769        }
770
771        impl LongRunningOperationOutput {
772            fn status(&self) -> Option<LongRunningOperationStatus> {
773                self.status
774            }
775        }
776
777        struct WaiterRetryClassifier {}
778
779        impl WaiterRetryClassifier {
780            fn new() -> Self {
781                WaiterRetryClassifier {}
782            }
783        }
784
785        impl fmt::Debug for WaiterRetryClassifier {
786            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
787                write!(f, "WaiterRetryClassifier")
788            }
789        }
790        impl ClassifyRetry for WaiterRetryClassifier {
791            fn classify_retry(&self, ctx: &InterceptorContext) -> RetryAction {
792                let status: Option<LongRunningOperationStatus> =
793                    ctx.output_or_error().and_then(|res| {
794                        res.ok().and_then(|output| {
795                            output
796                                .downcast_ref::<LongRunningOperationOutput>()
797                                .and_then(|output| output.status())
798                        })
799                    });
800
801                if let Some(LongRunningOperationStatus::Running) = status {
802                    return RetryAction::server_error();
803                };
804
805                RetryAction::NoActionIndicated
806            }
807
808            fn name(&self) -> &'static str {
809                "waiter retry classifier"
810            }
811        }
812
813        let retry_config = RetryConfig::standard()
814            .with_use_static_exponential_base(true)
815            .with_max_attempts(5);
816
817        let rc = RuntimeComponentsBuilder::for_tests()
818            .with_retry_classifier(SharedRetryClassifier::new(WaiterRetryClassifier::new()))
819            .build()
820            .unwrap();
821        let mut layer = Layer::new("test");
822        layer.store_put(retry_config);
823        let mut cfg = ConfigBag::of_layers(vec![layer]);
824        let mut ctx = InterceptorContext::new(Input::doesnt_matter());
825        let strategy = StandardRetryStrategy::new();
826
827        ctx.set_output_or_error(Ok(Output::erase(LongRunningOperationOutput {
828            status: Some(LongRunningOperationStatus::Running),
829        })));
830
831        cfg.interceptor_state().store_put(TokenBucket::new(5));
832        let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();
833
834        cfg.interceptor_state().store_put(RequestAttempts::new(1));
835        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
836        let dur = should_retry.expect_delay();
837        assert_eq!(dur, Duration::from_secs(1));
838        assert_eq!(token_bucket.available_permits(), 0);
839
840        ctx.set_output_or_error(Ok(Output::erase(LongRunningOperationOutput {
841            status: Some(LongRunningOperationStatus::Complete),
842        })));
843        cfg.interceptor_state().store_put(RequestAttempts::new(2));
844        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
845        should_retry.expect_no();
846        assert_eq!(token_bucket.available_permits(), 5);
847    }
848
849    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
850    #[test]
851    fn no_quota() {
852        let (mut cfg, rc, ctx) = setup_test(
853            vec![RetryAction::server_error()],
854            RetryConfig::standard()
855                .with_use_static_exponential_base(true)
856                .with_max_attempts(5),
857        );
858        let strategy = StandardRetryStrategy::new();
859        cfg.interceptor_state().store_put(TokenBucket::new(5));
860        let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();
861
862        cfg.interceptor_state().store_put(RequestAttempts::new(1));
863        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
864        let dur = should_retry.expect_delay();
865        assert_eq!(dur, Duration::from_secs(1));
866        assert_eq!(token_bucket.available_permits(), 0);
867
868        cfg.interceptor_state().store_put(RequestAttempts::new(2));
869        let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
870        assert_eq!(no_retry, ShouldAttempt::No);
871        assert_eq!(token_bucket.available_permits(), 0);
872    }
873
874    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
875    #[test]
876    fn quota_replenishes_on_success() {
877        let (mut cfg, rc, mut ctx) = setup_test(
878            vec![
879                RetryAction::transient_error(),
880                RetryAction::retryable_error_with_explicit_delay(
881                    ErrorKind::TransientError,
882                    Duration::from_secs(1),
883                ),
884            ],
885            RetryConfig::standard()
886                .with_use_static_exponential_base(true)
887                .with_max_attempts(5),
888        );
889        let strategy = StandardRetryStrategy::new();
890        cfg.interceptor_state().store_put(TokenBucket::new(100));
891        let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();
892
893        cfg.interceptor_state().store_put(RequestAttempts::new(1));
894        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
895        let dur = should_retry.expect_delay();
896        assert_eq!(dur, Duration::from_secs(1));
897        assert_eq!(token_bucket.available_permits(), 90);
898
899        cfg.interceptor_state().store_put(RequestAttempts::new(2));
900        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
901        let dur = should_retry.expect_delay();
902        assert_eq!(dur, Duration::from_secs(1));
903        assert_eq!(token_bucket.available_permits(), 80);
904
905        ctx.set_output_or_error(Ok(Output::doesnt_matter()));
906
907        cfg.interceptor_state().store_put(RequestAttempts::new(3));
908        let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
909        assert_eq!(no_retry, ShouldAttempt::No);
910
911        assert_eq!(token_bucket.available_permits(), 90);
912    }
913
914    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
915    #[test]
916    fn quota_replenishes_on_first_try_success() {
917        const PERMIT_COUNT: usize = 20;
918        let (mut cfg, rc, mut ctx) = setup_test(
919            vec![RetryAction::transient_error()],
920            RetryConfig::standard()
921                .with_use_static_exponential_base(true)
922                .with_max_attempts(u32::MAX),
923        );
924        let strategy = StandardRetryStrategy::new();
925        cfg.interceptor_state()
926            .store_put(TokenBucket::new(PERMIT_COUNT));
927        let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();
928
929        let mut attempt = 1;
930
931        // Drain all available permits with failed attempts
932        while token_bucket.available_permits() > 0 {
933            // Draining should complete in 2 attempts
934            if attempt > 2 {
935                panic!("This test should have completed by now (drain)");
936            }
937
938            cfg.interceptor_state()
939                .store_put(RequestAttempts::new(attempt));
940            let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
941            assert!(matches!(should_retry, ShouldAttempt::YesAfterDelay(_)));
942            attempt += 1;
943        }
944
945        // Forget the permit so that we can only refill by "success on first try".
946        let permit = strategy.retry_permit.lock().unwrap().take().unwrap();
947        permit.forget();
948
949        ctx.set_output_or_error(Ok(Output::doesnt_matter()));
950
951        // Replenish permits until we get back to `PERMIT_COUNT`
952        while token_bucket.available_permits() < PERMIT_COUNT {
953            if attempt > 23 {
954                panic!("This test should have completed by now (fill-up)");
955            }
956
957            cfg.interceptor_state()
958                .store_put(RequestAttempts::new(attempt));
959            let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
960            assert_eq!(no_retry, ShouldAttempt::No);
961            attempt += 1;
962        }
963
964        assert_eq!(attempt, 23);
965        assert_eq!(token_bucket.available_permits(), PERMIT_COUNT);
966    }
967
968    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
969    #[test]
970    fn backoff_timing() {
971        let (mut cfg, rc, ctx) = setup_test(
972            vec![RetryAction::server_error()],
973            RetryConfig::standard()
974                .with_use_static_exponential_base(true)
975                .with_max_attempts(5),
976        );
977        let strategy = StandardRetryStrategy::new();
978        cfg.interceptor_state().store_put(TokenBucket::default());
979        let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();
980
981        cfg.interceptor_state().store_put(RequestAttempts::new(1));
982        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
983        let dur = should_retry.expect_delay();
984        assert_eq!(dur, Duration::from_secs(1));
985        assert_eq!(token_bucket.available_permits(), 495);
986
987        cfg.interceptor_state().store_put(RequestAttempts::new(2));
988        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
989        let dur = should_retry.expect_delay();
990        assert_eq!(dur, Duration::from_secs(2));
991        assert_eq!(token_bucket.available_permits(), 490);
992
993        cfg.interceptor_state().store_put(RequestAttempts::new(3));
994        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
995        let dur = should_retry.expect_delay();
996        assert_eq!(dur, Duration::from_secs(4));
997        assert_eq!(token_bucket.available_permits(), 485);
998
999        cfg.interceptor_state().store_put(RequestAttempts::new(4));
1000        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1001        let dur = should_retry.expect_delay();
1002        assert_eq!(dur, Duration::from_secs(8));
1003        assert_eq!(token_bucket.available_permits(), 480);
1004
1005        cfg.interceptor_state().store_put(RequestAttempts::new(5));
1006        let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1007        assert_eq!(no_retry, ShouldAttempt::No);
1008        assert_eq!(token_bucket.available_permits(), 480);
1009    }
1010
1011    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1012    #[test]
1013    fn max_backoff_time() {
1014        let (mut cfg, rc, ctx) = setup_test(
1015            vec![RetryAction::server_error()],
1016            RetryConfig::standard()
1017                .with_use_static_exponential_base(true)
1018                .with_max_attempts(5)
1019                .with_initial_backoff(Duration::from_secs(1))
1020                .with_max_backoff(Duration::from_secs(3)),
1021        );
1022        let strategy = StandardRetryStrategy::new();
1023        cfg.interceptor_state().store_put(TokenBucket::default());
1024        let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();
1025
1026        cfg.interceptor_state().store_put(RequestAttempts::new(1));
1027        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1028        let dur = should_retry.expect_delay();
1029        assert_eq!(dur, Duration::from_secs(1));
1030        assert_eq!(token_bucket.available_permits(), 495);
1031
1032        cfg.interceptor_state().store_put(RequestAttempts::new(2));
1033        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1034        let dur = should_retry.expect_delay();
1035        assert_eq!(dur, Duration::from_secs(2));
1036        assert_eq!(token_bucket.available_permits(), 490);
1037
1038        cfg.interceptor_state().store_put(RequestAttempts::new(3));
1039        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1040        let dur = should_retry.expect_delay();
1041        assert_eq!(dur, Duration::from_secs(3));
1042        assert_eq!(token_bucket.available_permits(), 485);
1043
1044        cfg.interceptor_state().store_put(RequestAttempts::new(4));
1045        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1046        let dur = should_retry.expect_delay();
1047        assert_eq!(dur, Duration::from_secs(3));
1048        assert_eq!(token_bucket.available_permits(), 480);
1049
1050        cfg.interceptor_state().store_put(RequestAttempts::new(5));
1051        let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1052        assert_eq!(no_retry, ShouldAttempt::No);
1053        assert_eq!(token_bucket.available_permits(), 480);
1054    }
1055
    /// Backoff ceiling (20 seconds) handed to `calculate_exponential_backoff` by the
    /// unit tests in this module.
    const MAX_BACKOFF: Duration = Duration::from_secs(20);
1057
1058    #[test]
1059    fn calculate_exponential_backoff_where_initial_backoff_is_one() {
1060        let initial_backoff = 1.0;
1061
1062        for (attempt, expected_backoff) in [initial_backoff, 2.0, 4.0].into_iter().enumerate() {
1063            let actual_backoff =
1064                calculate_exponential_backoff(1.0, initial_backoff, attempt as u32, MAX_BACKOFF);
1065            assert_eq!(Duration::from_secs_f64(expected_backoff), actual_backoff);
1066        }
1067    }
1068
1069    #[test]
1070    fn calculate_exponential_backoff_where_initial_backoff_is_greater_than_one() {
1071        let initial_backoff = 3.0;
1072
1073        for (attempt, expected_backoff) in [initial_backoff, 6.0, 12.0].into_iter().enumerate() {
1074            let actual_backoff =
1075                calculate_exponential_backoff(1.0, initial_backoff, attempt as u32, MAX_BACKOFF);
1076            assert_eq!(Duration::from_secs_f64(expected_backoff), actual_backoff);
1077        }
1078    }
1079
1080    #[test]
1081    fn calculate_exponential_backoff_where_initial_backoff_is_less_than_one() {
1082        let initial_backoff = 0.03;
1083
1084        for (attempt, expected_backoff) in [initial_backoff, 0.06, 0.12].into_iter().enumerate() {
1085            let actual_backoff =
1086                calculate_exponential_backoff(1.0, initial_backoff, attempt as u32, MAX_BACKOFF);
1087            assert_eq!(Duration::from_secs_f64(expected_backoff), actual_backoff);
1088        }
1089    }
1090
1091    #[test]
1092    fn calculate_backoff_overflow_should_gracefully_fallback_to_max_backoff() {
1093        // avoid overflow for a silly large amount of retry attempts
1094        assert_eq!(
1095            MAX_BACKOFF,
1096            calculate_exponential_backoff(1_f64, 10_f64, 100000, MAX_BACKOFF),
1097        );
1098    }
1099
1100    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1101    #[test]
1102    fn v2_1_non_throttling_uses_50ms_backoff() {
1103        let (ctx, rc, cfg) = set_up_cfg_and_context(
1104            ErrorKind::ServerError,
1105            1,
1106            RetryConfig::standard()
1107                .with_use_static_exponential_base(true)
1108                .with_max_attempts(3)
1109                .with_retry_spec(RetrySpec::v2_1()),
1110        );
1111        let strategy = StandardRetryStrategy::new();
1112        let actual = strategy
1113            .should_attempt_retry(&ctx, &rc, &cfg)
1114            .expect("method is infallible for this use");
1115        // 50ms * 2^0 = 50ms
1116        assert_eq!(
1117            ShouldAttempt::YesAfterDelay(Duration::from_millis(50)),
1118            actual
1119        );
1120    }
1121
1122    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1123    #[test]
1124    fn v2_1_throttling_uses_1s_backoff() {
1125        let (ctx, rc, cfg) = set_up_cfg_and_context(
1126            ErrorKind::ThrottlingError,
1127            1,
1128            RetryConfig::standard()
1129                .with_use_static_exponential_base(true)
1130                .with_max_attempts(3)
1131                .with_retry_spec(RetrySpec::v2_1()),
1132        );
1133        let strategy = StandardRetryStrategy::new();
1134        let actual = strategy
1135            .should_attempt_retry(&ctx, &rc, &cfg)
1136            .expect("method is infallible for this use");
1137        // 1s * 2^0 = 1s (throttling keeps legacy backoff)
1138        assert_eq!(ShouldAttempt::YesAfterDelay(Duration::from_secs(1)), actual);
1139    }
1140
1141    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1142    #[test]
1143    fn v2_0_non_throttling_uses_1s_backoff() {
1144        let (ctx, rc, cfg) = set_up_cfg_and_context(
1145            ErrorKind::ServerError,
1146            1,
1147            RetryConfig::standard()
1148                .with_use_static_exponential_base(true)
1149                .with_max_attempts(3)
1150                .with_retry_spec(RetrySpec::v2_0()),
1151        );
1152        let strategy = StandardRetryStrategy::new();
1153        let actual = strategy
1154            .should_attempt_retry(&ctx, &rc, &cfg)
1155            .expect("method is infallible for this use");
1156        // 1s * 2^0 = 1s (v2.0 keeps legacy backoff)
1157        assert_eq!(ShouldAttempt::YesAfterDelay(Duration::from_secs(1)), actual);
1158    }
1159
1160    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1161    #[test]
1162    fn v2_1_retry_after_bounded_between_t_i_and_t_i_plus_5s() {
1163        let (mut cfg, rc, ctx) = setup_test(
1164            vec![RetryAction::retryable_error_with_explicit_delay(
1165                ErrorKind::ServerError,
1166                Duration::from_secs(3),
1167            )],
1168            RetryConfig::standard()
1169                .with_use_static_exponential_base(true)
1170                .with_max_attempts(3)
1171                .with_retry_spec(RetrySpec::v2_1()),
1172        );
1173        let strategy = StandardRetryStrategy::new();
1174        cfg.interceptor_state().store_put(TokenBucket::default());
1175        cfg.interceptor_state().store_put(RequestAttempts::new(1));
1176        let actual = strategy
1177            .should_attempt_retry(&ctx, &rc, &cfg)
1178            .expect("method is infallible for this use");
1179        // t_i = 50ms, retry_after = 3s, clamp(3s, 50ms, 5.05s) = 3s
1180        assert_eq!(ShouldAttempt::YesAfterDelay(Duration::from_secs(3)), actual);
1181    }
1182
1183    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1184    #[test]
1185    fn v2_1_retry_after_below_t_i_uses_t_i() {
1186        let (mut cfg, rc, ctx) = setup_test(
1187            vec![RetryAction::retryable_error_with_explicit_delay(
1188                ErrorKind::ServerError,
1189                Duration::from_millis(10),
1190            )],
1191            RetryConfig::standard()
1192                .with_use_static_exponential_base(true)
1193                .with_max_attempts(3)
1194                .with_retry_spec(RetrySpec::v2_1()),
1195        );
1196        let strategy = StandardRetryStrategy::new();
1197        cfg.interceptor_state().store_put(TokenBucket::default());
1198        cfg.interceptor_state().store_put(RequestAttempts::new(1));
1199        let actual = strategy
1200            .should_attempt_retry(&ctx, &rc, &cfg)
1201            .expect("method is infallible for this use");
1202        // t_i = 50ms, retry_after = 10ms < t_i, so use t_i = 50ms
1203        assert_eq!(
1204            ShouldAttempt::YesAfterDelay(Duration::from_millis(50)),
1205            actual
1206        );
1207    }
1208
1209    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1210    #[test]
1211    fn v2_1_retry_after_above_t_i_plus_5s_capped() {
1212        let (mut cfg, rc, ctx) = setup_test(
1213            vec![RetryAction::retryable_error_with_explicit_delay(
1214                ErrorKind::ServerError,
1215                Duration::from_secs(10),
1216            )],
1217            RetryConfig::standard()
1218                .with_use_static_exponential_base(true)
1219                .with_max_attempts(3)
1220                .with_retry_spec(RetrySpec::v2_1()),
1221        );
1222        let strategy = StandardRetryStrategy::new();
1223        cfg.interceptor_state().store_put(TokenBucket::default());
1224        cfg.interceptor_state().store_put(RequestAttempts::new(1));
1225        let actual = strategy
1226            .should_attempt_retry(&ctx, &rc, &cfg)
1227            .expect("method is infallible for this use");
1228        // t_i = 50ms, retry_after = 10s > t_i + 5s = 5.05s, so cap at 5.05s
1229        assert_eq!(
1230            ShouldAttempt::YesAfterDelay(Duration::from_millis(5050)),
1231            actual
1232        );
1233    }
1234
1235    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1236    #[test]
1237    fn v2_0_retry_after_capped_at_max_backoff() {
1238        let (mut cfg, rc, ctx) = setup_test(
1239            vec![RetryAction::retryable_error_with_explicit_delay(
1240                ErrorKind::ServerError,
1241                Duration::from_secs(30),
1242            )],
1243            RetryConfig::standard()
1244                .with_use_static_exponential_base(true)
1245                .with_max_attempts(3)
1246                .with_retry_spec(RetrySpec::v2_0()),
1247        );
1248        let strategy = StandardRetryStrategy::new();
1249        cfg.interceptor_state().store_put(TokenBucket::default());
1250        cfg.interceptor_state().store_put(RequestAttempts::new(1));
1251        let actual = strategy
1252            .should_attempt_retry(&ctx, &rc, &cfg)
1253            .expect("method is infallible for this use");
1254        // v2.0: retry_after = 30s, capped at max_backoff = 20s
1255        assert_eq!(
1256            ShouldAttempt::YesAfterDelay(Duration::from_secs(20)),
1257            actual
1258        );
1259    }
1260
1261    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1262    #[test]
1263    fn long_polling_backs_off_when_token_bucket_empty() {
1264        let (mut cfg, rc, ctx) = setup_test(
1265            vec![RetryAction::server_error()],
1266            RetryConfig::standard()
1267                .with_use_static_exponential_base(true)
1268                .with_max_attempts(5)
1269                .with_retry_spec(RetrySpec::v2_1().with_long_polling(true)),
1270        );
1271        let strategy = StandardRetryStrategy::new();
1272        cfg.interceptor_state().store_put(TokenBucket::new(0));
1273        cfg.interceptor_state().store_put(RequestAttempts::new(1));
1274        let hint = LongPollingBackoff::default();
1275        cfg.interceptor_state().store_put(hint.clone());
1276
1277        let result = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1278        assert_eq!(result, ShouldAttempt::No);
1279        assert_eq!(hint.take(), Some(Duration::from_millis(50)));
1280    }
1281
1282    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1283    #[test]
1284    fn non_long_polling_no_backoff_when_token_bucket_empty() {
1285        let (mut cfg, rc, ctx) = setup_test(
1286            vec![RetryAction::server_error()],
1287            RetryConfig::standard()
1288                .with_use_static_exponential_base(true)
1289                .with_max_attempts(5)
1290                .with_retry_spec(RetrySpec::v2_0()),
1291        );
1292        let strategy = StandardRetryStrategy::new();
1293        cfg.interceptor_state().store_put(TokenBucket::new(0));
1294        cfg.interceptor_state().store_put(RequestAttempts::new(1));
1295
1296        let result = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1297        assert_eq!(result, ShouldAttempt::No);
1298    }
1299
    /// Test helper: builds a `TokenBucket` with the given `capacity` and with its
    /// retry, throttling-retry and timeout-retry costs set from `DEFAULT_RETRY_COST`,
    /// `THROTTLING_RETRY_COST` and `DEFAULT_RETRY_TIMEOUT_COST` respectively.
    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
    fn v2_1_token_bucket_with_capacity(capacity: usize) -> TokenBucket {
        TokenBucket::builder()
            .capacity(capacity)
            .retry_cost(DEFAULT_RETRY_COST)
            .throttling_retry_cost(THROTTLING_RETRY_COST)
            .timeout_retry_cost(DEFAULT_RETRY_TIMEOUT_COST)
            .build()
    }
1309
1310    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1311    fn v2_1_token_bucket() -> TokenBucket {
1312        v2_1_token_bucket_with_capacity(DEFAULT_CAPACITY)
1313    }
1314
1315    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1316    #[test]
1317    fn retry_eventually_succeeds() {
1318        let (mut cfg, rc, mut ctx) = setup_test(
1319            vec![RetryAction::server_error()],
1320            RetryConfig::standard()
1321                .with_use_static_exponential_base(true)
1322                .with_retry_spec(RetrySpec::v2_1()),
1323        );
1324        let strategy = StandardRetryStrategy::new();
1325        let tb = v2_1_token_bucket();
1326        cfg.interceptor_state().store_put(tb.clone());
1327
1328        cfg.interceptor_state().store_put(RequestAttempts::new(1));
1329        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1330        assert_eq!(should_retry.expect_delay(), Duration::from_millis(50));
1331        assert_eq!(
1332            tb.available_permits(),
1333            DEFAULT_CAPACITY - DEFAULT_RETRY_COST as usize
1334        );
1335
1336        cfg.interceptor_state().store_put(RequestAttempts::new(2));
1337        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1338        assert_eq!(should_retry.expect_delay(), Duration::from_millis(100));
1339        assert_eq!(
1340            tb.available_permits(),
1341            DEFAULT_CAPACITY - 2 * DEFAULT_RETRY_COST as usize
1342        );
1343
1344        ctx.set_output_or_error(Ok(Output::doesnt_matter()));
1345        cfg.interceptor_state().store_put(RequestAttempts::new(3));
1346        let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1347        assert_eq!(no_retry, ShouldAttempt::No);
1348        assert_eq!(
1349            tb.available_permits(),
1350            DEFAULT_CAPACITY - DEFAULT_RETRY_COST as usize
1351        );
1352    }
1353
1354    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1355    #[test]
1356    fn fail_due_to_max_attempts_reached() {
1357        let (mut cfg, rc, ctx) = setup_test(
1358            vec![RetryAction::server_error()],
1359            RetryConfig::standard()
1360                .with_use_static_exponential_base(true)
1361                .with_retry_spec(RetrySpec::v2_1()),
1362        );
1363        let strategy = StandardRetryStrategy::new();
1364        let tb = v2_1_token_bucket();
1365        cfg.interceptor_state().store_put(tb.clone());
1366
1367        cfg.interceptor_state().store_put(RequestAttempts::new(1));
1368        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1369        assert_eq!(should_retry.expect_delay(), Duration::from_millis(50));
1370        assert_eq!(
1371            tb.available_permits(),
1372            DEFAULT_CAPACITY - DEFAULT_RETRY_COST as usize
1373        );
1374
1375        cfg.interceptor_state().store_put(RequestAttempts::new(2));
1376        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1377        assert_eq!(should_retry.expect_delay(), Duration::from_millis(100));
1378        assert_eq!(
1379            tb.available_permits(),
1380            DEFAULT_CAPACITY - 2 * DEFAULT_RETRY_COST as usize
1381        );
1382
1383        cfg.interceptor_state().store_put(RequestAttempts::new(3));
1384        let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1385        assert_eq!(no_retry, ShouldAttempt::No);
1386        assert_eq!(
1387            tb.available_permits(),
1388            DEFAULT_CAPACITY - 2 * DEFAULT_RETRY_COST as usize
1389        );
1390    }
1391
1392    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1393    #[test]
1394    fn retry_quota_reached_after_single_retry() {
1395        let (mut cfg, rc, ctx) = setup_test(
1396            vec![RetryAction::server_error()],
1397            RetryConfig::standard()
1398                .with_use_static_exponential_base(true)
1399                .with_max_attempts(5)
1400                .with_retry_spec(RetrySpec::v2_1()),
1401        );
1402        let strategy = StandardRetryStrategy::new();
1403        let tb = v2_1_token_bucket_with_capacity(DEFAULT_RETRY_COST as usize);
1404        cfg.interceptor_state().store_put(tb.clone());
1405
1406        cfg.interceptor_state().store_put(RequestAttempts::new(1));
1407        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1408        assert_eq!(should_retry.expect_delay(), Duration::from_millis(50));
1409        assert_eq!(tb.available_permits(), 0);
1410
1411        cfg.interceptor_state().store_put(RequestAttempts::new(2));
1412        let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1413        assert_eq!(no_retry, ShouldAttempt::No);
1414        assert_eq!(tb.available_permits(), 0);
1415    }
1416
1417    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1418    #[test]
1419    fn no_retries_if_retry_quota_is_zero() {
1420        let (mut cfg, rc, ctx) = setup_test(
1421            vec![RetryAction::server_error()],
1422            RetryConfig::standard()
1423                .with_use_static_exponential_base(true)
1424                .with_retry_spec(RetrySpec::v2_1()),
1425        );
1426        let strategy = StandardRetryStrategy::new();
1427        let tb = v2_1_token_bucket_with_capacity(0);
1428        cfg.interceptor_state().store_put(tb.clone());
1429
1430        cfg.interceptor_state().store_put(RequestAttempts::new(1));
1431        let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1432        assert_eq!(no_retry, ShouldAttempt::No);
1433        assert_eq!(tb.available_permits(), 0);
1434    }
1435
1436    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1437    #[test]
1438    fn retry_stops_after_retry_quota_exhaustion() {
1439        let (mut cfg, rc, ctx) = setup_test(
1440            vec![RetryAction::server_error()],
1441            RetryConfig::standard()
1442                .with_use_static_exponential_base(true)
1443                .with_max_attempts(5)
1444                .with_retry_spec(RetrySpec::v2_1()),
1445        );
1446        let strategy = StandardRetryStrategy::new();
1447        let tb = v2_1_token_bucket_with_capacity(20);
1448        cfg.interceptor_state().store_put(tb.clone());
1449
1450        cfg.interceptor_state().store_put(RequestAttempts::new(1));
1451        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1452        assert_eq!(should_retry.expect_delay(), Duration::from_millis(50));
1453        assert_eq!(tb.available_permits(), 20 - DEFAULT_RETRY_COST as usize);
1454
1455        cfg.interceptor_state().store_put(RequestAttempts::new(2));
1456        let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1457        assert_eq!(no_retry, ShouldAttempt::No);
1458        assert_eq!(tb.available_permits(), 20 - DEFAULT_RETRY_COST as usize);
1459    }
1460
1461    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1462    #[test]
1463    fn retry_quota_recovery_after_successful_responses() {
1464        let (mut cfg, rc, mut ctx) = setup_test(
1465            vec![RetryAction::server_error()],
1466            RetryConfig::standard()
1467                .with_use_static_exponential_base(true)
1468                .with_max_attempts(5)
1469                .with_retry_spec(RetrySpec::v2_1()),
1470        );
1471        let strategy = StandardRetryStrategy::new();
1472        let tb = v2_1_token_bucket_with_capacity(30);
1473        cfg.interceptor_state().store_put(tb.clone());
1474
1475        cfg.interceptor_state().store_put(RequestAttempts::new(1));
1476        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1477        assert_eq!(should_retry.expect_delay(), Duration::from_millis(50));
1478        assert_eq!(tb.available_permits(), 30 - DEFAULT_RETRY_COST as usize);
1479
1480        cfg.interceptor_state().store_put(RequestAttempts::new(2));
1481        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1482        assert_eq!(should_retry.expect_delay(), Duration::from_millis(100));
1483        assert_eq!(tb.available_permits(), 30 - 2 * DEFAULT_RETRY_COST as usize);
1484
1485        ctx.set_output_or_error(Ok(Output::doesnt_matter()));
1486        cfg.interceptor_state().store_put(RequestAttempts::new(3));
1487        let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1488        assert_eq!(no_retry, ShouldAttempt::No);
1489        assert_eq!(tb.available_permits(), 30 - DEFAULT_RETRY_COST as usize);
1490
1491        ctx.set_output_or_error(Err(OrchestratorError::other("doesn't matter")));
1492        cfg.interceptor_state().store_put(RequestAttempts::new(1));
1493        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1494        assert_eq!(should_retry.expect_delay(), Duration::from_millis(50));
1495        assert_eq!(tb.available_permits(), 30 - 2 * DEFAULT_RETRY_COST as usize);
1496
1497        ctx.set_output_or_error(Ok(Output::doesnt_matter()));
1498        cfg.interceptor_state().store_put(RequestAttempts::new(2));
1499        let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1500        assert_eq!(no_retry, ShouldAttempt::No);
1501        assert_eq!(tb.available_permits(), 30 - DEFAULT_RETRY_COST as usize);
1502    }
1503
1504    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1505    #[test]
1506    fn throttling_error_token_bucket_drain_and_backoff() {
1507        let (mut cfg, rc, mut ctx) = setup_test(
1508            vec![RetryAction::retryable_error(ErrorKind::ThrottlingError)],
1509            RetryConfig::standard()
1510                .with_use_static_exponential_base(true)
1511                .with_retry_spec(RetrySpec::v2_1()),
1512        );
1513        let strategy = StandardRetryStrategy::new();
1514        let tb = v2_1_token_bucket();
1515        cfg.interceptor_state().store_put(tb.clone());
1516
1517        cfg.interceptor_state().store_put(RequestAttempts::new(1));
1518        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1519        assert_eq!(should_retry.expect_delay(), Duration::from_secs(1));
1520        assert_eq!(
1521            tb.available_permits(),
1522            DEFAULT_CAPACITY - THROTTLING_RETRY_COST as usize
1523        );
1524
1525        ctx.set_output_or_error(Ok(Output::doesnt_matter()));
1526        cfg.interceptor_state().store_put(RequestAttempts::new(2));
1527        let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1528        assert_eq!(no_retry, ShouldAttempt::No);
1529        assert_eq!(tb.available_permits(), DEFAULT_CAPACITY);
1530    }
1531
1532    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1533    #[test]
1534    fn long_polling_backoff_after_throttling_error_when_token_bucket_empty() {
1535        let (mut cfg, rc, ctx) = setup_test(
1536            vec![RetryAction::retryable_error(ErrorKind::ThrottlingError)],
1537            RetryConfig::standard()
1538                .with_use_static_exponential_base(true)
1539                .with_max_attempts(5)
1540                .with_retry_spec(RetrySpec::v2_1().with_long_polling(true)),
1541        );
1542        let strategy = StandardRetryStrategy::new();
1543        let tb = v2_1_token_bucket_with_capacity(0);
1544        cfg.interceptor_state().store_put(tb.clone());
1545        cfg.interceptor_state().store_put(RequestAttempts::new(1));
1546        let hint = LongPollingBackoff::default();
1547        cfg.interceptor_state().store_put(hint.clone());
1548
1549        let result = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1550        assert_eq!(result, ShouldAttempt::No);
1551        assert_eq!(hint.take(), Some(Duration::from_secs(1)));
1552        assert_eq!(tb.available_permits(), 0);
1553    }
1554
1555    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1556    #[test]
1557    fn long_polling_max_attempts_exceeded_must_not_delay() {
1558        let (mut cfg, rc, ctx) = setup_test(
1559            vec![RetryAction::server_error()],
1560            RetryConfig::standard()
1561                .with_use_static_exponential_base(true)
1562                .with_max_attempts(2)
1563                .with_retry_spec(RetrySpec::v2_1().with_long_polling(true)),
1564        );
1565        let strategy = StandardRetryStrategy::new();
1566        let tb = v2_1_token_bucket();
1567        cfg.interceptor_state().store_put(tb.clone());
1568
1569        cfg.interceptor_state().store_put(RequestAttempts::new(1));
1570        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1571        assert_eq!(should_retry.expect_delay(), Duration::from_millis(50));
1572
1573        cfg.interceptor_state().store_put(RequestAttempts::new(2));
1574        let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1575        assert_eq!(no_retry, ShouldAttempt::No);
1576    }
1577
1578    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1579    #[test]
1580    fn long_polling_success_must_not_delay() {
1581        let (mut cfg, rc, mut ctx) = setup_test(
1582            vec![RetryAction::server_error()],
1583            RetryConfig::standard()
1584                .with_use_static_exponential_base(true)
1585                .with_max_attempts(2)
1586                .with_retry_spec(RetrySpec::v2_1().with_long_polling(true)),
1587        );
1588        let strategy = StandardRetryStrategy::new();
1589        let tb = v2_1_token_bucket();
1590        cfg.interceptor_state().store_put(tb.clone());
1591
1592        cfg.interceptor_state().store_put(RequestAttempts::new(1));
1593        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1594        assert_eq!(should_retry.expect_delay(), Duration::from_millis(50));
1595
1596        ctx.set_output_or_error(Ok(Output::doesnt_matter()));
1597        cfg.interceptor_state().store_put(RequestAttempts::new(2));
1598        let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1599        assert_eq!(no_retry, ShouldAttempt::No);
1600    }
1601
1602    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1603    #[test]
1604    fn long_polling_non_retryable_errors_must_not_delay() {
1605        let (mut cfg, rc, ctx) = setup_test(
1606            vec![RetryAction::NoActionIndicated],
1607            RetryConfig::standard()
1608                .with_use_static_exponential_base(true)
1609                .with_max_attempts(2)
1610                .with_retry_spec(RetrySpec::v2_1().with_long_polling(true)),
1611        );
1612        let strategy = StandardRetryStrategy::new();
1613        let tb = v2_1_token_bucket();
1614        cfg.interceptor_state().store_put(tb.clone());
1615        cfg.interceptor_state().store_put(RequestAttempts::new(1));
1616
1617        let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1618        assert_eq!(no_retry, ShouldAttempt::No);
1619    }
1620
1621    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1622    #[test]
1623    fn verify_max_backoff_time() {
1624        let (mut cfg, rc, ctx) = setup_test(
1625            vec![RetryAction::server_error()],
1626            RetryConfig::standard()
1627                .with_use_static_exponential_base(true)
1628                .with_max_attempts(5)
1629                .with_max_backoff(Duration::from_millis(200))
1630                .with_retry_spec(RetrySpec::v2_1()),
1631        );
1632        let strategy = StandardRetryStrategy::new();
1633        let tb = v2_1_token_bucket();
1634        cfg.interceptor_state().store_put(tb.clone());
1635
1636        cfg.interceptor_state().store_put(RequestAttempts::new(1));
1637        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1638        assert_eq!(should_retry.expect_delay(), Duration::from_millis(50));
1639        assert_eq!(
1640            tb.available_permits(),
1641            DEFAULT_CAPACITY - DEFAULT_RETRY_COST as usize
1642        );
1643
1644        cfg.interceptor_state().store_put(RequestAttempts::new(2));
1645        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1646        assert_eq!(should_retry.expect_delay(), Duration::from_millis(100));
1647        assert_eq!(
1648            tb.available_permits(),
1649            DEFAULT_CAPACITY - 2 * DEFAULT_RETRY_COST as usize
1650        );
1651
1652        cfg.interceptor_state().store_put(RequestAttempts::new(3));
1653        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1654        assert_eq!(should_retry.expect_delay(), Duration::from_millis(200));
1655        assert_eq!(
1656            tb.available_permits(),
1657            DEFAULT_CAPACITY - 3 * DEFAULT_RETRY_COST as usize
1658        );
1659
1660        // 50ms * 2^3 = 400ms, capped at max_backoff 200ms
1661        cfg.interceptor_state().store_put(RequestAttempts::new(4));
1662        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1663        assert_eq!(should_retry.expect_delay(), Duration::from_millis(200));
1664        assert_eq!(
1665            tb.available_permits(),
1666            DEFAULT_CAPACITY - 4 * DEFAULT_RETRY_COST as usize
1667        );
1668
1669        cfg.interceptor_state().store_put(RequestAttempts::new(5));
1670        let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1671        assert_eq!(no_retry, ShouldAttempt::No);
1672        assert_eq!(
1673            tb.available_permits(),
1674            DEFAULT_CAPACITY - 4 * DEFAULT_RETRY_COST as usize
1675        );
1676    }
1677}