1use std::sync::Mutex;
7use std::time::{Duration, SystemTime};
8
9use tokio::sync::OwnedSemaphorePermit;
10use tracing::{debug, trace};
11
12use aws_smithy_runtime_api::box_error::BoxError;
13use aws_smithy_runtime_api::client::interceptors::context::{
14 BeforeTransmitInterceptorContextMut, InterceptorContext,
15};
16use aws_smithy_runtime_api::client::interceptors::{dyn_dispatch_hint, Intercept};
17use aws_smithy_runtime_api::client::retries::classifiers::{RetryAction, RetryReason};
18use aws_smithy_runtime_api::client::retries::{RequestAttempts, RetryStrategy, ShouldAttempt};
19use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
20use aws_smithy_types::config_bag::{ConfigBag, Layer, Storable, StoreReplace};
21use aws_smithy_types::retry::{ErrorKind, RetryConfig, RetryMode, RetrySpec};
22
23use crate::client::retries::classifiers::run_classifiers_on_ctx;
24use crate::client::retries::client_rate_limiter::{ClientRateLimiter, RequestReason};
25use crate::client::retries::strategy::standard::ReleaseResult::{
26 APermitWasReleased, NoPermitWasReleased,
27};
28use crate::client::retries::token_bucket::TokenBucket;
29use crate::client::retries::{
30 ClientRateLimiterPartition, LongPollingBackoff, RetryPartition, RetryPartitionInner,
31};
32use crate::static_partition_map::StaticPartitionMap;
33
34static CLIENT_RATE_LIMITER: StaticPartitionMap<ClientRateLimiterPartition, ClientRateLimiter> =
35 StaticPartitionMap::new();
36
37static TOKEN_BUCKET: StaticPartitionMap<RetryPartition, TokenBucket> = StaticPartitionMap::new();
39
40#[derive(Debug, Default)]
42pub struct StandardRetryStrategy {
43 retry_permit: Mutex<Option<OwnedSemaphorePermit>>,
44}
45
46impl Storable for StandardRetryStrategy {
47 type Storer = StoreReplace<Self>;
48}
49
50impl StandardRetryStrategy {
51 pub fn new() -> Self {
53 Default::default()
54 }
55
56 fn release_retry_permit(&self, token_bucket: &TokenBucket) -> ReleaseResult {
57 let mut retry_permit = self.retry_permit.lock().unwrap();
58 match retry_permit.take() {
59 Some(p) => {
60 if token_bucket.success_reward() > 0.0 {
62 token_bucket.reward_success();
63 p.forget();
64 } else {
65 drop(p); }
67 APermitWasReleased
68 }
69 None => {
70 if token_bucket.success_reward() > 0.0 {
72 token_bucket.reward_success();
73 } else {
74 token_bucket.regenerate_a_token();
75 }
76 NoPermitWasReleased
77 }
78 }
79 }
80
81 fn set_retry_permit(&self, new_retry_permit: OwnedSemaphorePermit) {
82 let mut old_retry_permit = self.retry_permit.lock().unwrap();
83 if let Some(p) = old_retry_permit.replace(new_retry_permit) {
84 p.forget()
87 }
88 }
89
90 fn adaptive_retry_rate_limiter(
92 runtime_components: &RuntimeComponents,
93 cfg: &ConfigBag,
94 ) -> Option<ClientRateLimiter> {
95 let retry_config = cfg.load::<RetryConfig>().expect("retry config is required");
96 if retry_config.mode() == RetryMode::Adaptive {
97 if let Some(time_source) = runtime_components.time_source() {
98 let retry_partition = cfg.load::<RetryPartition>().expect("set in default config");
99 let seconds_since_unix_epoch = time_source
100 .now()
101 .duration_since(SystemTime::UNIX_EPOCH)
102 .expect("the present takes place after the UNIX_EPOCH")
103 .as_secs_f64();
104 let client_rate_limiter = match &retry_partition.inner {
105 RetryPartitionInner::Default(_) => {
106 let client_rate_limiter_partition =
107 ClientRateLimiterPartition::new(retry_partition.clone());
108 CLIENT_RATE_LIMITER.get_or_init(client_rate_limiter_partition, || {
109 ClientRateLimiter::new(seconds_since_unix_epoch)
110 })
111 }
112 RetryPartitionInner::Custom {
113 client_rate_limiter,
114 ..
115 } => client_rate_limiter.clone(),
116 };
117 return Some(client_rate_limiter);
118 }
119 }
120 None
121 }
122
123 fn calculate_backoff(
124 &self,
125 runtime_components: &RuntimeComponents,
126 cfg: &ConfigBag,
127 retry_cfg: &RetryConfig,
128 retry_reason: &RetryAction,
129 ) -> Result<Duration, ShouldAttempt> {
130 let request_attempts = cfg
131 .load::<RequestAttempts>()
132 .expect("at least one request attempt is made before any retry is attempted")
133 .attempts();
134
135 match retry_reason {
136 RetryAction::RetryIndicated(RetryReason::RetryableError { kind, retry_after }) => {
137 let initial_backoff = if *kind != ErrorKind::ThrottlingError {
138 retry_cfg
139 .retry_spec()
140 .map(|s| s.non_throttling_initial_backoff())
141 .unwrap_or(retry_cfg.initial_backoff())
142 .as_secs_f64()
143 } else {
144 retry_cfg.initial_backoff().as_secs_f64()
145 };
146
147 if let Some(delay) = check_rate_limiter_for_delay(runtime_components, cfg, *kind) {
148 let delay = delay.min(retry_cfg.max_backoff());
149 debug!("rate limiter has requested a {delay:?} delay before retrying");
150 Ok(delay)
151 } else {
152 let base = if retry_cfg.use_static_exponential_base() {
153 1.0
154 } else {
155 fastrand::f64()
156 };
157 let t_i = calculate_exponential_backoff(
158 base,
159 initial_backoff,
160 request_attempts - 1,
161 retry_cfg.max_backoff(),
162 );
163
164 if let Some(retry_after) = *retry_after {
165 if retry_cfg
166 .retry_spec()
167 .is_some_and(|s| s.is_at_least(RetrySpec::V2_1))
168 {
169 let delay = retry_after.clamp(t_i, t_i + Duration::from_secs(5));
170 debug!("x-amz-retry-after bounded to {delay:?} (t_i={t_i:?})");
171 Ok(delay)
172 } else {
173 let delay = retry_after.min(retry_cfg.max_backoff());
174 debug!(
175 "explicit request from server to delay {delay:?} before retrying"
176 );
177 Ok(delay)
178 }
179 } else {
180 Ok(t_i)
181 }
182 }
183 }
184 RetryAction::RetryForbidden | RetryAction::NoActionIndicated => {
185 debug!(
186 attempts = request_attempts,
187 max_attempts = retry_cfg.max_attempts(),
188 "encountered un-retryable error"
189 );
190 Err(ShouldAttempt::No)
191 }
192 _ => unreachable!("RetryAction is non-exhaustive"),
193 }
194 }
195}
196
197enum ReleaseResult {
198 APermitWasReleased,
199 NoPermitWasReleased,
200}
201
202impl RetryStrategy for StandardRetryStrategy {
203 fn should_attempt_initial_request(
204 &self,
205 runtime_components: &RuntimeComponents,
206 cfg: &ConfigBag,
207 ) -> Result<ShouldAttempt, BoxError> {
208 if let Some(crl) = Self::adaptive_retry_rate_limiter(runtime_components, cfg) {
209 let seconds_since_unix_epoch = get_seconds_since_unix_epoch(runtime_components);
210 if let Err(delay) = crl.acquire_permission_to_send_a_request(
211 seconds_since_unix_epoch,
212 RequestReason::InitialRequest,
213 ) {
214 return Ok(ShouldAttempt::YesAfterDelay(delay));
215 }
216 } else {
217 debug!("no client rate limiter configured, so no token is required for the initial request.");
218 }
219
220 Ok(ShouldAttempt::Yes)
221 }
222
223 fn should_attempt_retry(
224 &self,
225 ctx: &InterceptorContext,
226 runtime_components: &RuntimeComponents,
227 cfg: &ConfigBag,
228 ) -> Result<ShouldAttempt, BoxError> {
229 let retry_cfg = cfg.load::<RetryConfig>().expect("retry config is required");
230
231 let token_bucket = cfg.load::<TokenBucket>().expect("token bucket is required");
233 let retry_classifiers = runtime_components.retry_classifiers();
235 let classifier_result = run_classifiers_on_ctx(retry_classifiers, ctx);
236
237 let error_kind = error_kind(&classifier_result);
242 let is_throttling_error = error_kind
243 .map(|kind| kind == ErrorKind::ThrottlingError)
244 .unwrap_or(false);
245 update_rate_limiter_if_exists(runtime_components, cfg, is_throttling_error);
246
247 if !ctx.is_failed() {
249 self.release_retry_permit(token_bucket);
250 }
251 let request_attempts = cfg
254 .load::<RequestAttempts>()
255 .expect("at least one request attempt is made before any retry is attempted")
256 .attempts();
257
258 if !classifier_result.should_retry() {
260 debug!(
261 "attempt #{request_attempts} classified as {:?}, not retrying",
262 classifier_result
263 );
264 return Ok(ShouldAttempt::No);
265 }
266
267 if request_attempts >= retry_cfg.max_attempts() {
269 debug!(
270 attempts = request_attempts,
271 max_attempts = retry_cfg.max_attempts(),
272 "not retrying because we are out of attempts"
273 );
274 return Ok(ShouldAttempt::No);
275 }
276
277 let error_kind = error_kind.expect("result was classified retryable");
279 let is_long_polling = retry_cfg.retry_spec().is_some_and(|s| s.long_polling());
280
281 let backoff =
284 match self.calculate_backoff(runtime_components, cfg, retry_cfg, &classifier_result) {
285 Ok(value) => value,
286 Err(value) => return Ok(value),
287 };
288
289 match token_bucket.acquire(
291 &error_kind,
292 &runtime_components.time_source().unwrap_or_default(),
293 ) {
294 Some(permit) => self.set_retry_permit(permit),
295 None => {
296 debug!("attempt #{request_attempts} failed with {error_kind:?}; not enough retry quota.");
297 if is_long_polling {
298 if let Some(hint) = cfg.load::<LongPollingBackoff>() {
299 hint.set(backoff);
300 }
301 }
302 return Ok(ShouldAttempt::No);
303 }
304 }
305
306 debug!(
307 "attempt #{request_attempts} failed with {:?}; retrying after {:?}",
308 classifier_result, backoff
309 );
310 Ok(ShouldAttempt::YesAfterDelay(backoff))
311 }
312}
313
314fn error_kind(classifier_result: &RetryAction) -> Option<ErrorKind> {
316 match classifier_result {
317 RetryAction::RetryIndicated(RetryReason::RetryableError { kind, .. }) => Some(*kind),
318 _ => None,
319 }
320}
321
322fn update_rate_limiter_if_exists(
323 runtime_components: &RuntimeComponents,
324 cfg: &ConfigBag,
325 is_throttling_error: bool,
326) {
327 if let Some(crl) = StandardRetryStrategy::adaptive_retry_rate_limiter(runtime_components, cfg) {
328 let seconds_since_unix_epoch = get_seconds_since_unix_epoch(runtime_components);
329 crl.update_rate_limiter(seconds_since_unix_epoch, is_throttling_error);
330 }
331}
332
333fn check_rate_limiter_for_delay(
334 runtime_components: &RuntimeComponents,
335 cfg: &ConfigBag,
336 kind: ErrorKind,
337) -> Option<Duration> {
338 if let Some(crl) = StandardRetryStrategy::adaptive_retry_rate_limiter(runtime_components, cfg) {
339 let retry_reason = if kind == ErrorKind::ThrottlingError {
340 RequestReason::RetryTimeout
341 } else {
342 RequestReason::Retry
343 };
344 if let Err(delay) = crl.acquire_permission_to_send_a_request(
345 get_seconds_since_unix_epoch(runtime_components),
346 retry_reason,
347 ) {
348 return Some(delay);
349 }
350 }
351
352 None
353}
354
355pub(super) fn calculate_exponential_backoff(
356 base: f64,
357 initial_backoff: f64,
358 retry_attempts: u32,
359 max_backoff: Duration,
360) -> Duration {
361 let result = match 2_u32
362 .checked_pow(retry_attempts)
363 .map(|power| (power as f64) * initial_backoff)
364 {
365 Some(backoff) => match Duration::try_from_secs_f64(backoff) {
366 Ok(result) => result.min(max_backoff),
367 Err(e) => {
368 tracing::warn!("falling back to {max_backoff:?} as `Duration` could not be created for exponential backoff: {e}");
369 max_backoff
370 }
371 },
372 None => max_backoff,
373 };
374
375 result.mul_f64(base)
378}
379
380pub(super) fn get_seconds_since_unix_epoch(runtime_components: &RuntimeComponents) -> f64 {
381 let request_time = runtime_components
382 .time_source()
383 .expect("time source required for retries");
384 request_time
385 .now()
386 .duration_since(SystemTime::UNIX_EPOCH)
387 .unwrap()
388 .as_secs_f64()
389}
390
391#[derive(Debug)]
395pub(crate) struct TokenBucketProvider {
396 default_partition: RetryPartition,
397 token_bucket: TokenBucket,
398}
399
400impl TokenBucketProvider {
401 pub(crate) fn new(
406 default_partition: RetryPartition,
407 init: impl FnOnce() -> TokenBucket,
408 ) -> Self {
409 let token_bucket = TOKEN_BUCKET.get_or_init(default_partition.clone(), init);
410 Self {
411 default_partition,
412 token_bucket,
413 }
414 }
415}
416
417#[dyn_dispatch_hint]
418impl Intercept for TokenBucketProvider {
419 fn name(&self) -> &'static str {
420 "TokenBucketProvider"
421 }
422
423 fn modify_before_retry_loop(
424 &self,
425 _context: &mut BeforeTransmitInterceptorContextMut<'_>,
426 _runtime_components: &RuntimeComponents,
427 cfg: &mut ConfigBag,
428 ) -> Result<(), BoxError> {
429 let retry_partition = cfg.load::<RetryPartition>().expect("set in default config");
430
431 let tb = match &retry_partition.inner {
432 RetryPartitionInner::Default(name) => {
433 if name == self.default_partition.name() {
437 self.token_bucket.clone()
439 } else {
440 TOKEN_BUCKET.get_or_init_default(retry_partition.clone())
441 }
442 }
443 RetryPartitionInner::Custom { token_bucket, .. } => token_bucket.clone(),
444 };
445
446 trace!("token bucket for {retry_partition:?} added to config bag");
447 let mut layer = Layer::new("token_bucket_partition");
448 layer.store_put(tb);
449 cfg.push_layer(layer);
450 Ok(())
451 }
452}
453
454#[cfg(test)]
455mod tests {
456 #[allow(unused_imports)] use std::fmt;
458 use std::sync::Mutex;
459 use std::time::Duration;
460
461 use aws_smithy_async::time::SystemTimeSource;
462 use aws_smithy_runtime_api::client::interceptors::context::{
463 Input, InterceptorContext, Output,
464 };
465 use aws_smithy_runtime_api::client::orchestrator::OrchestratorError;
466 use aws_smithy_runtime_api::client::retries::classifiers::{
467 ClassifyRetry, RetryAction, SharedRetryClassifier,
468 };
469 use aws_smithy_runtime_api::client::retries::{
470 AlwaysRetry, RequestAttempts, RetryStrategy, ShouldAttempt,
471 };
472 use aws_smithy_runtime_api::client::runtime_components::{
473 RuntimeComponents, RuntimeComponentsBuilder,
474 };
475 use aws_smithy_types::config_bag::{ConfigBag, Layer};
476 use aws_smithy_types::retry::{ErrorKind, RetryConfig};
477
478 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
479 use aws_smithy_types::retry::RetrySpec;
480
481 use super::{calculate_exponential_backoff, StandardRetryStrategy};
482 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
483 use crate::client::retries::token_bucket::{
484 DEFAULT_CAPACITY, DEFAULT_RETRY_COST, DEFAULT_RETRY_TIMEOUT_COST, THROTTLING_RETRY_COST,
485 };
486 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
487 use crate::client::retries::LongPollingBackoff;
488 use crate::client::retries::{ClientRateLimiter, RetryPartition, TokenBucket};
489
490 #[test]
491 fn no_retry_necessary_for_ok_result() {
492 let cfg = ConfigBag::of_layers(vec![{
493 let mut layer = Layer::new("test");
494 layer.store_put(RetryConfig::standard());
495 layer.store_put(RequestAttempts::new(1));
496 layer.store_put(TokenBucket::default());
497 layer
498 }]);
499 let rc = RuntimeComponentsBuilder::for_tests().build().unwrap();
500 let mut ctx = InterceptorContext::new(Input::doesnt_matter());
501 let strategy = StandardRetryStrategy::default();
502 ctx.set_output_or_error(Ok(Output::doesnt_matter()));
503
504 let actual = strategy
505 .should_attempt_retry(&ctx, &rc, &cfg)
506 .expect("method is infallible for this use");
507 assert_eq!(ShouldAttempt::No, actual);
508 }
509
510 fn set_up_cfg_and_context(
511 error_kind: ErrorKind,
512 current_request_attempts: u32,
513 retry_config: RetryConfig,
514 ) -> (InterceptorContext, RuntimeComponents, ConfigBag) {
515 let mut ctx = InterceptorContext::new(Input::doesnt_matter());
516 ctx.set_output_or_error(Err(OrchestratorError::other("doesn't matter")));
517 let rc = RuntimeComponentsBuilder::for_tests()
518 .with_retry_classifier(SharedRetryClassifier::new(AlwaysRetry(error_kind)))
519 .build()
520 .unwrap();
521 let mut layer = Layer::new("test");
522 layer.store_put(RequestAttempts::new(current_request_attempts));
523 layer.store_put(retry_config);
524 layer.store_put(TokenBucket::default());
525 let cfg = ConfigBag::of_layers(vec![layer]);
526
527 (ctx, rc, cfg)
528 }
529
530 fn test_should_retry_error_kind(error_kind: ErrorKind) {
533 let (ctx, rc, cfg) = set_up_cfg_and_context(
534 error_kind,
535 3,
536 RetryConfig::standard()
537 .with_use_static_exponential_base(true)
538 .with_max_attempts(4),
539 );
540 let strategy = StandardRetryStrategy::new();
541 let actual = strategy
542 .should_attempt_retry(&ctx, &rc, &cfg)
543 .expect("method is infallible for this use");
544 assert_eq!(ShouldAttempt::YesAfterDelay(Duration::from_secs(4)), actual);
545 }
546
547 #[test]
548 fn should_retry_transient_error_result_after_2s() {
549 test_should_retry_error_kind(ErrorKind::TransientError);
550 }
551
552 #[test]
553 fn should_retry_client_error_result_after_2s() {
554 test_should_retry_error_kind(ErrorKind::ClientError);
555 }
556
557 #[test]
558 fn should_retry_server_error_result_after_2s() {
559 test_should_retry_error_kind(ErrorKind::ServerError);
560 }
561
562 #[test]
563 fn should_retry_throttling_error_result_after_2s() {
564 test_should_retry_error_kind(ErrorKind::ThrottlingError);
565 }
566
567 #[test]
568 fn dont_retry_when_out_of_attempts() {
569 let current_attempts = 4;
570 let max_attempts = current_attempts;
571 let (ctx, rc, cfg) = set_up_cfg_and_context(
572 ErrorKind::TransientError,
573 current_attempts,
574 RetryConfig::standard()
575 .with_use_static_exponential_base(true)
576 .with_max_attempts(max_attempts),
577 );
578 let strategy = StandardRetryStrategy::new();
579 let actual = strategy
580 .should_attempt_retry(&ctx, &rc, &cfg)
581 .expect("method is infallible for this use");
582 assert_eq!(ShouldAttempt::No, actual);
583 }
584
585 #[test]
586 fn should_not_panic_when_exponential_backoff_duration_could_not_be_created() {
587 let (ctx, rc, cfg) = set_up_cfg_and_context(
588 ErrorKind::TransientError,
589 33,
591 RetryConfig::standard()
592 .with_use_static_exponential_base(true)
593 .with_max_attempts(100), );
595 let strategy = StandardRetryStrategy::new();
596 let actual = strategy
597 .should_attempt_retry(&ctx, &rc, &cfg)
598 .expect("method is infallible for this use");
599 assert_eq!(ShouldAttempt::YesAfterDelay(MAX_BACKOFF), actual);
600 }
601
602 #[test]
603 fn should_yield_client_rate_limiter_from_custom_partition() {
604 let expected = ClientRateLimiter::builder().token_refill_rate(3.14).build();
605 let cfg = ConfigBag::of_layers(vec![
606 {
608 let mut layer = Layer::new("default");
609 layer.store_put(RetryPartition::new("default"));
610 layer
611 },
612 {
613 let mut layer = Layer::new("user");
614 layer.store_put(RetryConfig::adaptive());
615 layer.store_put(
616 RetryPartition::custom("user")
617 .client_rate_limiter(expected.clone())
618 .build(),
619 );
620 layer
621 },
622 ]);
623 let rc = RuntimeComponentsBuilder::for_tests()
624 .with_time_source(Some(SystemTimeSource::new()))
625 .build()
626 .unwrap();
627 let actual = StandardRetryStrategy::adaptive_retry_rate_limiter(&rc, &cfg)
628 .expect("should yield client rate limiter from custom partition");
629 assert!(std::sync::Arc::ptr_eq(&expected.inner, &actual.inner));
630 }
631
632 #[allow(dead_code)] #[derive(Debug)]
634 struct PresetReasonRetryClassifier {
635 retry_actions: Mutex<Vec<RetryAction>>,
636 }
637
638 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
639 impl PresetReasonRetryClassifier {
640 fn new(mut retry_reasons: Vec<RetryAction>) -> Self {
641 retry_reasons.reverse();
643 Self {
644 retry_actions: Mutex::new(retry_reasons),
645 }
646 }
647 }
648
649 impl ClassifyRetry for PresetReasonRetryClassifier {
650 fn classify_retry(&self, ctx: &InterceptorContext) -> RetryAction {
651 let output_or_error = ctx.output_or_error();
653 match output_or_error {
655 Some(Ok(_)) | None => return RetryAction::NoActionIndicated,
656 _ => (),
657 };
658
659 let mut retry_actions = self.retry_actions.lock().unwrap();
660 if retry_actions.len() == 1 {
661 retry_actions.first().unwrap().clone()
662 } else {
663 retry_actions.pop().unwrap()
664 }
665 }
666
667 fn name(&self) -> &'static str {
668 "Always returns a preset retry reason"
669 }
670 }
671
672 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
673 fn setup_test(
674 retry_reasons: Vec<RetryAction>,
675 retry_config: RetryConfig,
676 ) -> (ConfigBag, RuntimeComponents, InterceptorContext) {
677 let rc = RuntimeComponentsBuilder::for_tests()
678 .with_retry_classifier(SharedRetryClassifier::new(
679 PresetReasonRetryClassifier::new(retry_reasons),
680 ))
681 .build()
682 .unwrap();
683 let mut layer = Layer::new("test");
684 layer.store_put(retry_config);
685 let cfg = ConfigBag::of_layers(vec![layer]);
686 let mut ctx = InterceptorContext::new(Input::doesnt_matter());
687 ctx.set_output_or_error(Err(OrchestratorError::other("doesn't matter")));
689
690 (cfg, rc, ctx)
691 }
692
693 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
694 #[test]
695 fn eventual_success() {
696 let (mut cfg, rc, mut ctx) = setup_test(
697 vec![RetryAction::server_error()],
698 RetryConfig::standard()
699 .with_use_static_exponential_base(true)
700 .with_max_attempts(5),
701 );
702 let strategy = StandardRetryStrategy::new();
703 cfg.interceptor_state().store_put(TokenBucket::default());
704 let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();
705
706 cfg.interceptor_state().store_put(RequestAttempts::new(1));
707 let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
708 let dur = should_retry.expect_delay();
709 assert_eq!(dur, Duration::from_secs(1));
710 assert_eq!(token_bucket.available_permits(), 495);
711
712 cfg.interceptor_state().store_put(RequestAttempts::new(2));
713 let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
714 let dur = should_retry.expect_delay();
715 assert_eq!(dur, Duration::from_secs(2));
716 assert_eq!(token_bucket.available_permits(), 490);
717
718 ctx.set_output_or_error(Ok(Output::doesnt_matter()));
719
720 cfg.interceptor_state().store_put(RequestAttempts::new(3));
721 let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
722 assert_eq!(no_retry, ShouldAttempt::No);
723 assert_eq!(token_bucket.available_permits(), 495);
724 }
725
726 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
727 #[test]
728 fn no_more_attempts() {
729 let (mut cfg, rc, ctx) = setup_test(
730 vec![RetryAction::server_error()],
731 RetryConfig::standard()
732 .with_use_static_exponential_base(true)
733 .with_max_attempts(3),
734 );
735 let strategy = StandardRetryStrategy::new();
736 cfg.interceptor_state().store_put(TokenBucket::default());
737 let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();
738
739 cfg.interceptor_state().store_put(RequestAttempts::new(1));
740 let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
741 let dur = should_retry.expect_delay();
742 assert_eq!(dur, Duration::from_secs(1));
743 assert_eq!(token_bucket.available_permits(), 495);
744
745 cfg.interceptor_state().store_put(RequestAttempts::new(2));
746 let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
747 let dur = should_retry.expect_delay();
748 assert_eq!(dur, Duration::from_secs(2));
749 assert_eq!(token_bucket.available_permits(), 490);
750
751 cfg.interceptor_state().store_put(RequestAttempts::new(3));
752 let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
753 assert_eq!(no_retry, ShouldAttempt::No);
754 assert_eq!(token_bucket.available_permits(), 490);
755 }
756
757 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
758 #[test]
759 fn successful_request_and_deser_should_be_retryable() {
760 #[derive(Clone, Copy, Debug)]
761 enum LongRunningOperationStatus {
762 Running,
763 Complete,
764 }
765
766 #[derive(Debug)]
767 struct LongRunningOperationOutput {
768 status: Option<LongRunningOperationStatus>,
769 }
770
771 impl LongRunningOperationOutput {
772 fn status(&self) -> Option<LongRunningOperationStatus> {
773 self.status
774 }
775 }
776
777 struct WaiterRetryClassifier {}
778
779 impl WaiterRetryClassifier {
780 fn new() -> Self {
781 WaiterRetryClassifier {}
782 }
783 }
784
785 impl fmt::Debug for WaiterRetryClassifier {
786 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
787 write!(f, "WaiterRetryClassifier")
788 }
789 }
790 impl ClassifyRetry for WaiterRetryClassifier {
791 fn classify_retry(&self, ctx: &InterceptorContext) -> RetryAction {
792 let status: Option<LongRunningOperationStatus> =
793 ctx.output_or_error().and_then(|res| {
794 res.ok().and_then(|output| {
795 output
796 .downcast_ref::<LongRunningOperationOutput>()
797 .and_then(|output| output.status())
798 })
799 });
800
801 if let Some(LongRunningOperationStatus::Running) = status {
802 return RetryAction::server_error();
803 };
804
805 RetryAction::NoActionIndicated
806 }
807
808 fn name(&self) -> &'static str {
809 "waiter retry classifier"
810 }
811 }
812
813 let retry_config = RetryConfig::standard()
814 .with_use_static_exponential_base(true)
815 .with_max_attempts(5);
816
817 let rc = RuntimeComponentsBuilder::for_tests()
818 .with_retry_classifier(SharedRetryClassifier::new(WaiterRetryClassifier::new()))
819 .build()
820 .unwrap();
821 let mut layer = Layer::new("test");
822 layer.store_put(retry_config);
823 let mut cfg = ConfigBag::of_layers(vec![layer]);
824 let mut ctx = InterceptorContext::new(Input::doesnt_matter());
825 let strategy = StandardRetryStrategy::new();
826
827 ctx.set_output_or_error(Ok(Output::erase(LongRunningOperationOutput {
828 status: Some(LongRunningOperationStatus::Running),
829 })));
830
831 cfg.interceptor_state().store_put(TokenBucket::new(5));
832 let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();
833
834 cfg.interceptor_state().store_put(RequestAttempts::new(1));
835 let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
836 let dur = should_retry.expect_delay();
837 assert_eq!(dur, Duration::from_secs(1));
838 assert_eq!(token_bucket.available_permits(), 0);
839
840 ctx.set_output_or_error(Ok(Output::erase(LongRunningOperationOutput {
841 status: Some(LongRunningOperationStatus::Complete),
842 })));
843 cfg.interceptor_state().store_put(RequestAttempts::new(2));
844 let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
845 should_retry.expect_no();
846 assert_eq!(token_bucket.available_permits(), 5);
847 }
848
849 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
850 #[test]
851 fn no_quota() {
852 let (mut cfg, rc, ctx) = setup_test(
853 vec![RetryAction::server_error()],
854 RetryConfig::standard()
855 .with_use_static_exponential_base(true)
856 .with_max_attempts(5),
857 );
858 let strategy = StandardRetryStrategy::new();
859 cfg.interceptor_state().store_put(TokenBucket::new(5));
860 let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();
861
862 cfg.interceptor_state().store_put(RequestAttempts::new(1));
863 let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
864 let dur = should_retry.expect_delay();
865 assert_eq!(dur, Duration::from_secs(1));
866 assert_eq!(token_bucket.available_permits(), 0);
867
868 cfg.interceptor_state().store_put(RequestAttempts::new(2));
869 let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
870 assert_eq!(no_retry, ShouldAttempt::No);
871 assert_eq!(token_bucket.available_permits(), 0);
872 }
873
874 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
875 #[test]
876 fn quota_replenishes_on_success() {
877 let (mut cfg, rc, mut ctx) = setup_test(
878 vec![
879 RetryAction::transient_error(),
880 RetryAction::retryable_error_with_explicit_delay(
881 ErrorKind::TransientError,
882 Duration::from_secs(1),
883 ),
884 ],
885 RetryConfig::standard()
886 .with_use_static_exponential_base(true)
887 .with_max_attempts(5),
888 );
889 let strategy = StandardRetryStrategy::new();
890 cfg.interceptor_state().store_put(TokenBucket::new(100));
891 let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();
892
893 cfg.interceptor_state().store_put(RequestAttempts::new(1));
894 let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
895 let dur = should_retry.expect_delay();
896 assert_eq!(dur, Duration::from_secs(1));
897 assert_eq!(token_bucket.available_permits(), 90);
898
899 cfg.interceptor_state().store_put(RequestAttempts::new(2));
900 let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
901 let dur = should_retry.expect_delay();
902 assert_eq!(dur, Duration::from_secs(1));
903 assert_eq!(token_bucket.available_permits(), 80);
904
905 ctx.set_output_or_error(Ok(Output::doesnt_matter()));
906
907 cfg.interceptor_state().store_put(RequestAttempts::new(3));
908 let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
909 assert_eq!(no_retry, ShouldAttempt::No);
910
911 assert_eq!(token_bucket.available_permits(), 90);
912 }
913
914 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
915 #[test]
916 fn quota_replenishes_on_first_try_success() {
917 const PERMIT_COUNT: usize = 20;
918 let (mut cfg, rc, mut ctx) = setup_test(
919 vec![RetryAction::transient_error()],
920 RetryConfig::standard()
921 .with_use_static_exponential_base(true)
922 .with_max_attempts(u32::MAX),
923 );
924 let strategy = StandardRetryStrategy::new();
925 cfg.interceptor_state()
926 .store_put(TokenBucket::new(PERMIT_COUNT));
927 let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();
928
929 let mut attempt = 1;
930
931 while token_bucket.available_permits() > 0 {
933 if attempt > 2 {
935 panic!("This test should have completed by now (drain)");
936 }
937
938 cfg.interceptor_state()
939 .store_put(RequestAttempts::new(attempt));
940 let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
941 assert!(matches!(should_retry, ShouldAttempt::YesAfterDelay(_)));
942 attempt += 1;
943 }
944
945 let permit = strategy.retry_permit.lock().unwrap().take().unwrap();
947 permit.forget();
948
949 ctx.set_output_or_error(Ok(Output::doesnt_matter()));
950
951 while token_bucket.available_permits() < PERMIT_COUNT {
953 if attempt > 23 {
954 panic!("This test should have completed by now (fill-up)");
955 }
956
957 cfg.interceptor_state()
958 .store_put(RequestAttempts::new(attempt));
959 let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
960 assert_eq!(no_retry, ShouldAttempt::No);
961 attempt += 1;
962 }
963
964 assert_eq!(attempt, 23);
965 assert_eq!(token_bucket.available_permits(), PERMIT_COUNT);
966 }
967
968 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
969 #[test]
970 fn backoff_timing() {
971 let (mut cfg, rc, ctx) = setup_test(
972 vec![RetryAction::server_error()],
973 RetryConfig::standard()
974 .with_use_static_exponential_base(true)
975 .with_max_attempts(5),
976 );
977 let strategy = StandardRetryStrategy::new();
978 cfg.interceptor_state().store_put(TokenBucket::default());
979 let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();
980
981 cfg.interceptor_state().store_put(RequestAttempts::new(1));
982 let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
983 let dur = should_retry.expect_delay();
984 assert_eq!(dur, Duration::from_secs(1));
985 assert_eq!(token_bucket.available_permits(), 495);
986
987 cfg.interceptor_state().store_put(RequestAttempts::new(2));
988 let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
989 let dur = should_retry.expect_delay();
990 assert_eq!(dur, Duration::from_secs(2));
991 assert_eq!(token_bucket.available_permits(), 490);
992
993 cfg.interceptor_state().store_put(RequestAttempts::new(3));
994 let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
995 let dur = should_retry.expect_delay();
996 assert_eq!(dur, Duration::from_secs(4));
997 assert_eq!(token_bucket.available_permits(), 485);
998
999 cfg.interceptor_state().store_put(RequestAttempts::new(4));
1000 let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1001 let dur = should_retry.expect_delay();
1002 assert_eq!(dur, Duration::from_secs(8));
1003 assert_eq!(token_bucket.available_permits(), 480);
1004
1005 cfg.interceptor_state().store_put(RequestAttempts::new(5));
1006 let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1007 assert_eq!(no_retry, ShouldAttempt::No);
1008 assert_eq!(token_bucket.available_permits(), 480);
1009 }
1010
1011 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1012 #[test]
1013 fn max_backoff_time() {
1014 let (mut cfg, rc, ctx) = setup_test(
1015 vec![RetryAction::server_error()],
1016 RetryConfig::standard()
1017 .with_use_static_exponential_base(true)
1018 .with_max_attempts(5)
1019 .with_initial_backoff(Duration::from_secs(1))
1020 .with_max_backoff(Duration::from_secs(3)),
1021 );
1022 let strategy = StandardRetryStrategy::new();
1023 cfg.interceptor_state().store_put(TokenBucket::default());
1024 let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();
1025
1026 cfg.interceptor_state().store_put(RequestAttempts::new(1));
1027 let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1028 let dur = should_retry.expect_delay();
1029 assert_eq!(dur, Duration::from_secs(1));
1030 assert_eq!(token_bucket.available_permits(), 495);
1031
1032 cfg.interceptor_state().store_put(RequestAttempts::new(2));
1033 let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1034 let dur = should_retry.expect_delay();
1035 assert_eq!(dur, Duration::from_secs(2));
1036 assert_eq!(token_bucket.available_permits(), 490);
1037
1038 cfg.interceptor_state().store_put(RequestAttempts::new(3));
1039 let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1040 let dur = should_retry.expect_delay();
1041 assert_eq!(dur, Duration::from_secs(3));
1042 assert_eq!(token_bucket.available_permits(), 485);
1043
1044 cfg.interceptor_state().store_put(RequestAttempts::new(4));
1045 let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1046 let dur = should_retry.expect_delay();
1047 assert_eq!(dur, Duration::from_secs(3));
1048 assert_eq!(token_bucket.available_permits(), 480);
1049
1050 cfg.interceptor_state().store_put(RequestAttempts::new(5));
1051 let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1052 assert_eq!(no_retry, ShouldAttempt::No);
1053 assert_eq!(token_bucket.available_permits(), 480);
1054 }
1055
1056 const MAX_BACKOFF: Duration = Duration::from_secs(20);
1057
1058 #[test]
1059 fn calculate_exponential_backoff_where_initial_backoff_is_one() {
1060 let initial_backoff = 1.0;
1061
1062 for (attempt, expected_backoff) in [initial_backoff, 2.0, 4.0].into_iter().enumerate() {
1063 let actual_backoff =
1064 calculate_exponential_backoff(1.0, initial_backoff, attempt as u32, MAX_BACKOFF);
1065 assert_eq!(Duration::from_secs_f64(expected_backoff), actual_backoff);
1066 }
1067 }
1068
1069 #[test]
1070 fn calculate_exponential_backoff_where_initial_backoff_is_greater_than_one() {
1071 let initial_backoff = 3.0;
1072
1073 for (attempt, expected_backoff) in [initial_backoff, 6.0, 12.0].into_iter().enumerate() {
1074 let actual_backoff =
1075 calculate_exponential_backoff(1.0, initial_backoff, attempt as u32, MAX_BACKOFF);
1076 assert_eq!(Duration::from_secs_f64(expected_backoff), actual_backoff);
1077 }
1078 }
1079
1080 #[test]
1081 fn calculate_exponential_backoff_where_initial_backoff_is_less_than_one() {
1082 let initial_backoff = 0.03;
1083
1084 for (attempt, expected_backoff) in [initial_backoff, 0.06, 0.12].into_iter().enumerate() {
1085 let actual_backoff =
1086 calculate_exponential_backoff(1.0, initial_backoff, attempt as u32, MAX_BACKOFF);
1087 assert_eq!(Duration::from_secs_f64(expected_backoff), actual_backoff);
1088 }
1089 }
1090
1091 #[test]
1092 fn calculate_backoff_overflow_should_gracefully_fallback_to_max_backoff() {
1093 assert_eq!(
1095 MAX_BACKOFF,
1096 calculate_exponential_backoff(1_f64, 10_f64, 100000, MAX_BACKOFF),
1097 );
1098 }
1099
1100 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1101 #[test]
1102 fn v2_1_non_throttling_uses_50ms_backoff() {
1103 let (ctx, rc, cfg) = set_up_cfg_and_context(
1104 ErrorKind::ServerError,
1105 1,
1106 RetryConfig::standard()
1107 .with_use_static_exponential_base(true)
1108 .with_max_attempts(3)
1109 .with_retry_spec(RetrySpec::v2_1()),
1110 );
1111 let strategy = StandardRetryStrategy::new();
1112 let actual = strategy
1113 .should_attempt_retry(&ctx, &rc, &cfg)
1114 .expect("method is infallible for this use");
1115 assert_eq!(
1117 ShouldAttempt::YesAfterDelay(Duration::from_millis(50)),
1118 actual
1119 );
1120 }
1121
1122 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1123 #[test]
1124 fn v2_1_throttling_uses_1s_backoff() {
1125 let (ctx, rc, cfg) = set_up_cfg_and_context(
1126 ErrorKind::ThrottlingError,
1127 1,
1128 RetryConfig::standard()
1129 .with_use_static_exponential_base(true)
1130 .with_max_attempts(3)
1131 .with_retry_spec(RetrySpec::v2_1()),
1132 );
1133 let strategy = StandardRetryStrategy::new();
1134 let actual = strategy
1135 .should_attempt_retry(&ctx, &rc, &cfg)
1136 .expect("method is infallible for this use");
1137 assert_eq!(ShouldAttempt::YesAfterDelay(Duration::from_secs(1)), actual);
1139 }
1140
1141 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1142 #[test]
1143 fn v2_0_non_throttling_uses_1s_backoff() {
1144 let (ctx, rc, cfg) = set_up_cfg_and_context(
1145 ErrorKind::ServerError,
1146 1,
1147 RetryConfig::standard()
1148 .with_use_static_exponential_base(true)
1149 .with_max_attempts(3)
1150 .with_retry_spec(RetrySpec::v2_0()),
1151 );
1152 let strategy = StandardRetryStrategy::new();
1153 let actual = strategy
1154 .should_attempt_retry(&ctx, &rc, &cfg)
1155 .expect("method is infallible for this use");
1156 assert_eq!(ShouldAttempt::YesAfterDelay(Duration::from_secs(1)), actual);
1158 }
1159
1160 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1161 #[test]
1162 fn v2_1_retry_after_bounded_between_t_i_and_t_i_plus_5s() {
1163 let (mut cfg, rc, ctx) = setup_test(
1164 vec![RetryAction::retryable_error_with_explicit_delay(
1165 ErrorKind::ServerError,
1166 Duration::from_secs(3),
1167 )],
1168 RetryConfig::standard()
1169 .with_use_static_exponential_base(true)
1170 .with_max_attempts(3)
1171 .with_retry_spec(RetrySpec::v2_1()),
1172 );
1173 let strategy = StandardRetryStrategy::new();
1174 cfg.interceptor_state().store_put(TokenBucket::default());
1175 cfg.interceptor_state().store_put(RequestAttempts::new(1));
1176 let actual = strategy
1177 .should_attempt_retry(&ctx, &rc, &cfg)
1178 .expect("method is infallible for this use");
1179 assert_eq!(ShouldAttempt::YesAfterDelay(Duration::from_secs(3)), actual);
1181 }
1182
1183 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1184 #[test]
1185 fn v2_1_retry_after_below_t_i_uses_t_i() {
1186 let (mut cfg, rc, ctx) = setup_test(
1187 vec![RetryAction::retryable_error_with_explicit_delay(
1188 ErrorKind::ServerError,
1189 Duration::from_millis(10),
1190 )],
1191 RetryConfig::standard()
1192 .with_use_static_exponential_base(true)
1193 .with_max_attempts(3)
1194 .with_retry_spec(RetrySpec::v2_1()),
1195 );
1196 let strategy = StandardRetryStrategy::new();
1197 cfg.interceptor_state().store_put(TokenBucket::default());
1198 cfg.interceptor_state().store_put(RequestAttempts::new(1));
1199 let actual = strategy
1200 .should_attempt_retry(&ctx, &rc, &cfg)
1201 .expect("method is infallible for this use");
1202 assert_eq!(
1204 ShouldAttempt::YesAfterDelay(Duration::from_millis(50)),
1205 actual
1206 );
1207 }
1208
1209 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1210 #[test]
1211 fn v2_1_retry_after_above_t_i_plus_5s_capped() {
1212 let (mut cfg, rc, ctx) = setup_test(
1213 vec![RetryAction::retryable_error_with_explicit_delay(
1214 ErrorKind::ServerError,
1215 Duration::from_secs(10),
1216 )],
1217 RetryConfig::standard()
1218 .with_use_static_exponential_base(true)
1219 .with_max_attempts(3)
1220 .with_retry_spec(RetrySpec::v2_1()),
1221 );
1222 let strategy = StandardRetryStrategy::new();
1223 cfg.interceptor_state().store_put(TokenBucket::default());
1224 cfg.interceptor_state().store_put(RequestAttempts::new(1));
1225 let actual = strategy
1226 .should_attempt_retry(&ctx, &rc, &cfg)
1227 .expect("method is infallible for this use");
1228 assert_eq!(
1230 ShouldAttempt::YesAfterDelay(Duration::from_millis(5050)),
1231 actual
1232 );
1233 }
1234
1235 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1236 #[test]
1237 fn v2_0_retry_after_capped_at_max_backoff() {
1238 let (mut cfg, rc, ctx) = setup_test(
1239 vec![RetryAction::retryable_error_with_explicit_delay(
1240 ErrorKind::ServerError,
1241 Duration::from_secs(30),
1242 )],
1243 RetryConfig::standard()
1244 .with_use_static_exponential_base(true)
1245 .with_max_attempts(3)
1246 .with_retry_spec(RetrySpec::v2_0()),
1247 );
1248 let strategy = StandardRetryStrategy::new();
1249 cfg.interceptor_state().store_put(TokenBucket::default());
1250 cfg.interceptor_state().store_put(RequestAttempts::new(1));
1251 let actual = strategy
1252 .should_attempt_retry(&ctx, &rc, &cfg)
1253 .expect("method is infallible for this use");
1254 assert_eq!(
1256 ShouldAttempt::YesAfterDelay(Duration::from_secs(20)),
1257 actual
1258 );
1259 }
1260
1261 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1262 #[test]
1263 fn long_polling_backs_off_when_token_bucket_empty() {
1264 let (mut cfg, rc, ctx) = setup_test(
1265 vec![RetryAction::server_error()],
1266 RetryConfig::standard()
1267 .with_use_static_exponential_base(true)
1268 .with_max_attempts(5)
1269 .with_retry_spec(RetrySpec::v2_1().with_long_polling(true)),
1270 );
1271 let strategy = StandardRetryStrategy::new();
1272 cfg.interceptor_state().store_put(TokenBucket::new(0));
1273 cfg.interceptor_state().store_put(RequestAttempts::new(1));
1274 let hint = LongPollingBackoff::default();
1275 cfg.interceptor_state().store_put(hint.clone());
1276
1277 let result = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1278 assert_eq!(result, ShouldAttempt::No);
1279 assert_eq!(hint.take(), Some(Duration::from_millis(50)));
1280 }
1281
1282 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1283 #[test]
1284 fn non_long_polling_no_backoff_when_token_bucket_empty() {
1285 let (mut cfg, rc, ctx) = setup_test(
1286 vec![RetryAction::server_error()],
1287 RetryConfig::standard()
1288 .with_use_static_exponential_base(true)
1289 .with_max_attempts(5)
1290 .with_retry_spec(RetrySpec::v2_0()),
1291 );
1292 let strategy = StandardRetryStrategy::new();
1293 cfg.interceptor_state().store_put(TokenBucket::new(0));
1294 cfg.interceptor_state().store_put(RequestAttempts::new(1));
1295
1296 let result = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1297 assert_eq!(result, ShouldAttempt::No);
1298 }
1299
1300 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1301 fn v2_1_token_bucket_with_capacity(capacity: usize) -> TokenBucket {
1302 TokenBucket::builder()
1303 .capacity(capacity)
1304 .retry_cost(DEFAULT_RETRY_COST)
1305 .throttling_retry_cost(THROTTLING_RETRY_COST)
1306 .timeout_retry_cost(DEFAULT_RETRY_TIMEOUT_COST)
1307 .build()
1308 }
1309
1310 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1311 fn v2_1_token_bucket() -> TokenBucket {
1312 v2_1_token_bucket_with_capacity(DEFAULT_CAPACITY)
1313 }
1314
1315 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1316 #[test]
1317 fn retry_eventually_succeeds() {
1318 let (mut cfg, rc, mut ctx) = setup_test(
1319 vec![RetryAction::server_error()],
1320 RetryConfig::standard()
1321 .with_use_static_exponential_base(true)
1322 .with_retry_spec(RetrySpec::v2_1()),
1323 );
1324 let strategy = StandardRetryStrategy::new();
1325 let tb = v2_1_token_bucket();
1326 cfg.interceptor_state().store_put(tb.clone());
1327
1328 cfg.interceptor_state().store_put(RequestAttempts::new(1));
1329 let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1330 assert_eq!(should_retry.expect_delay(), Duration::from_millis(50));
1331 assert_eq!(
1332 tb.available_permits(),
1333 DEFAULT_CAPACITY - DEFAULT_RETRY_COST as usize
1334 );
1335
1336 cfg.interceptor_state().store_put(RequestAttempts::new(2));
1337 let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1338 assert_eq!(should_retry.expect_delay(), Duration::from_millis(100));
1339 assert_eq!(
1340 tb.available_permits(),
1341 DEFAULT_CAPACITY - 2 * DEFAULT_RETRY_COST as usize
1342 );
1343
1344 ctx.set_output_or_error(Ok(Output::doesnt_matter()));
1345 cfg.interceptor_state().store_put(RequestAttempts::new(3));
1346 let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1347 assert_eq!(no_retry, ShouldAttempt::No);
1348 assert_eq!(
1349 tb.available_permits(),
1350 DEFAULT_CAPACITY - DEFAULT_RETRY_COST as usize
1351 );
1352 }
1353
1354 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1355 #[test]
1356 fn fail_due_to_max_attempts_reached() {
1357 let (mut cfg, rc, ctx) = setup_test(
1358 vec![RetryAction::server_error()],
1359 RetryConfig::standard()
1360 .with_use_static_exponential_base(true)
1361 .with_retry_spec(RetrySpec::v2_1()),
1362 );
1363 let strategy = StandardRetryStrategy::new();
1364 let tb = v2_1_token_bucket();
1365 cfg.interceptor_state().store_put(tb.clone());
1366
1367 cfg.interceptor_state().store_put(RequestAttempts::new(1));
1368 let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1369 assert_eq!(should_retry.expect_delay(), Duration::from_millis(50));
1370 assert_eq!(
1371 tb.available_permits(),
1372 DEFAULT_CAPACITY - DEFAULT_RETRY_COST as usize
1373 );
1374
1375 cfg.interceptor_state().store_put(RequestAttempts::new(2));
1376 let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1377 assert_eq!(should_retry.expect_delay(), Duration::from_millis(100));
1378 assert_eq!(
1379 tb.available_permits(),
1380 DEFAULT_CAPACITY - 2 * DEFAULT_RETRY_COST as usize
1381 );
1382
1383 cfg.interceptor_state().store_put(RequestAttempts::new(3));
1384 let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1385 assert_eq!(no_retry, ShouldAttempt::No);
1386 assert_eq!(
1387 tb.available_permits(),
1388 DEFAULT_CAPACITY - 2 * DEFAULT_RETRY_COST as usize
1389 );
1390 }
1391
1392 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1393 #[test]
1394 fn retry_quota_reached_after_single_retry() {
1395 let (mut cfg, rc, ctx) = setup_test(
1396 vec![RetryAction::server_error()],
1397 RetryConfig::standard()
1398 .with_use_static_exponential_base(true)
1399 .with_max_attempts(5)
1400 .with_retry_spec(RetrySpec::v2_1()),
1401 );
1402 let strategy = StandardRetryStrategy::new();
1403 let tb = v2_1_token_bucket_with_capacity(DEFAULT_RETRY_COST as usize);
1404 cfg.interceptor_state().store_put(tb.clone());
1405
1406 cfg.interceptor_state().store_put(RequestAttempts::new(1));
1407 let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1408 assert_eq!(should_retry.expect_delay(), Duration::from_millis(50));
1409 assert_eq!(tb.available_permits(), 0);
1410
1411 cfg.interceptor_state().store_put(RequestAttempts::new(2));
1412 let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1413 assert_eq!(no_retry, ShouldAttempt::No);
1414 assert_eq!(tb.available_permits(), 0);
1415 }
1416
1417 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1418 #[test]
1419 fn no_retries_if_retry_quota_is_zero() {
1420 let (mut cfg, rc, ctx) = setup_test(
1421 vec![RetryAction::server_error()],
1422 RetryConfig::standard()
1423 .with_use_static_exponential_base(true)
1424 .with_retry_spec(RetrySpec::v2_1()),
1425 );
1426 let strategy = StandardRetryStrategy::new();
1427 let tb = v2_1_token_bucket_with_capacity(0);
1428 cfg.interceptor_state().store_put(tb.clone());
1429
1430 cfg.interceptor_state().store_put(RequestAttempts::new(1));
1431 let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1432 assert_eq!(no_retry, ShouldAttempt::No);
1433 assert_eq!(tb.available_permits(), 0);
1434 }
1435
1436 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1437 #[test]
1438 fn retry_stops_after_retry_quota_exhaustion() {
1439 let (mut cfg, rc, ctx) = setup_test(
1440 vec![RetryAction::server_error()],
1441 RetryConfig::standard()
1442 .with_use_static_exponential_base(true)
1443 .with_max_attempts(5)
1444 .with_retry_spec(RetrySpec::v2_1()),
1445 );
1446 let strategy = StandardRetryStrategy::new();
1447 let tb = v2_1_token_bucket_with_capacity(20);
1448 cfg.interceptor_state().store_put(tb.clone());
1449
1450 cfg.interceptor_state().store_put(RequestAttempts::new(1));
1451 let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1452 assert_eq!(should_retry.expect_delay(), Duration::from_millis(50));
1453 assert_eq!(tb.available_permits(), 20 - DEFAULT_RETRY_COST as usize);
1454
1455 cfg.interceptor_state().store_put(RequestAttempts::new(2));
1456 let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1457 assert_eq!(no_retry, ShouldAttempt::No);
1458 assert_eq!(tb.available_permits(), 20 - DEFAULT_RETRY_COST as usize);
1459 }
1460
1461 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1462 #[test]
1463 fn retry_quota_recovery_after_successful_responses() {
1464 let (mut cfg, rc, mut ctx) = setup_test(
1465 vec![RetryAction::server_error()],
1466 RetryConfig::standard()
1467 .with_use_static_exponential_base(true)
1468 .with_max_attempts(5)
1469 .with_retry_spec(RetrySpec::v2_1()),
1470 );
1471 let strategy = StandardRetryStrategy::new();
1472 let tb = v2_1_token_bucket_with_capacity(30);
1473 cfg.interceptor_state().store_put(tb.clone());
1474
1475 cfg.interceptor_state().store_put(RequestAttempts::new(1));
1476 let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1477 assert_eq!(should_retry.expect_delay(), Duration::from_millis(50));
1478 assert_eq!(tb.available_permits(), 30 - DEFAULT_RETRY_COST as usize);
1479
1480 cfg.interceptor_state().store_put(RequestAttempts::new(2));
1481 let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1482 assert_eq!(should_retry.expect_delay(), Duration::from_millis(100));
1483 assert_eq!(tb.available_permits(), 30 - 2 * DEFAULT_RETRY_COST as usize);
1484
1485 ctx.set_output_or_error(Ok(Output::doesnt_matter()));
1486 cfg.interceptor_state().store_put(RequestAttempts::new(3));
1487 let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1488 assert_eq!(no_retry, ShouldAttempt::No);
1489 assert_eq!(tb.available_permits(), 30 - DEFAULT_RETRY_COST as usize);
1490
1491 ctx.set_output_or_error(Err(OrchestratorError::other("doesn't matter")));
1492 cfg.interceptor_state().store_put(RequestAttempts::new(1));
1493 let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1494 assert_eq!(should_retry.expect_delay(), Duration::from_millis(50));
1495 assert_eq!(tb.available_permits(), 30 - 2 * DEFAULT_RETRY_COST as usize);
1496
1497 ctx.set_output_or_error(Ok(Output::doesnt_matter()));
1498 cfg.interceptor_state().store_put(RequestAttempts::new(2));
1499 let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1500 assert_eq!(no_retry, ShouldAttempt::No);
1501 assert_eq!(tb.available_permits(), 30 - DEFAULT_RETRY_COST as usize);
1502 }
1503
1504 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1505 #[test]
1506 fn throttling_error_token_bucket_drain_and_backoff() {
1507 let (mut cfg, rc, mut ctx) = setup_test(
1508 vec![RetryAction::retryable_error(ErrorKind::ThrottlingError)],
1509 RetryConfig::standard()
1510 .with_use_static_exponential_base(true)
1511 .with_retry_spec(RetrySpec::v2_1()),
1512 );
1513 let strategy = StandardRetryStrategy::new();
1514 let tb = v2_1_token_bucket();
1515 cfg.interceptor_state().store_put(tb.clone());
1516
1517 cfg.interceptor_state().store_put(RequestAttempts::new(1));
1518 let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1519 assert_eq!(should_retry.expect_delay(), Duration::from_secs(1));
1520 assert_eq!(
1521 tb.available_permits(),
1522 DEFAULT_CAPACITY - THROTTLING_RETRY_COST as usize
1523 );
1524
1525 ctx.set_output_or_error(Ok(Output::doesnt_matter()));
1526 cfg.interceptor_state().store_put(RequestAttempts::new(2));
1527 let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1528 assert_eq!(no_retry, ShouldAttempt::No);
1529 assert_eq!(tb.available_permits(), DEFAULT_CAPACITY);
1530 }
1531
1532 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1533 #[test]
1534 fn long_polling_backoff_after_throttling_error_when_token_bucket_empty() {
1535 let (mut cfg, rc, ctx) = setup_test(
1536 vec![RetryAction::retryable_error(ErrorKind::ThrottlingError)],
1537 RetryConfig::standard()
1538 .with_use_static_exponential_base(true)
1539 .with_max_attempts(5)
1540 .with_retry_spec(RetrySpec::v2_1().with_long_polling(true)),
1541 );
1542 let strategy = StandardRetryStrategy::new();
1543 let tb = v2_1_token_bucket_with_capacity(0);
1544 cfg.interceptor_state().store_put(tb.clone());
1545 cfg.interceptor_state().store_put(RequestAttempts::new(1));
1546 let hint = LongPollingBackoff::default();
1547 cfg.interceptor_state().store_put(hint.clone());
1548
1549 let result = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1550 assert_eq!(result, ShouldAttempt::No);
1551 assert_eq!(hint.take(), Some(Duration::from_secs(1)));
1552 assert_eq!(tb.available_permits(), 0);
1553 }
1554
1555 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1556 #[test]
1557 fn long_polling_max_attempts_exceeded_must_not_delay() {
1558 let (mut cfg, rc, ctx) = setup_test(
1559 vec![RetryAction::server_error()],
1560 RetryConfig::standard()
1561 .with_use_static_exponential_base(true)
1562 .with_max_attempts(2)
1563 .with_retry_spec(RetrySpec::v2_1().with_long_polling(true)),
1564 );
1565 let strategy = StandardRetryStrategy::new();
1566 let tb = v2_1_token_bucket();
1567 cfg.interceptor_state().store_put(tb.clone());
1568
1569 cfg.interceptor_state().store_put(RequestAttempts::new(1));
1570 let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1571 assert_eq!(should_retry.expect_delay(), Duration::from_millis(50));
1572
1573 cfg.interceptor_state().store_put(RequestAttempts::new(2));
1574 let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1575 assert_eq!(no_retry, ShouldAttempt::No);
1576 }
1577
1578 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1579 #[test]
1580 fn long_polling_success_must_not_delay() {
1581 let (mut cfg, rc, mut ctx) = setup_test(
1582 vec![RetryAction::server_error()],
1583 RetryConfig::standard()
1584 .with_use_static_exponential_base(true)
1585 .with_max_attempts(2)
1586 .with_retry_spec(RetrySpec::v2_1().with_long_polling(true)),
1587 );
1588 let strategy = StandardRetryStrategy::new();
1589 let tb = v2_1_token_bucket();
1590 cfg.interceptor_state().store_put(tb.clone());
1591
1592 cfg.interceptor_state().store_put(RequestAttempts::new(1));
1593 let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1594 assert_eq!(should_retry.expect_delay(), Duration::from_millis(50));
1595
1596 ctx.set_output_or_error(Ok(Output::doesnt_matter()));
1597 cfg.interceptor_state().store_put(RequestAttempts::new(2));
1598 let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1599 assert_eq!(no_retry, ShouldAttempt::No);
1600 }
1601
1602 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1603 #[test]
1604 fn long_polling_non_retryable_errors_must_not_delay() {
1605 let (mut cfg, rc, ctx) = setup_test(
1606 vec![RetryAction::NoActionIndicated],
1607 RetryConfig::standard()
1608 .with_use_static_exponential_base(true)
1609 .with_max_attempts(2)
1610 .with_retry_spec(RetrySpec::v2_1().with_long_polling(true)),
1611 );
1612 let strategy = StandardRetryStrategy::new();
1613 let tb = v2_1_token_bucket();
1614 cfg.interceptor_state().store_put(tb.clone());
1615 cfg.interceptor_state().store_put(RequestAttempts::new(1));
1616
1617 let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1618 assert_eq!(no_retry, ShouldAttempt::No);
1619 }
1620
1621 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
1622 #[test]
1623 fn verify_max_backoff_time() {
1624 let (mut cfg, rc, ctx) = setup_test(
1625 vec![RetryAction::server_error()],
1626 RetryConfig::standard()
1627 .with_use_static_exponential_base(true)
1628 .with_max_attempts(5)
1629 .with_max_backoff(Duration::from_millis(200))
1630 .with_retry_spec(RetrySpec::v2_1()),
1631 );
1632 let strategy = StandardRetryStrategy::new();
1633 let tb = v2_1_token_bucket();
1634 cfg.interceptor_state().store_put(tb.clone());
1635
1636 cfg.interceptor_state().store_put(RequestAttempts::new(1));
1637 let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1638 assert_eq!(should_retry.expect_delay(), Duration::from_millis(50));
1639 assert_eq!(
1640 tb.available_permits(),
1641 DEFAULT_CAPACITY - DEFAULT_RETRY_COST as usize
1642 );
1643
1644 cfg.interceptor_state().store_put(RequestAttempts::new(2));
1645 let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1646 assert_eq!(should_retry.expect_delay(), Duration::from_millis(100));
1647 assert_eq!(
1648 tb.available_permits(),
1649 DEFAULT_CAPACITY - 2 * DEFAULT_RETRY_COST as usize
1650 );
1651
1652 cfg.interceptor_state().store_put(RequestAttempts::new(3));
1653 let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1654 assert_eq!(should_retry.expect_delay(), Duration::from_millis(200));
1655 assert_eq!(
1656 tb.available_permits(),
1657 DEFAULT_CAPACITY - 3 * DEFAULT_RETRY_COST as usize
1658 );
1659
1660 cfg.interceptor_state().store_put(RequestAttempts::new(4));
1662 let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1663 assert_eq!(should_retry.expect_delay(), Duration::from_millis(200));
1664 assert_eq!(
1665 tb.available_permits(),
1666 DEFAULT_CAPACITY - 4 * DEFAULT_RETRY_COST as usize
1667 );
1668
1669 cfg.interceptor_state().store_put(RequestAttempts::new(5));
1670 let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
1671 assert_eq!(no_retry, ShouldAttempt::No);
1672 assert_eq!(
1673 tb.available_permits(),
1674 DEFAULT_CAPACITY - 4 * DEFAULT_RETRY_COST as usize
1675 );
1676 }
1677}