1use std::sync::Mutex;
7use std::time::{Duration, SystemTime};
8
9use tokio::sync::OwnedSemaphorePermit;
10use tracing::{debug, trace};
11
12use aws_smithy_runtime_api::box_error::BoxError;
13use aws_smithy_runtime_api::client::interceptors::context::{
14 BeforeTransmitInterceptorContextMut, InterceptorContext,
15};
16use aws_smithy_runtime_api::client::interceptors::Intercept;
17use aws_smithy_runtime_api::client::retries::classifiers::{RetryAction, RetryReason};
18use aws_smithy_runtime_api::client::retries::{RequestAttempts, RetryStrategy, ShouldAttempt};
19use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
20use aws_smithy_types::config_bag::{ConfigBag, Layer, Storable, StoreReplace};
21use aws_smithy_types::retry::{ErrorKind, RetryConfig, RetryMode};
22
23use crate::client::retries::classifiers::run_classifiers_on_ctx;
24use crate::client::retries::client_rate_limiter::{ClientRateLimiter, RequestReason};
25use crate::client::retries::strategy::standard::ReleaseResult::{
26 APermitWasReleased, NoPermitWasReleased,
27};
28use crate::client::retries::token_bucket::TokenBucket;
29use crate::client::retries::{ClientRateLimiterPartition, RetryPartition, RetryPartitionInner};
30use crate::static_partition_map::StaticPartitionMap;
31
// Process-wide map of client rate limiters for adaptive retries, keyed by
// retry partition so that clients sharing a default partition share a limiter.
static CLIENT_RATE_LIMITER: StaticPartitionMap<ClientRateLimiterPartition, ClientRateLimiter> =
    StaticPartitionMap::new();

// Process-wide map of retry-quota token buckets, keyed by retry partition.
static TOKEN_BUCKET: StaticPartitionMap<RetryPartition, TokenBucket> = StaticPartitionMap::new();
37
/// Retry strategy implementing standard (and, with a rate limiter, adaptive)
/// retry behavior backed by a token-bucket retry quota.
#[derive(Debug, Default)]
pub struct StandardRetryStrategy {
    // Permit acquired from the token bucket for the in-flight retry attempt.
    // It is released (returned or forgotten) once the attempt's outcome is known.
    retry_permit: Mutex<Option<OwnedSemaphorePermit>>,
}
43
impl Storable for StandardRetryStrategy {
    // Storing a new strategy in a config bag replaces any previous one.
    type Storer = StoreReplace<Self>;
}
47
impl StandardRetryStrategy {
    /// Create a new standard retry strategy.
    pub fn new() -> Self {
        Default::default()
    }

    /// Release the retry permit held from the previous attempt (if any), or
    /// replenish the token bucket directly when no permit is held.
    ///
    /// When the bucket is configured with a positive success reward, the bucket
    /// is rewarded and the permit is *forgotten* (its tokens are not returned
    /// on drop); otherwise dropping the permit returns its tokens to the bucket.
    fn release_retry_permit(&self, token_bucket: &TokenBucket) -> ReleaseResult {
        let mut retry_permit = self.retry_permit.lock().unwrap();
        match retry_permit.take() {
            Some(p) => {
                if token_bucket.success_reward() > 0.0 {
                    token_bucket.reward_success();
                    // Forget the permit so its tokens aren't also re-added on drop.
                    p.forget();
                } else {
                    // Dropping the permit returns its tokens to the bucket.
                    drop(p);
                }
                APermitWasReleased
            }
            None => {
                if token_bucket.success_reward() > 0.0 {
                    token_bucket.reward_success();
                } else {
                    // No permit was held (e.g. the first attempt succeeded), so
                    // regenerate a token to gradually refill the bucket.
                    token_bucket.regenerate_a_token();
                }
                NoPermitWasReleased
            }
        }
    }

    /// Hold a permit for the current retry attempt, forgetting any permit it
    /// replaces so that replaced tokens are not returned to the bucket.
    fn set_retry_permit(&self, new_retry_permit: OwnedSemaphorePermit) {
        let mut old_retry_permit = self.retry_permit.lock().unwrap();
        if let Some(p) = old_retry_permit.replace(new_retry_permit) {
            // Forget the old permit so that its tokens aren't re-added to the bucket.
            p.forget()
        }
    }

    /// Return the client rate limiter for the current retry partition when the
    /// retry mode is adaptive and a time source is available; `None` otherwise.
    fn adaptive_retry_rate_limiter(
        runtime_components: &RuntimeComponents,
        cfg: &ConfigBag,
    ) -> Option<ClientRateLimiter> {
        let retry_config = cfg.load::<RetryConfig>().expect("retry config is required");
        if retry_config.mode() == RetryMode::Adaptive {
            if let Some(time_source) = runtime_components.time_source() {
                let retry_partition = cfg.load::<RetryPartition>().expect("set in default config");
                let seconds_since_unix_epoch = time_source
                    .now()
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .expect("the present takes place after the UNIX_EPOCH")
                    .as_secs_f64();
                let client_rate_limiter = match &retry_partition.inner {
                    // Default partitions share a process-wide rate limiter.
                    RetryPartitionInner::Default(_) => {
                        let client_rate_limiter_partition =
                            ClientRateLimiterPartition::new(retry_partition.clone());
                        CLIENT_RATE_LIMITER.get_or_init(client_rate_limiter_partition, || {
                            ClientRateLimiter::new(seconds_since_unix_epoch)
                        })
                    }
                    // Custom partitions carry their own rate limiter.
                    RetryPartitionInner::Custom {
                        client_rate_limiter,
                        ..
                    } => client_rate_limiter.clone(),
                };
                return Some(client_rate_limiter);
            }
        }
        None
    }

    /// Calculate how long to wait before the next attempt.
    ///
    /// Returns `Ok(delay)` when a retry should happen after `delay`, or
    /// `Err(ShouldAttempt::No)` when the classifier result does not call for a
    /// retry. Delays are capped at the configured max backoff in all branches.
    fn calculate_backoff(
        &self,
        runtime_components: &RuntimeComponents,
        cfg: &ConfigBag,
        retry_cfg: &RetryConfig,
        retry_reason: &RetryAction,
    ) -> Result<Duration, ShouldAttempt> {
        let request_attempts = cfg
            .load::<RequestAttempts>()
            .expect("at least one request attempt is made before any retry is attempted")
            .attempts();

        match retry_reason {
            RetryAction::RetryIndicated(RetryReason::RetryableError { kind, retry_after }) => {
                if let Some(delay) = *retry_after {
                    // An explicit server-provided delay takes priority.
                    let delay = delay.min(retry_cfg.max_backoff());
                    debug!("explicit request from server to delay {delay:?} before retrying");
                    Ok(delay)
                } else if let Some(delay) =
                    check_rate_limiter_for_delay(runtime_components, cfg, *kind)
                {
                    // Adaptive mode: the client rate limiter requested a delay.
                    let delay = delay.min(retry_cfg.max_backoff());
                    debug!("rate limiter has requested a {delay:?} delay before retrying");
                    Ok(delay)
                } else {
                    // Exponential backoff; `base` is 1.0 for static backoff or
                    // a random value for jitter.
                    let base = if retry_cfg.use_static_exponential_base() {
                        1.0
                    } else {
                        fastrand::f64()
                    };
                    Ok(calculate_exponential_backoff(
                        base,
                        retry_cfg.initial_backoff().as_secs_f64(),
                        request_attempts - 1,
                        retry_cfg.max_backoff(),
                    ))
                }
            }
            RetryAction::RetryForbidden | RetryAction::NoActionIndicated => {
                debug!(
                    attempts = request_attempts,
                    max_attempts = retry_cfg.max_attempts(),
                    "encountered un-retryable error"
                );
                Err(ShouldAttempt::No)
            }
            _ => unreachable!("RetryAction is non-exhaustive"),
        }
    }
}
176
/// Outcome of [`StandardRetryStrategy::release_retry_permit`]: whether a held
/// permit was actually released, or the bucket was replenished without one.
enum ReleaseResult {
    APermitWasReleased,
    NoPermitWasReleased,
}
181
impl RetryStrategy for StandardRetryStrategy {
    /// In adaptive mode, ask the client rate limiter for permission to send the
    /// first attempt; it may require an initial delay. Otherwise always `Yes`.
    fn should_attempt_initial_request(
        &self,
        runtime_components: &RuntimeComponents,
        cfg: &ConfigBag,
    ) -> Result<ShouldAttempt, BoxError> {
        if let Some(crl) = Self::adaptive_retry_rate_limiter(runtime_components, cfg) {
            let seconds_since_unix_epoch = get_seconds_since_unix_epoch(runtime_components);
            if let Err(delay) = crl.acquire_permission_to_send_a_request(
                seconds_since_unix_epoch,
                RequestReason::InitialRequest,
            ) {
                return Ok(ShouldAttempt::YesAfterDelay(delay));
            }
        } else {
            debug!("no client rate limiter configured, so no token is required for the initial request.");
        }

        Ok(ShouldAttempt::Yes)
    }

    /// Decide whether the failed (or successful-but-retryable) attempt should
    /// be retried, updating rate limiter state and retry quota along the way.
    fn should_attempt_retry(
        &self,
        ctx: &InterceptorContext,
        runtime_components: &RuntimeComponents,
        cfg: &ConfigBag,
    ) -> Result<ShouldAttempt, BoxError> {
        let retry_cfg = cfg.load::<RetryConfig>().expect("retry config is required");

        let token_bucket = cfg.load::<TokenBucket>().expect("token bucket is required");
        let retry_classifiers = runtime_components.retry_classifiers();
        let classifier_result = run_classifiers_on_ctx(retry_classifiers, ctx);

        // Report throttling (or its absence) to the adaptive rate limiter, if any.
        let error_kind = error_kind(&classifier_result);
        let is_throttling_error = error_kind
            .map(|kind| kind == ErrorKind::ThrottlingError)
            .unwrap_or(false);
        update_rate_limiter_if_exists(runtime_components, cfg, is_throttling_error);

        // On a successful response, return held quota. Note that a successful
        // response can still be classified retryable below (e.g. waiters).
        if !ctx.is_failed() {
            self.release_retry_permit(token_bucket);
        }
        let request_attempts = cfg
            .load::<RequestAttempts>()
            .expect("at least one request attempt is made before any retry is attempted")
            .attempts();

        if !classifier_result.should_retry() {
            debug!(
                "attempt #{request_attempts} classified as {:?}, not retrying",
                classifier_result
            );
            return Ok(ShouldAttempt::No);
        }

        if request_attempts >= retry_cfg.max_attempts() {
            debug!(
                attempts = request_attempts,
                max_attempts = retry_cfg.max_attempts(),
                "not retrying because we are out of attempts"
            );
            return Ok(ShouldAttempt::No);
        }

        // A retry costs quota: acquire a permit from the token bucket or give up.
        let error_kind = error_kind.expect("result was classified retryable");
        match token_bucket.acquire(
            &error_kind,
            &runtime_components.time_source().unwrap_or_default(),
        ) {
            Some(permit) => self.set_retry_permit(permit),
            None => {
                debug!("attempt #{request_attempts} failed with {error_kind:?}; However, not enough retry quota is available for another attempt so no retry will be attempted.");
                return Ok(ShouldAttempt::No);
            }
        }

        let backoff =
            match self.calculate_backoff(runtime_components, cfg, retry_cfg, &classifier_result) {
                Ok(value) => value,
                // `Err` means "do not retry"; surface it as the final answer.
                Err(value) => return Ok(value),
            };

        debug!(
            "attempt #{request_attempts} failed with {:?}; retrying after {:?}",
            classifier_result, backoff
        );
        Ok(ShouldAttempt::YesAfterDelay(backoff))
    }
}
285
286fn error_kind(classifier_result: &RetryAction) -> Option<ErrorKind> {
288 match classifier_result {
289 RetryAction::RetryIndicated(RetryReason::RetryableError { kind, .. }) => Some(*kind),
290 _ => None,
291 }
292}
293
294fn update_rate_limiter_if_exists(
295 runtime_components: &RuntimeComponents,
296 cfg: &ConfigBag,
297 is_throttling_error: bool,
298) {
299 if let Some(crl) = StandardRetryStrategy::adaptive_retry_rate_limiter(runtime_components, cfg) {
300 let seconds_since_unix_epoch = get_seconds_since_unix_epoch(runtime_components);
301 crl.update_rate_limiter(seconds_since_unix_epoch, is_throttling_error);
302 }
303}
304
305fn check_rate_limiter_for_delay(
306 runtime_components: &RuntimeComponents,
307 cfg: &ConfigBag,
308 kind: ErrorKind,
309) -> Option<Duration> {
310 if let Some(crl) = StandardRetryStrategy::adaptive_retry_rate_limiter(runtime_components, cfg) {
311 let retry_reason = if kind == ErrorKind::ThrottlingError {
312 RequestReason::RetryTimeout
313 } else {
314 RequestReason::Retry
315 };
316 if let Err(delay) = crl.acquire_permission_to_send_a_request(
317 get_seconds_since_unix_epoch(runtime_components),
318 retry_reason,
319 ) {
320 return Some(delay);
321 }
322 }
323
324 None
325}
326
327pub(super) fn calculate_exponential_backoff(
328 base: f64,
329 initial_backoff: f64,
330 retry_attempts: u32,
331 max_backoff: Duration,
332) -> Duration {
333 let result = match 2_u32
334 .checked_pow(retry_attempts)
335 .map(|power| (power as f64) * initial_backoff)
336 {
337 Some(backoff) => match Duration::try_from_secs_f64(backoff) {
338 Ok(result) => result.min(max_backoff),
339 Err(e) => {
340 tracing::warn!("falling back to {max_backoff:?} as `Duration` could not be created for exponential backoff: {e}");
341 max_backoff
342 }
343 },
344 None => max_backoff,
345 };
346
347 result.mul_f64(base)
350}
351
352pub(super) fn get_seconds_since_unix_epoch(runtime_components: &RuntimeComponents) -> f64 {
353 let request_time = runtime_components
354 .time_source()
355 .expect("time source required for retries");
356 request_time
357 .now()
358 .duration_since(SystemTime::UNIX_EPOCH)
359 .unwrap()
360 .as_secs_f64()
361}
362
/// Interceptor that resolves the correct [`TokenBucket`] for the operation's
/// retry partition and stores it in the config bag before the retry loop runs.
#[derive(Debug)]
pub(crate) struct TokenBucketProvider {
    // Partition this provider was constructed for; used as a fast path to
    // avoid a lookup in the process-wide map.
    default_partition: RetryPartition,
    // Token bucket for `default_partition`, resolved eagerly at construction.
    token_bucket: TokenBucket,
}
371
impl TokenBucketProvider {
    /// Create a new `TokenBucketProvider`, eagerly fetching (or creating) the
    /// process-wide token bucket for `default_partition`.
    pub(crate) fn new(default_partition: RetryPartition) -> Self {
        let token_bucket = TOKEN_BUCKET.get_or_init_default(default_partition.clone());
        Self {
            default_partition,
            token_bucket,
        }
    }
}
385
impl Intercept for TokenBucketProvider {
    fn name(&self) -> &'static str {
        "TokenBucketProvider"
    }

    /// Store the token bucket matching the configured retry partition into the
    /// config bag so the retry strategy can load it later.
    fn modify_before_retry_loop(
        &self,
        _context: &mut BeforeTransmitInterceptorContextMut<'_>,
        _runtime_components: &RuntimeComponents,
        cfg: &mut ConfigBag,
    ) -> Result<(), BoxError> {
        let retry_partition = cfg.load::<RetryPartition>().expect("set in default config");

        let tb = match &retry_partition.inner {
            RetryPartitionInner::Default(name) => {
                if name == self.default_partition.name() {
                    // Fast path: partition matches the one resolved at
                    // construction time, so reuse the cached bucket.
                    self.token_bucket.clone()
                } else {
                    // Partition was overridden for this operation; look it up
                    // (or create it) in the process-wide map.
                    TOKEN_BUCKET.get_or_init_default(retry_partition.clone())
                }
            }
            // Custom partitions carry their own token bucket.
            RetryPartitionInner::Custom { token_bucket, .. } => token_bucket.clone(),
        };

        trace!("token bucket for {retry_partition:?} added to config bag");
        let mut layer = Layer::new("token_bucket_partition");
        layer.store_put(tb);
        cfg.push_layer(layer);
        Ok(())
    }
}
421
422#[cfg(test)]
423mod tests {
424 #[allow(unused_imports)] use std::fmt;
426 use std::sync::Mutex;
427 use std::time::Duration;
428
429 use aws_smithy_async::time::SystemTimeSource;
430 use aws_smithy_runtime_api::client::interceptors::context::{
431 Input, InterceptorContext, Output,
432 };
433 use aws_smithy_runtime_api::client::orchestrator::OrchestratorError;
434 use aws_smithy_runtime_api::client::retries::classifiers::{
435 ClassifyRetry, RetryAction, SharedRetryClassifier,
436 };
437 use aws_smithy_runtime_api::client::retries::{
438 AlwaysRetry, RequestAttempts, RetryStrategy, ShouldAttempt,
439 };
440 use aws_smithy_runtime_api::client::runtime_components::{
441 RuntimeComponents, RuntimeComponentsBuilder,
442 };
443 use aws_smithy_types::config_bag::{ConfigBag, Layer};
444 use aws_smithy_types::retry::{ErrorKind, RetryConfig};
445
446 use super::{calculate_exponential_backoff, StandardRetryStrategy};
447 use crate::client::retries::{ClientRateLimiter, RetryPartition, TokenBucket};
448
    // A successful response should never be retried by the default classifiers.
    #[test]
    fn no_retry_necessary_for_ok_result() {
        let cfg = ConfigBag::of_layers(vec![{
            let mut layer = Layer::new("test");
            layer.store_put(RetryConfig::standard());
            layer.store_put(RequestAttempts::new(1));
            layer.store_put(TokenBucket::default());
            layer
        }]);
        let rc = RuntimeComponentsBuilder::for_tests().build().unwrap();
        let mut ctx = InterceptorContext::new(Input::doesnt_matter());
        let strategy = StandardRetryStrategy::default();
        ctx.set_output_or_error(Ok(Output::doesnt_matter()));

        let actual = strategy
            .should_attempt_retry(&ctx, &rc, &cfg)
            .expect("method is infallible for this use");
        assert_eq!(ShouldAttempt::No, actual);
    }
468
    // Builds a context/components/config triple in which every response is
    // classified as the given `error_kind`.
    fn set_up_cfg_and_context(
        error_kind: ErrorKind,
        current_request_attempts: u32,
        retry_config: RetryConfig,
    ) -> (InterceptorContext, RuntimeComponents, ConfigBag) {
        let mut ctx = InterceptorContext::new(Input::doesnt_matter());
        ctx.set_output_or_error(Err(OrchestratorError::other("doesn't matter")));
        let rc = RuntimeComponentsBuilder::for_tests()
            .with_retry_classifier(SharedRetryClassifier::new(AlwaysRetry(error_kind)))
            .build()
            .unwrap();
        let mut layer = Layer::new("test");
        layer.store_put(RequestAttempts::new(current_request_attempts));
        layer.store_put(retry_config);
        layer.store_put(TokenBucket::default());
        let cfg = ConfigBag::of_layers(vec![layer]);

        (ctx, rc, cfg)
    }

    // Attempt #3 of 4 should be retried after 2^(3-1) = 4s of static backoff,
    // regardless of the error kind.
    fn test_should_retry_error_kind(error_kind: ErrorKind) {
        let (ctx, rc, cfg) = set_up_cfg_and_context(
            error_kind,
            3,
            RetryConfig::standard()
                .with_use_static_exponential_base(true)
                .with_max_attempts(4),
        );
        let strategy = StandardRetryStrategy::new();
        let actual = strategy
            .should_attempt_retry(&ctx, &rc, &cfg)
            .expect("method is infallible for this use");
        assert_eq!(ShouldAttempt::YesAfterDelay(Duration::from_secs(4)), actual);
    }
505
    // NOTE(review): these test names say "after_2s" but the shared helper
    // asserts a 4s delay (attempt #3) — names look stale; verify intent.
    #[test]
    fn should_retry_transient_error_result_after_2s() {
        test_should_retry_error_kind(ErrorKind::TransientError);
    }

    #[test]
    fn should_retry_client_error_result_after_2s() {
        test_should_retry_error_kind(ErrorKind::ClientError);
    }

    #[test]
    fn should_retry_server_error_result_after_2s() {
        test_should_retry_error_kind(ErrorKind::ServerError);
    }

    #[test]
    fn should_retry_throttling_error_result_after_2s() {
        test_should_retry_error_kind(ErrorKind::ThrottlingError);
    }
525
    // Reaching the configured max attempts must stop retries.
    #[test]
    fn dont_retry_when_out_of_attempts() {
        let current_attempts = 4;
        let max_attempts = current_attempts;
        let (ctx, rc, cfg) = set_up_cfg_and_context(
            ErrorKind::TransientError,
            current_attempts,
            RetryConfig::standard()
                .with_use_static_exponential_base(true)
                .with_max_attempts(max_attempts),
        );
        let strategy = StandardRetryStrategy::new();
        let actual = strategy
            .should_attempt_retry(&ctx, &rc, &cfg)
            .expect("method is infallible for this use");
        assert_eq!(ShouldAttempt::No, actual);
    }

    // Attempt 33 computes 2^32, which overflows `u32::checked_pow`; the
    // strategy must fall back to the max backoff instead of panicking.
    #[test]
    fn should_not_panic_when_exponential_backoff_duration_could_not_be_created() {
        let (ctx, rc, cfg) = set_up_cfg_and_context(
            ErrorKind::TransientError,
            33,
            RetryConfig::standard()
                .with_use_static_exponential_base(true)
                .with_max_attempts(100),
        );
        let strategy = StandardRetryStrategy::new();
        let actual = strategy
            .should_attempt_retry(&ctx, &rc, &cfg)
            .expect("method is infallible for this use");
        assert_eq!(ShouldAttempt::YesAfterDelay(MAX_BACKOFF), actual);
    }
560
    // A custom retry partition must supply its own client rate limiter
    // (verified by Arc pointer identity) rather than a process-wide one.
    #[test]
    fn should_yield_client_rate_limiter_from_custom_partition() {
        let expected = ClientRateLimiter::builder().token_refill_rate(3.14).build();
        let cfg = ConfigBag::of_layers(vec![
            {
                let mut layer = Layer::new("default");
                layer.store_put(RetryPartition::new("default"));
                layer
            },
            {
                let mut layer = Layer::new("user");
                layer.store_put(RetryConfig::adaptive());
                layer.store_put(
                    RetryPartition::custom("user")
                        .client_rate_limiter(expected.clone())
                        .build(),
                );
                layer
            },
        ]);
        let rc = RuntimeComponentsBuilder::for_tests()
            .with_time_source(Some(SystemTimeSource::new()))
            .build()
            .unwrap();
        let actual = StandardRetryStrategy::adaptive_retry_rate_limiter(&rc, &cfg)
            .expect("should yield client rate limiter from custom partition");
        assert!(std::sync::Arc::ptr_eq(&expected.inner, &actual.inner));
    }
590
    // Test classifier that replays a preset sequence of retry actions for
    // failed responses, repeating the last action indefinitely.
    #[allow(dead_code)]
    #[derive(Debug)]
    struct PresetReasonRetryClassifier {
        retry_actions: Mutex<Vec<RetryAction>>,
    }

    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
    impl PresetReasonRetryClassifier {
        fn new(mut retry_reasons: Vec<RetryAction>) -> Self {
            // Reversed so actions can be popped off the end in original order.
            retry_reasons.reverse();
            Self {
                retry_actions: Mutex::new(retry_reasons),
            }
        }
    }

    impl ClassifyRetry for PresetReasonRetryClassifier {
        fn classify_retry(&self, ctx: &InterceptorContext) -> RetryAction {
            let output_or_error = ctx.output_or_error();
            // Successful or absent results never indicate a retry.
            match output_or_error {
                Some(Ok(_)) | None => return RetryAction::NoActionIndicated,
                _ => (),
            };

            let mut retry_actions = self.retry_actions.lock().unwrap();
            if retry_actions.len() == 1 {
                // Keep returning the final action instead of emptying the list.
                retry_actions.first().unwrap().clone()
            } else {
                retry_actions.pop().unwrap()
            }
        }

        fn name(&self) -> &'static str {
            "Always returns a preset retry reason"
        }
    }
630
    // Builds a config/components/context triple whose classifier replays the
    // given retry actions against an initially failed response.
    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
    fn setup_test(
        retry_reasons: Vec<RetryAction>,
        retry_config: RetryConfig,
    ) -> (ConfigBag, RuntimeComponents, InterceptorContext) {
        let rc = RuntimeComponentsBuilder::for_tests()
            .with_retry_classifier(SharedRetryClassifier::new(
                PresetReasonRetryClassifier::new(retry_reasons),
            ))
            .build()
            .unwrap();
        let mut layer = Layer::new("test");
        layer.store_put(retry_config);
        let cfg = ConfigBag::of_layers(vec![layer]);
        let mut ctx = InterceptorContext::new(Input::doesnt_matter());
        // The error contents don't matter; the preset classifier ignores them.
        ctx.set_output_or_error(Err(OrchestratorError::other("doesn't matter")));

        (cfg, rc, ctx)
    }
651
    // Two failures then a success: each retry drains quota from the bucket and
    // observing the success restores the held quota (490 -> 495).
    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
    #[test]
    fn eventual_success() {
        let (mut cfg, rc, mut ctx) = setup_test(
            vec![RetryAction::server_error()],
            RetryConfig::standard()
                .with_use_static_exponential_base(true)
                .with_max_attempts(5),
        );
        let strategy = StandardRetryStrategy::new();
        cfg.interceptor_state().store_put(TokenBucket::default());
        let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();

        cfg.interceptor_state().store_put(RequestAttempts::new(1));
        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        let dur = should_retry.expect_delay();
        assert_eq!(dur, Duration::from_secs(1));
        assert_eq!(token_bucket.available_permits(), 495);

        cfg.interceptor_state().store_put(RequestAttempts::new(2));
        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        let dur = should_retry.expect_delay();
        assert_eq!(dur, Duration::from_secs(2));
        assert_eq!(token_bucket.available_permits(), 490);

        ctx.set_output_or_error(Ok(Output::doesnt_matter()));

        cfg.interceptor_state().store_put(RequestAttempts::new(3));
        let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        assert_eq!(no_retry, ShouldAttempt::No);
        assert_eq!(token_bucket.available_permits(), 495);
    }

    // Running out of attempts does NOT refund the quota spent on retries.
    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
    #[test]
    fn no_more_attempts() {
        let (mut cfg, rc, ctx) = setup_test(
            vec![RetryAction::server_error()],
            RetryConfig::standard()
                .with_use_static_exponential_base(true)
                .with_max_attempts(3),
        );
        let strategy = StandardRetryStrategy::new();
        cfg.interceptor_state().store_put(TokenBucket::default());
        let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();

        cfg.interceptor_state().store_put(RequestAttempts::new(1));
        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        let dur = should_retry.expect_delay();
        assert_eq!(dur, Duration::from_secs(1));
        assert_eq!(token_bucket.available_permits(), 495);

        cfg.interceptor_state().store_put(RequestAttempts::new(2));
        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        let dur = should_retry.expect_delay();
        assert_eq!(dur, Duration::from_secs(2));
        assert_eq!(token_bucket.available_permits(), 490);

        cfg.interceptor_state().store_put(RequestAttempts::new(3));
        let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        assert_eq!(no_retry, ShouldAttempt::No);
        assert_eq!(token_bucket.available_permits(), 490);
    }
715
    // A response that deserializes successfully can still be classified as
    // retryable (waiter-style polling of a long-running operation). Quota must
    // be consumed for each such retry and fully restored on completion.
    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
    #[test]
    fn successful_request_and_deser_should_be_retryable() {
        #[derive(Clone, Copy, Debug)]
        enum LongRunningOperationStatus {
            Running,
            Complete,
        }

        #[derive(Debug)]
        struct LongRunningOperationOutput {
            status: Option<LongRunningOperationStatus>,
        }

        impl LongRunningOperationOutput {
            fn status(&self) -> Option<LongRunningOperationStatus> {
                self.status
            }
        }

        struct WaiterRetryClassifier {}

        impl WaiterRetryClassifier {
            fn new() -> Self {
                WaiterRetryClassifier {}
            }
        }

        impl fmt::Debug for WaiterRetryClassifier {
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                write!(f, "WaiterRetryClassifier")
            }
        }
        impl ClassifyRetry for WaiterRetryClassifier {
            fn classify_retry(&self, ctx: &InterceptorContext) -> RetryAction {
                // Downcast the successful output and retry while still Running.
                let status: Option<LongRunningOperationStatus> =
                    ctx.output_or_error().and_then(|res| {
                        res.ok().and_then(|output| {
                            output
                                .downcast_ref::<LongRunningOperationOutput>()
                                .and_then(|output| output.status())
                        })
                    });

                if let Some(LongRunningOperationStatus::Running) = status {
                    return RetryAction::server_error();
                };

                RetryAction::NoActionIndicated
            }

            fn name(&self) -> &'static str {
                "waiter retry classifier"
            }
        }

        let retry_config = RetryConfig::standard()
            .with_use_static_exponential_base(true)
            .with_max_attempts(5);

        let rc = RuntimeComponentsBuilder::for_tests()
            .with_retry_classifier(SharedRetryClassifier::new(WaiterRetryClassifier::new()))
            .build()
            .unwrap();
        let mut layer = Layer::new("test");
        layer.store_put(retry_config);
        let mut cfg = ConfigBag::of_layers(vec![layer]);
        let mut ctx = InterceptorContext::new(Input::doesnt_matter());
        let strategy = StandardRetryStrategy::new();

        ctx.set_output_or_error(Ok(Output::erase(LongRunningOperationOutput {
            status: Some(LongRunningOperationStatus::Running),
        })));

        cfg.interceptor_state().store_put(TokenBucket::new(5));
        let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();

        cfg.interceptor_state().store_put(RequestAttempts::new(1));
        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        let dur = should_retry.expect_delay();
        assert_eq!(dur, Duration::from_secs(1));
        assert_eq!(token_bucket.available_permits(), 0);

        ctx.set_output_or_error(Ok(Output::erase(LongRunningOperationOutput {
            status: Some(LongRunningOperationStatus::Complete),
        })));
        cfg.interceptor_state().store_put(RequestAttempts::new(2));
        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        should_retry.expect_no();
        assert_eq!(token_bucket.available_permits(), 5);
    }
807
    // With only 5 permits, the first retry empties the bucket; the second
    // failure cannot acquire quota, so no further retry is attempted.
    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
    #[test]
    fn no_quota() {
        let (mut cfg, rc, ctx) = setup_test(
            vec![RetryAction::server_error()],
            RetryConfig::standard()
                .with_use_static_exponential_base(true)
                .with_max_attempts(5),
        );
        let strategy = StandardRetryStrategy::new();
        cfg.interceptor_state().store_put(TokenBucket::new(5));
        let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();

        cfg.interceptor_state().store_put(RequestAttempts::new(1));
        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        let dur = should_retry.expect_delay();
        assert_eq!(dur, Duration::from_secs(1));
        assert_eq!(token_bucket.available_permits(), 0);

        cfg.interceptor_state().store_put(RequestAttempts::new(2));
        let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        assert_eq!(no_retry, ShouldAttempt::No);
        assert_eq!(token_bucket.available_permits(), 0);
    }

    // Transient errors drain the bucket 10 permits at a time (100 -> 90 -> 80);
    // observing a success releases the held permit back (80 -> 90).
    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
    #[test]
    fn quota_replenishes_on_success() {
        let (mut cfg, rc, mut ctx) = setup_test(
            vec![
                RetryAction::transient_error(),
                RetryAction::retryable_error_with_explicit_delay(
                    ErrorKind::TransientError,
                    Duration::from_secs(1),
                ),
            ],
            RetryConfig::standard()
                .with_use_static_exponential_base(true)
                .with_max_attempts(5),
        );
        let strategy = StandardRetryStrategy::new();
        cfg.interceptor_state().store_put(TokenBucket::new(100));
        let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();

        cfg.interceptor_state().store_put(RequestAttempts::new(1));
        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        let dur = should_retry.expect_delay();
        assert_eq!(dur, Duration::from_secs(1));
        assert_eq!(token_bucket.available_permits(), 90);

        cfg.interceptor_state().store_put(RequestAttempts::new(2));
        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        let dur = should_retry.expect_delay();
        assert_eq!(dur, Duration::from_secs(1));
        assert_eq!(token_bucket.available_permits(), 80);

        ctx.set_output_or_error(Ok(Output::doesnt_matter()));

        cfg.interceptor_state().store_put(RequestAttempts::new(3));
        let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        assert_eq!(no_retry, ShouldAttempt::No);

        assert_eq!(token_bucket.available_permits(), 90);
    }
872
    // After the bucket is fully drained and the held permit forgotten, each
    // observed success regenerates one permit until the bucket is full again.
    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
    #[test]
    fn quota_replenishes_on_first_try_success() {
        const PERMIT_COUNT: usize = 20;
        let (mut cfg, rc, mut ctx) = setup_test(
            vec![RetryAction::transient_error()],
            RetryConfig::standard()
                .with_use_static_exponential_base(true)
                .with_max_attempts(u32::MAX),
        );
        let strategy = StandardRetryStrategy::new();
        cfg.interceptor_state()
            .store_put(TokenBucket::new(PERMIT_COUNT));
        let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();

        let mut attempt = 1;

        // Drain the bucket (the loop bound implies two transient-error retries
        // suffice, i.e. 10 permits each).
        while token_bucket.available_permits() > 0 {
            if attempt > 2 {
                panic!("This test should have completed by now (drain)");
            }

            cfg.interceptor_state()
                .store_put(RequestAttempts::new(attempt));
            let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
            assert!(matches!(should_retry, ShouldAttempt::YesAfterDelay(_)));
            attempt += 1;
        }

        // Forget the held permit so successes replenish via regeneration only.
        let permit = strategy.retry_permit.lock().unwrap().take().unwrap();
        permit.forget();

        ctx.set_output_or_error(Ok(Output::doesnt_matter()));

        // Each successful check regenerates one permit until full.
        while token_bucket.available_permits() < PERMIT_COUNT {
            if attempt > 23 {
                panic!("This test should have completed by now (fill-up)");
            }

            cfg.interceptor_state()
                .store_put(RequestAttempts::new(attempt));
            let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
            assert_eq!(no_retry, ShouldAttempt::No);
            attempt += 1;
        }

        assert_eq!(attempt, 23);
        assert_eq!(token_bucket.available_permits(), PERMIT_COUNT);
    }
926
    // Full exponential backoff progression: 1s, 2s, 4s, 8s, then out of
    // attempts. Spent quota is not refunded when attempts run out.
    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
    #[test]
    fn backoff_timing() {
        let (mut cfg, rc, ctx) = setup_test(
            vec![RetryAction::server_error()],
            RetryConfig::standard()
                .with_use_static_exponential_base(true)
                .with_max_attempts(5),
        );
        let strategy = StandardRetryStrategy::new();
        cfg.interceptor_state().store_put(TokenBucket::default());
        let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();

        cfg.interceptor_state().store_put(RequestAttempts::new(1));
        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        let dur = should_retry.expect_delay();
        assert_eq!(dur, Duration::from_secs(1));
        assert_eq!(token_bucket.available_permits(), 495);

        cfg.interceptor_state().store_put(RequestAttempts::new(2));
        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        let dur = should_retry.expect_delay();
        assert_eq!(dur, Duration::from_secs(2));
        assert_eq!(token_bucket.available_permits(), 490);

        cfg.interceptor_state().store_put(RequestAttempts::new(3));
        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        let dur = should_retry.expect_delay();
        assert_eq!(dur, Duration::from_secs(4));
        assert_eq!(token_bucket.available_permits(), 485);

        cfg.interceptor_state().store_put(RequestAttempts::new(4));
        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        let dur = should_retry.expect_delay();
        assert_eq!(dur, Duration::from_secs(8));
        assert_eq!(token_bucket.available_permits(), 480);

        cfg.interceptor_state().store_put(RequestAttempts::new(5));
        let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        assert_eq!(no_retry, ShouldAttempt::No);
        assert_eq!(token_bucket.available_permits(), 480);
    }

    // Same as `backoff_timing`, but with max backoff capped at 3s: the
    // progression is 1s, 2s, 3s, 3s.
    #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
    #[test]
    fn max_backoff_time() {
        let (mut cfg, rc, ctx) = setup_test(
            vec![RetryAction::server_error()],
            RetryConfig::standard()
                .with_use_static_exponential_base(true)
                .with_max_attempts(5)
                .with_initial_backoff(Duration::from_secs(1))
                .with_max_backoff(Duration::from_secs(3)),
        );
        let strategy = StandardRetryStrategy::new();
        cfg.interceptor_state().store_put(TokenBucket::default());
        let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();

        cfg.interceptor_state().store_put(RequestAttempts::new(1));
        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        let dur = should_retry.expect_delay();
        assert_eq!(dur, Duration::from_secs(1));
        assert_eq!(token_bucket.available_permits(), 495);

        cfg.interceptor_state().store_put(RequestAttempts::new(2));
        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        let dur = should_retry.expect_delay();
        assert_eq!(dur, Duration::from_secs(2));
        assert_eq!(token_bucket.available_permits(), 490);

        cfg.interceptor_state().store_put(RequestAttempts::new(3));
        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        let dur = should_retry.expect_delay();
        assert_eq!(dur, Duration::from_secs(3));
        assert_eq!(token_bucket.available_permits(), 485);

        cfg.interceptor_state().store_put(RequestAttempts::new(4));
        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        let dur = should_retry.expect_delay();
        assert_eq!(dur, Duration::from_secs(3));
        assert_eq!(token_bucket.available_permits(), 480);

        cfg.interceptor_state().store_put(RequestAttempts::new(5));
        let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        assert_eq!(no_retry, ShouldAttempt::No);
        assert_eq!(token_bucket.available_permits(), 480);
    }
1014
    // Max backoff used by the `calculate_exponential_backoff` unit tests below.
    const MAX_BACKOFF: Duration = Duration::from_secs(20);

    // Delay doubles per attempt: initial * 2^attempt.
    #[test]
    fn calculate_exponential_backoff_where_initial_backoff_is_one() {
        let initial_backoff = 1.0;

        for (attempt, expected_backoff) in [initial_backoff, 2.0, 4.0].into_iter().enumerate() {
            let actual_backoff =
                calculate_exponential_backoff(1.0, initial_backoff, attempt as u32, MAX_BACKOFF);
            assert_eq!(Duration::from_secs_f64(expected_backoff), actual_backoff);
        }
    }

    #[test]
    fn calculate_exponential_backoff_where_initial_backoff_is_greater_than_one() {
        let initial_backoff = 3.0;

        for (attempt, expected_backoff) in [initial_backoff, 6.0, 12.0].into_iter().enumerate() {
            let actual_backoff =
                calculate_exponential_backoff(1.0, initial_backoff, attempt as u32, MAX_BACKOFF);
            assert_eq!(Duration::from_secs_f64(expected_backoff), actual_backoff);
        }
    }

    #[test]
    fn calculate_exponential_backoff_where_initial_backoff_is_less_than_one() {
        let initial_backoff = 0.03;

        for (attempt, expected_backoff) in [initial_backoff, 0.06, 0.12].into_iter().enumerate() {
            let actual_backoff =
                calculate_exponential_backoff(1.0, initial_backoff, attempt as u32, MAX_BACKOFF);
            assert_eq!(Duration::from_secs_f64(expected_backoff), actual_backoff);
        }
    }

    // 2^100000 overflows `u32::checked_pow`; the helper must return max backoff.
    #[test]
    fn calculate_backoff_overflow_should_gracefully_fallback_to_max_backoff() {
        assert_eq!(
            MAX_BACKOFF,
            calculate_exponential_backoff(1_f64, 10_f64, 100000, MAX_BACKOFF),
        );
    }
1058}