533 533 | .expect("method is infallible for this use");
|
534 534 | assert_eq!(ShouldAttempt::YesAfterDelay(MAX_BACKOFF), actual);
|
535 535 | }
|
536 536 |
|
537 537 | #[allow(dead_code)] // will be unused with `--no-default-features --features client`
|
538 538 | #[derive(Debug)]
|
539 539 | struct PresetReasonRetryClassifier {
|
540 540 | retry_actions: Mutex<Vec<RetryAction>>,
|
541 541 | }
|
542 542 |
|
543 - | #[cfg(feature = "test-util")]
|
543 + | #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
|
544 544 | impl PresetReasonRetryClassifier {
|
545 545 | fn new(mut retry_reasons: Vec<RetryAction>) -> Self {
|
546 546 | // We'll pop the retry_reasons in reverse order, so we reverse the list to fix that.
|
547 547 | retry_reasons.reverse();
|
548 548 | Self {
|
549 549 | retry_actions: Mutex::new(retry_reasons),
|
550 550 | }
|
551 551 | }
|
552 552 | }
|
553 553 |
|
554 554 | impl ClassifyRetry for PresetReasonRetryClassifier {
|
555 555 | fn classify_retry(&self, ctx: &InterceptorContext) -> RetryAction {
|
556 556 | // Check for a result
|
557 557 | let output_or_error = ctx.output_or_error();
|
558 558 | // Check for an error
|
559 559 | match output_or_error {
|
560 560 | Some(Ok(_)) | None => return RetryAction::NoActionIndicated,
|
561 561 | _ => (),
|
562 562 | };
|
563 563 |
|
564 564 | let mut retry_actions = self.retry_actions.lock().unwrap();
|
565 565 | if retry_actions.len() == 1 {
|
566 566 | retry_actions.first().unwrap().clone()
|
567 567 | } else {
|
568 568 | retry_actions.pop().unwrap()
|
569 569 | }
|
570 570 | }
|
571 571 |
|
572 572 | fn name(&self) -> &'static str {
|
573 573 | "Always returns a preset retry reason"
|
574 574 | }
|
575 575 | }
|
576 576 |
|
577 - | #[cfg(feature = "test-util")]
|
577 + | #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
|
578 578 | fn setup_test(
|
579 579 | retry_reasons: Vec<RetryAction>,
|
580 580 | retry_config: RetryConfig,
|
581 581 | ) -> (ConfigBag, RuntimeComponents, InterceptorContext) {
|
582 582 | let rc = RuntimeComponentsBuilder::for_tests()
|
583 583 | .with_retry_classifier(SharedRetryClassifier::new(
|
584 584 | PresetReasonRetryClassifier::new(retry_reasons),
|
585 585 | ))
|
586 586 | .build()
|
587 587 | .unwrap();
|
588 588 | let mut layer = Layer::new("test");
|
589 589 | layer.store_put(retry_config);
|
590 590 | let cfg = ConfigBag::of_layers(vec![layer]);
|
591 591 | let mut ctx = InterceptorContext::new(Input::doesnt_matter());
|
592 592 | // This type doesn't matter b/c the classifier will just return whatever we tell it to.
|
593 593 | ctx.set_output_or_error(Err(OrchestratorError::other("doesn't matter")));
|
594 594 |
|
595 595 | (cfg, rc, ctx)
|
596 596 | }
|
597 597 |
|
598 - | #[cfg(feature = "test-util")]
|
598 + | #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
|
599 599 | #[test]
|
600 600 | fn eventual_success() {
|
601 601 | let (mut cfg, rc, mut ctx) = setup_test(
|
602 602 | vec![RetryAction::server_error()],
|
603 603 | RetryConfig::standard()
|
604 604 | .with_use_static_exponential_base(true)
|
605 605 | .with_max_attempts(5),
|
606 606 | );
|
607 607 | let strategy = StandardRetryStrategy::new();
|
608 608 | cfg.interceptor_state().store_put(TokenBucket::default());
|
609 609 | let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();
|
610 610 |
|
611 611 | cfg.interceptor_state().store_put(RequestAttempts::new(1));
|
612 612 | let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
|
613 613 | let dur = should_retry.expect_delay();
|
614 614 | assert_eq!(dur, Duration::from_secs(1));
|
615 615 | assert_eq!(token_bucket.available_permits(), 495);
|
616 616 |
|
617 617 | cfg.interceptor_state().store_put(RequestAttempts::new(2));
|
618 618 | let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
|
619 619 | let dur = should_retry.expect_delay();
|
620 620 | assert_eq!(dur, Duration::from_secs(2));
|
621 621 | assert_eq!(token_bucket.available_permits(), 490);
|
622 622 |
|
623 623 | ctx.set_output_or_error(Ok(Output::doesnt_matter()));
|
624 624 |
|
625 625 | cfg.interceptor_state().store_put(RequestAttempts::new(3));
|
626 626 | let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
|
627 627 | assert_eq!(no_retry, ShouldAttempt::No);
|
628 628 | assert_eq!(token_bucket.available_permits(), 495);
|
629 629 | }
|
630 630 |
|
631 - | #[cfg(feature = "test-util")]
|
631 + | #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
|
632 632 | #[test]
|
633 633 | fn no_more_attempts() {
|
634 634 | let (mut cfg, rc, ctx) = setup_test(
|
635 635 | vec![RetryAction::server_error()],
|
636 636 | RetryConfig::standard()
|
637 637 | .with_use_static_exponential_base(true)
|
638 638 | .with_max_attempts(3),
|
639 639 | );
|
640 640 | let strategy = StandardRetryStrategy::new();
|
641 641 | cfg.interceptor_state().store_put(TokenBucket::default());
|
642 642 | let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();
|
643 643 |
|
644 644 | cfg.interceptor_state().store_put(RequestAttempts::new(1));
|
645 645 | let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
|
646 646 | let dur = should_retry.expect_delay();
|
647 647 | assert_eq!(dur, Duration::from_secs(1));
|
648 648 | assert_eq!(token_bucket.available_permits(), 495);
|
649 649 |
|
650 650 | cfg.interceptor_state().store_put(RequestAttempts::new(2));
|
651 651 | let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
|
652 652 | let dur = should_retry.expect_delay();
|
653 653 | assert_eq!(dur, Duration::from_secs(2));
|
654 654 | assert_eq!(token_bucket.available_permits(), 490);
|
655 655 |
|
656 656 | cfg.interceptor_state().store_put(RequestAttempts::new(3));
|
657 657 | let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
|
658 658 | assert_eq!(no_retry, ShouldAttempt::No);
|
659 659 | assert_eq!(token_bucket.available_permits(), 490);
|
660 660 | }
|
661 661 |
|
662 - | #[cfg(feature = "test-util")]
|
662 + | #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
|
663 663 | #[test]
|
664 664 | fn successful_request_and_deser_should_be_retryable() {
|
665 665 | #[derive(Clone, Copy, Debug)]
|
666 666 | enum LongRunningOperationStatus {
|
667 667 | Running,
|
668 668 | Complete,
|
669 669 | }
|
670 670 |
|
671 671 | #[derive(Debug)]
|
672 672 | struct LongRunningOperationOutput {
|
744 744 |
|
745 745 | ctx.set_output_or_error(Ok(Output::erase(LongRunningOperationOutput {
|
746 746 | status: Some(LongRunningOperationStatus::Complete),
|
747 747 | })));
|
748 748 | cfg.interceptor_state().store_put(RequestAttempts::new(2));
|
749 749 | let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
|
750 750 | should_retry.expect_no();
|
751 751 | assert_eq!(token_bucket.available_permits(), 5);
|
752 752 | }
|
753 753 |
|
754 - | #[cfg(feature = "test-util")]
|
754 + | #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
|
755 755 | #[test]
|
756 756 | fn no_quota() {
|
757 757 | let (mut cfg, rc, ctx) = setup_test(
|
758 758 | vec![RetryAction::server_error()],
|
759 759 | RetryConfig::standard()
|
760 760 | .with_use_static_exponential_base(true)
|
761 761 | .with_max_attempts(5),
|
762 762 | );
|
763 763 | let strategy = StandardRetryStrategy::new();
|
764 764 | cfg.interceptor_state().store_put(TokenBucket::new(5));
|
765 765 | let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();
|
766 766 |
|
767 767 | cfg.interceptor_state().store_put(RequestAttempts::new(1));
|
768 768 | let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
|
769 769 | let dur = should_retry.expect_delay();
|
770 770 | assert_eq!(dur, Duration::from_secs(1));
|
771 771 | assert_eq!(token_bucket.available_permits(), 0);
|
772 772 |
|
773 773 | cfg.interceptor_state().store_put(RequestAttempts::new(2));
|
774 774 | let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
|
775 775 | assert_eq!(no_retry, ShouldAttempt::No);
|
776 776 | assert_eq!(token_bucket.available_permits(), 0);
|
777 777 | }
|
778 778 |
|
779 - | #[cfg(feature = "test-util")]
|
779 + | #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
|
780 780 | #[test]
|
781 781 | fn quota_replenishes_on_success() {
|
782 782 | let (mut cfg, rc, mut ctx) = setup_test(
|
783 783 | vec![
|
784 784 | RetryAction::transient_error(),
|
785 785 | RetryAction::retryable_error_with_explicit_delay(
|
786 786 | ErrorKind::TransientError,
|
787 787 | Duration::from_secs(1),
|
788 788 | ),
|
789 789 | ],
|
790 790 | RetryConfig::standard()
|
791 791 | .with_use_static_exponential_base(true)
|
792 792 | .with_max_attempts(5),
|
793 793 | );
|
794 794 | let strategy = StandardRetryStrategy::new();
|
795 795 | cfg.interceptor_state().store_put(TokenBucket::new(100));
|
796 796 | let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();
|
797 797 |
|
798 798 | cfg.interceptor_state().store_put(RequestAttempts::new(1));
|
799 799 | let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
|
800 800 | let dur = should_retry.expect_delay();
|
801 801 | assert_eq!(dur, Duration::from_secs(1));
|
802 802 | assert_eq!(token_bucket.available_permits(), 90);
|
803 803 |
|
804 804 | cfg.interceptor_state().store_put(RequestAttempts::new(2));
|
805 805 | let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
|
806 806 | let dur = should_retry.expect_delay();
|
807 807 | assert_eq!(dur, Duration::from_secs(1));
|
808 808 | assert_eq!(token_bucket.available_permits(), 80);
|
809 809 |
|
810 810 | ctx.set_output_or_error(Ok(Output::doesnt_matter()));
|
811 811 |
|
812 812 | cfg.interceptor_state().store_put(RequestAttempts::new(3));
|
813 813 | let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
|
814 814 | assert_eq!(no_retry, ShouldAttempt::No);
|
815 815 |
|
816 816 | assert_eq!(token_bucket.available_permits(), 90);
|
817 817 | }
|
818 818 |
|
819 - | #[cfg(feature = "test-util")]
|
819 + | #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
|
820 820 | #[test]
|
821 821 | fn quota_replenishes_on_first_try_success() {
|
822 822 | const PERMIT_COUNT: usize = 20;
|
823 823 | let (mut cfg, rc, mut ctx) = setup_test(
|
824 824 | vec![RetryAction::transient_error()],
|
825 825 | RetryConfig::standard()
|
826 826 | .with_use_static_exponential_base(true)
|
827 827 | .with_max_attempts(u32::MAX),
|
828 828 | );
|
829 829 | let strategy = StandardRetryStrategy::new();
|
830 830 | cfg.interceptor_state()
|
831 831 | .store_put(TokenBucket::new(PERMIT_COUNT));
|
832 832 | let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();
|
833 833 |
|
834 834 | let mut attempt = 1;
|
835 835 |
|
836 836 | // Drain all available permits with failed attempts
|
837 837 | while token_bucket.available_permits() > 0 {
|
838 838 | // Draining should complete in 2 attempts
|
839 839 | if attempt > 2 {
|
840 840 | panic!("This test should have completed by now (drain)");
|
841 841 | }
|
842 842 |
|
843 843 | cfg.interceptor_state()
|
844 844 | .store_put(RequestAttempts::new(attempt));
|
845 845 | let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
|
846 846 | assert!(matches!(should_retry, ShouldAttempt::YesAfterDelay(_)));
|
847 847 | attempt += 1;
|
848 848 | }
|
849 849 |
|
850 850 | // Forget the permit so that we can only refill by "success on first try".
|
851 851 | let permit = strategy.retry_permit.lock().unwrap().take().unwrap();
|
852 852 | permit.forget();
|
853 853 |
|
854 854 | ctx.set_output_or_error(Ok(Output::doesnt_matter()));
|
855 855 |
|
856 856 | // Replenish permits until we get back to `PERMIT_COUNT`
|
857 857 | while token_bucket.available_permits() < PERMIT_COUNT {
|
858 858 | if attempt > 23 {
|
859 859 | panic!("This test should have completed by now (fill-up)");
|
860 860 | }
|
861 861 |
|
862 862 | cfg.interceptor_state()
|
863 863 | .store_put(RequestAttempts::new(attempt));
|
864 864 | let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
|
865 865 | assert_eq!(no_retry, ShouldAttempt::No);
|
866 866 | attempt += 1;
|
867 867 | }
|
868 868 |
|
869 869 | assert_eq!(attempt, 23);
|
870 870 | assert_eq!(token_bucket.available_permits(), PERMIT_COUNT);
|
871 871 | }
|
872 872 |
|
873 - | #[cfg(feature = "test-util")]
|
873 + | #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
|
874 874 | #[test]
|
875 875 | fn backoff_timing() {
|
876 876 | let (mut cfg, rc, ctx) = setup_test(
|
877 877 | vec![RetryAction::server_error()],
|
878 878 | RetryConfig::standard()
|
879 879 | .with_use_static_exponential_base(true)
|
880 880 | .with_max_attempts(5),
|
881 881 | );
|
882 882 | let strategy = StandardRetryStrategy::new();
|
883 883 | cfg.interceptor_state().store_put(TokenBucket::default());
|
884 884 | let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();
|
885 885 |
|
886 886 | cfg.interceptor_state().store_put(RequestAttempts::new(1));
|
887 887 | let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
|
888 888 | let dur = should_retry.expect_delay();
|
889 889 | assert_eq!(dur, Duration::from_secs(1));
|
890 890 | assert_eq!(token_bucket.available_permits(), 495);
|
891 891 |
|
892 892 | cfg.interceptor_state().store_put(RequestAttempts::new(2));
|
893 893 | let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
|
894 894 | let dur = should_retry.expect_delay();
|
895 895 | assert_eq!(dur, Duration::from_secs(2));
|
896 896 | assert_eq!(token_bucket.available_permits(), 490);
|
897 897 |
|
898 898 | cfg.interceptor_state().store_put(RequestAttempts::new(3));
|
899 899 | let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
|
900 900 | let dur = should_retry.expect_delay();
|
901 901 | assert_eq!(dur, Duration::from_secs(4));
|
902 902 | assert_eq!(token_bucket.available_permits(), 485);
|
903 903 |
|
904 904 | cfg.interceptor_state().store_put(RequestAttempts::new(4));
|
905 905 | let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
|
906 906 | let dur = should_retry.expect_delay();
|
907 907 | assert_eq!(dur, Duration::from_secs(8));
|
908 908 | assert_eq!(token_bucket.available_permits(), 480);
|
909 909 |
|
910 910 | cfg.interceptor_state().store_put(RequestAttempts::new(5));
|
911 911 | let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
|
912 912 | assert_eq!(no_retry, ShouldAttempt::No);
|
913 913 | assert_eq!(token_bucket.available_permits(), 480);
|
914 914 | }
|
915 915 |
|
916 - | #[cfg(feature = "test-util")]
|
916 + | #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
|
917 917 | #[test]
|
918 918 | fn max_backoff_time() {
|
919 919 | let (mut cfg, rc, ctx) = setup_test(
|
920 920 | vec![RetryAction::server_error()],
|
921 921 | RetryConfig::standard()
|
922 922 | .with_use_static_exponential_base(true)
|
923 923 | .with_max_attempts(5)
|
924 924 | .with_initial_backoff(Duration::from_secs(1))
|
925 925 | .with_max_backoff(Duration::from_secs(3)),
|
926 926 | );
|