aws_smithy_runtime/client/retries/
client_rate_limiter.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6//! A rate limiter for controlling the rate at which AWS requests are made. The rate changes based
7//! on the number of throttling errors encountered.
8
9#![allow(dead_code)]
10
11use crate::client::retries::RetryPartition;
12use std::sync::{Arc, Mutex};
13use std::time::Duration;
14use tracing::debug;
15
16/// Represents a partition for the rate limiter, e.g. an endpoint, a region
17#[non_exhaustive]
18#[derive(Clone, Debug, Hash, PartialEq, Eq)]
19pub struct ClientRateLimiterPartition {
20    retry_partition: RetryPartition,
21}
22
23impl ClientRateLimiterPartition {
24    /// Creates a `ClientRateLimiterPartition` from the given [`RetryPartition`]
25    pub fn new(retry_partition: RetryPartition) -> Self {
26        Self { retry_partition }
27    }
28}
29
/// Token cost of a request sent for `RequestReason::InitialRequest`.
const INITIAL_REQUEST_COST: f64 = 1.0;
/// Token cost of a request sent for `RequestReason::Retry`.
const RETRY_COST: f64 = 5.0;
/// Token cost of a request sent for `RequestReason::RetryTimeout`: twice a regular retry.
const RETRY_TIMEOUT_COST: f64 = 2.0 * RETRY_COST;

/// Lower bound applied to the token bucket's fill rate (tokens per second).
const MIN_FILL_RATE: f64 = 0.5;
/// Lower bound applied to the token bucket's maximum capacity.
const MIN_CAPACITY: f64 = 1.0;
/// Weight of the newest sample when smoothing the measured send rate.
const SMOOTH: f64 = 0.8;
/// Factor the send rate is multiplied by after a throttling response (how far to scale back).
const BETA: f64 = 0.7;
/// Controls how aggressively the rate grows back after being throttled.
const SCALE_CONSTANT: f64 = 0.4;
41
42/// Rate limiter for adaptive retry.
43#[derive(Clone, Debug)]
44pub struct ClientRateLimiter {
45    pub(crate) inner: Arc<Mutex<Inner>>,
46}
47
/// Mutable state behind a `ClientRateLimiter`'s mutex.
#[derive(Debug)]
pub(crate) struct Inner {
    /// Tokens added to the bucket per second.
    fill_rate: f64,
    /// Upper bound for `current_capacity`.
    max_capacity: f64,
    /// Tokens currently available in the bucket. It is not meant to drop below `0.0`.
    current_capacity: f64,
    /// When the bucket was last refilled, or `None` before the first refill.
    last_timestamp: Option<f64>,
    /// Whether rate limiting is active. It starts out disabled and is switched on once a
    /// throttling error is encountered.
    enabled: bool,
    /// Smoothed rate at which requests are being sent.
    measured_tx_rate: f64,
    /// Start of the most recent half-second bucket used to measure the send rate.
    last_tx_rate_bucket: f64,
    /// Requests counted within the current time bucket.
    request_count: u64,
    /// The send rate recorded when the client was last throttled.
    last_max_rate: f64,
    /// When the client was last throttled.
    time_of_last_throttle: f64,
}
73
/// Why a request is being sent; this determines how many tokens it costs.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum RequestReason {
    /// A retry of an earlier attempt.
    Retry,
    /// A retry after a timeout; it costs more tokens than a regular retry.
    RetryTimeout,
    /// The first attempt of a request.
    InitialRequest,
}
79
80impl Default for ClientRateLimiter {
81    fn default() -> Self {
82        Self::builder().build()
83    }
84}
85
86impl ClientRateLimiter {
87    /// Creates a new `ClientRateLimiter`
88    pub fn new(seconds_since_unix_epoch: f64) -> Self {
89        Self::builder()
90            .tokens_retrieved_per_second(MIN_FILL_RATE)
91            .time_of_last_throttle(seconds_since_unix_epoch)
92            .previous_time_bucket(seconds_since_unix_epoch.floor())
93            .build()
94    }
95
96    /// Creates a new `ClientRateLimiterBuilder`
97    pub fn builder() -> ClientRateLimiterBuilder {
98        ClientRateLimiterBuilder::new()
99    }
100
101    pub(crate) fn acquire_permission_to_send_a_request(
102        &self,
103        seconds_since_unix_epoch: f64,
104        kind: RequestReason,
105    ) -> Result<(), Duration> {
106        let mut it = self.inner.lock().unwrap();
107
108        if !it.enabled {
109            // return early if we haven't encountered a throttling error yet
110            return Ok(());
111        }
112        let amount = match kind {
113            RequestReason::Retry => RETRY_COST,
114            RequestReason::RetryTimeout => RETRY_TIMEOUT_COST,
115            RequestReason::InitialRequest => INITIAL_REQUEST_COST,
116        };
117
118        it.refill(seconds_since_unix_epoch);
119
120        if amount > it.current_capacity {
121            let sleep_time = (amount - it.current_capacity) / it.fill_rate;
122            debug!(
123                amount,
124                it.current_capacity,
125                it.fill_rate,
126                sleep_time,
127                "client rate limiter delayed a request"
128            );
129            // Capacity unchanged; caller sleeps and re-acquires.
130            Err(Duration::from_secs_f64(sleep_time))
131        } else {
132            it.current_capacity -= amount;
133            Ok(())
134        }
135    }
136
137    pub(crate) fn update_rate_limiter(
138        &self,
139        seconds_since_unix_epoch: f64,
140        is_throttling_error: bool,
141    ) {
142        let mut it = self.inner.lock().unwrap();
143        it.update_tokens_retrieved_per_second(seconds_since_unix_epoch);
144
145        let calculated_rate;
146        if is_throttling_error {
147            let rate_to_use = if it.enabled {
148                f64::min(it.measured_tx_rate, it.fill_rate)
149            } else {
150                it.measured_tx_rate
151            };
152
153            // The fill_rate is from the token bucket
154            it.last_max_rate = rate_to_use;
155            it.calculate_time_window();
156            it.time_of_last_throttle = seconds_since_unix_epoch;
157            calculated_rate = cubic_throttle(rate_to_use);
158            it.enable_token_bucket();
159        } else {
160            it.calculate_time_window();
161            calculated_rate = it.cubic_success(seconds_since_unix_epoch);
162        }
163
164        let new_rate = f64::min(calculated_rate, 2.0 * it.measured_tx_rate);
165        it.update_bucket_refill_rate(seconds_since_unix_epoch, new_rate);
166    }
167}
168
169impl Inner {
170    fn refill(&mut self, seconds_since_unix_epoch: f64) {
171        if let Some(last_timestamp) = self.last_timestamp {
172            let fill_amount = (seconds_since_unix_epoch - last_timestamp) * self.fill_rate;
173            self.current_capacity =
174                f64::min(self.max_capacity, self.current_capacity + fill_amount);
175            debug!(
176                fill_amount,
177                self.current_capacity, self.max_capacity, "refilling client rate limiter tokens"
178            );
179        }
180        self.last_timestamp = Some(seconds_since_unix_epoch);
181    }
182
183    fn update_bucket_refill_rate(&mut self, seconds_since_unix_epoch: f64, new_fill_rate: f64) {
184        // Refill based on our current rate before we update to the new fill rate.
185        self.refill(seconds_since_unix_epoch);
186
187        self.fill_rate = f64::max(new_fill_rate, MIN_FILL_RATE);
188        self.max_capacity = f64::max(new_fill_rate, MIN_CAPACITY);
189
190        debug!(
191            fill_rate = self.fill_rate,
192            max_capacity = self.max_capacity,
193            current_capacity = self.current_capacity,
194            measured_tx_rate = self.measured_tx_rate,
195            "client rate limiter state has been updated"
196        );
197
198        // When we scale down we can't have a current capacity that exceeds our max_capacity.
199        self.current_capacity = f64::min(self.current_capacity, self.max_capacity);
200    }
201
202    fn enable_token_bucket(&mut self) {
203        // If throttling wasn't already enabled, note that we're now enabling it.
204        if !self.enabled {
205            debug!("client rate limiting has been enabled");
206        }
207        self.enabled = true;
208    }
209
210    fn update_tokens_retrieved_per_second(&mut self, seconds_since_unix_epoch: f64) {
211        let next_time_bucket = (seconds_since_unix_epoch * 2.0).floor() / 2.0;
212        self.request_count += 1;
213
214        if next_time_bucket > self.last_tx_rate_bucket {
215            let current_rate =
216                self.request_count as f64 / (next_time_bucket - self.last_tx_rate_bucket);
217            self.measured_tx_rate = current_rate * SMOOTH + self.measured_tx_rate * (1.0 - SMOOTH);
218            self.request_count = 0;
219            self.last_tx_rate_bucket = next_time_bucket;
220        }
221    }
222
223    fn calculate_time_window(&self) -> f64 {
224        let base = (self.last_max_rate * (1.0 - BETA)) / SCALE_CONSTANT;
225        base.powf(1.0 / 3.0)
226    }
227
228    fn cubic_success(&self, seconds_since_unix_epoch: f64) -> f64 {
229        let dt =
230            seconds_since_unix_epoch - self.time_of_last_throttle - self.calculate_time_window();
231        (SCALE_CONSTANT * dt.powi(3)) + self.last_max_rate
232    }
233}
234
235fn cubic_throttle(rate_to_use: f64) -> f64 {
236    rate_to_use * BETA
237}
238
/// Builder for `ClientRateLimiter`. Every setting is optional.
#[derive(Clone, Debug, Default)]
pub struct ClientRateLimiterBuilder {
    /// Tokens added to the bucket per second.
    token_refill_rate: Option<f64>,
    /// Upper bound for the number of tokens the bucket may hold.
    maximum_bucket_capacity: Option<f64>,
    /// Number of tokens the bucket holds initially.
    current_bucket_capacity: Option<f64>,
    /// When the token bucket was last refilled.
    time_of_last_refill: Option<f64>,
    /// Smoothed rate at which requests are being sent.
    tokens_retrieved_per_second: Option<f64>,
    /// Start of the most recent half-second bucket used to measure the send rate.
    previous_time_bucket: Option<f64>,
    /// Requests counted within the current time bucket.
    request_count: Option<u64>,
    /// Whether rate limiting starts out enabled. Normally it starts disabled and is enabled once
    /// a throttling error is encountered.
    enable_throttling: Option<bool>,
    /// The send rate recorded when the client was last throttled.
    tokens_retrieved_per_second_at_time_of_last_throttle: Option<f64>,
    /// When the client was last throttled.
    time_of_last_throttle: Option<f64>,
}
263
264impl ClientRateLimiterBuilder {
265    /// Create a new `ClientRateLimiterBuilder`.
266    pub fn new() -> Self {
267        ClientRateLimiterBuilder::default()
268    }
269    /// The rate at which token are replenished.
270    pub fn token_refill_rate(mut self, token_refill_rate: f64) -> Self {
271        self.set_token_refill_rate(Some(token_refill_rate));
272        self
273    }
274    /// The rate at which token are replenished.
275    pub fn set_token_refill_rate(&mut self, token_refill_rate: Option<f64>) -> &mut Self {
276        self.token_refill_rate = token_refill_rate;
277        self
278    }
279    /// The maximum capacity allowed in the token bucket
280    ///
281    /// The implementation of [`ClientRateLimiter`] guarantees that `current_capacity` never exceeds this value.
282    pub fn maximum_bucket_capacity(mut self, maximum_bucket_capacity: f64) -> Self {
283        self.set_maximum_bucket_capacity(Some(maximum_bucket_capacity));
284        self
285    }
286    /// The maximum capacity allowed in the token bucket
287    ///
288    /// The implementation of [`ClientRateLimiter`] guarantees that `current_capacity` never exceeds this value.
289    pub fn set_maximum_bucket_capacity(
290        &mut self,
291        maximum_bucket_capacity: Option<f64>,
292    ) -> &mut Self {
293        self.maximum_bucket_capacity = maximum_bucket_capacity;
294        self
295    }
296    /// The current capacity of the token bucket
297    ///
298    /// The implementation of [`ClientRateLimiter`] guarantees that this value is always at least `1.0` when it's enabled.
299    pub fn current_bucket_capacity(mut self, current_bucket_capacity: f64) -> Self {
300        self.set_current_bucket_capacity(Some(current_bucket_capacity));
301        self
302    }
303    /// The current capacity of the token bucket
304    ///
305    /// The implementation of [`ClientRateLimiter`] guarantees that this value is always at least `1.0` when it's enabled.
306    pub fn set_current_bucket_capacity(
307        &mut self,
308        current_bucket_capacity: Option<f64>,
309    ) -> &mut Self {
310        self.current_bucket_capacity = current_bucket_capacity;
311        self
312    }
313    // The last time the token bucket was refilled.
314    fn time_of_last_refill(mut self, time_of_last_refill: f64) -> Self {
315        self.set_time_of_last_refill(Some(time_of_last_refill));
316        self
317    }
318    // The last time the token bucket was refilled.
319    fn set_time_of_last_refill(&mut self, time_of_last_refill: Option<f64>) -> &mut Self {
320        self.time_of_last_refill = time_of_last_refill;
321        self
322    }
323    /// The smoothed rate which tokens are being retrieved.
324    pub fn tokens_retrieved_per_second(mut self, tokens_retrieved_per_second: f64) -> Self {
325        self.set_tokens_retrieved_per_second(Some(tokens_retrieved_per_second));
326        self
327    }
328    /// The smoothed rate which tokens are being retrieved.
329    pub fn set_tokens_retrieved_per_second(
330        &mut self,
331        tokens_retrieved_per_second: Option<f64>,
332    ) -> &mut Self {
333        self.tokens_retrieved_per_second = tokens_retrieved_per_second;
334        self
335    }
336    // The last half second time bucket used.
337    fn previous_time_bucket(mut self, previous_time_bucket: f64) -> Self {
338        self.set_previous_time_bucket(Some(previous_time_bucket));
339        self
340    }
341    // The last half second time bucket used.
342    fn set_previous_time_bucket(&mut self, previous_time_bucket: Option<f64>) -> &mut Self {
343        self.previous_time_bucket = previous_time_bucket;
344        self
345    }
346    // The number of requests seen within the current time bucket.
347    fn request_count(mut self, request_count: u64) -> Self {
348        self.set_request_count(Some(request_count));
349        self
350    }
351    // The number of requests seen within the current time bucket.
352    fn set_request_count(&mut self, request_count: Option<u64>) -> &mut Self {
353        self.request_count = request_count;
354        self
355    }
356    // Boolean indicating if the token bucket is enabled. The token bucket is initially disabled. When a throttling error is encountered it is enabled.
357    fn enable_throttling(mut self, enable_throttling: bool) -> Self {
358        self.set_enable_throttling(Some(enable_throttling));
359        self
360    }
361    // Boolean indicating if the token bucket is enabled. The token bucket is initially disabled. When a throttling error is encountered it is enabled.
362    fn set_enable_throttling(&mut self, enable_throttling: Option<bool>) -> &mut Self {
363        self.enable_throttling = enable_throttling;
364        self
365    }
366    // The maximum rate when the client was last throttled.
367    fn tokens_retrieved_per_second_at_time_of_last_throttle(
368        mut self,
369        tokens_retrieved_per_second_at_time_of_last_throttle: f64,
370    ) -> Self {
371        self.set_tokens_retrieved_per_second_at_time_of_last_throttle(Some(
372            tokens_retrieved_per_second_at_time_of_last_throttle,
373        ));
374        self
375    }
376    // The maximum rate when the client was last throttled.
377    fn set_tokens_retrieved_per_second_at_time_of_last_throttle(
378        &mut self,
379        tokens_retrieved_per_second_at_time_of_last_throttle: Option<f64>,
380    ) -> &mut Self {
381        self.tokens_retrieved_per_second_at_time_of_last_throttle =
382            tokens_retrieved_per_second_at_time_of_last_throttle;
383        self
384    }
385    // The last time when the client was throttled.
386    fn time_of_last_throttle(mut self, time_of_last_throttle: f64) -> Self {
387        self.set_time_of_last_throttle(Some(time_of_last_throttle));
388        self
389    }
390    // The last time when the client was throttled.
391    fn set_time_of_last_throttle(&mut self, time_of_last_throttle: Option<f64>) -> &mut Self {
392        self.time_of_last_throttle = time_of_last_throttle;
393        self
394    }
395    /// Build the ClientRateLimiter.
396    pub fn build(self) -> ClientRateLimiter {
397        ClientRateLimiter {
398            inner: Arc::new(Mutex::new(Inner {
399                fill_rate: self.token_refill_rate.unwrap_or_default(),
400                max_capacity: self.maximum_bucket_capacity.unwrap_or(f64::MAX),
401                current_capacity: self.current_bucket_capacity.unwrap_or_default(),
402                last_timestamp: self.time_of_last_refill,
403                enabled: self.enable_throttling.unwrap_or_default(),
404                measured_tx_rate: self.tokens_retrieved_per_second.unwrap_or_default(),
405                last_tx_rate_bucket: self.previous_time_bucket.unwrap_or_default(),
406                request_count: self.request_count.unwrap_or_default(),
407                last_max_rate: self
408                    .tokens_retrieved_per_second_at_time_of_last_throttle
409                    .unwrap_or_default(),
410                time_of_last_throttle: self.time_of_last_throttle.unwrap_or_default(),
411            })),
412        }
413    }
414}
415
416#[cfg(test)]
417mod tests {
418    use super::{cubic_throttle, ClientRateLimiter};
419    use crate::client::retries::client_rate_limiter::RequestReason;
420    use approx::assert_relative_eq;
421    use aws_smithy_async::rt::sleep::AsyncSleep;
422    use aws_smithy_async::test_util::instant_time_and_sleep;
423    use std::time::{Duration, SystemTime};
424
425    const ONE_SECOND: Duration = Duration::from_secs(1);
426    const TWO_HUNDRED_MILLISECONDS: Duration = Duration::from_millis(200);
427
428    #[test]
429    fn should_match_beta_decrease() {
430        let new_rate = cubic_throttle(10.0);
431        assert_relative_eq!(new_rate, 7.0);
432
433        let rate_limiter = ClientRateLimiter::builder()
434            .tokens_retrieved_per_second_at_time_of_last_throttle(10.0)
435            .time_of_last_throttle(1.0)
436            .build();
437
438        rate_limiter.inner.lock().unwrap().calculate_time_window();
439        let new_rate = rate_limiter.inner.lock().unwrap().cubic_success(1.0);
440        assert_relative_eq!(new_rate, 7.0);
441    }
442
443    #[tokio::test]
444    async fn throttling_is_enabled_once_throttling_error_is_received() {
445        let rate_limiter = ClientRateLimiter::builder()
446            .previous_time_bucket(0.0)
447            .time_of_last_throttle(0.0)
448            .build();
449
450        assert!(
451            !rate_limiter.inner.lock().unwrap().enabled,
452            "rate_limiter should be disabled by default"
453        );
454        rate_limiter.update_rate_limiter(0.0, true);
455        assert!(
456            rate_limiter.inner.lock().unwrap().enabled,
457            "rate_limiter should be enabled after throttling error"
458        );
459    }
460
461    #[tokio::test]
462    async fn test_calculated_rate_with_successes() {
463        let rate_limiter = ClientRateLimiter::builder()
464            .time_of_last_throttle(5.0)
465            .tokens_retrieved_per_second_at_time_of_last_throttle(10.0)
466            .build();
467
468        struct Attempt {
469            seconds_since_unix_epoch: f64,
470            expected_calculated_rate: f64,
471        }
472
473        let attempts = [
474            Attempt {
475                seconds_since_unix_epoch: 5.0,
476                expected_calculated_rate: 7.0,
477            },
478            Attempt {
479                seconds_since_unix_epoch: 6.0,
480                expected_calculated_rate: 9.64893600966,
481            },
482            Attempt {
483                seconds_since_unix_epoch: 7.0,
484                expected_calculated_rate: 10.000030849917364,
485            },
486            Attempt {
487                seconds_since_unix_epoch: 8.0,
488                expected_calculated_rate: 10.453284520772092,
489            },
490            Attempt {
491                seconds_since_unix_epoch: 9.0,
492                expected_calculated_rate: 13.408697022224185,
493            },
494            Attempt {
495                seconds_since_unix_epoch: 10.0,
496                expected_calculated_rate: 21.26626835427364,
497            },
498            Attempt {
499                seconds_since_unix_epoch: 11.0,
500                expected_calculated_rate: 36.425998516920465,
501            },
502        ];
503
504        // Think this test is a little strange? I ported the test from Go v2, and this is how it
505        // was implemented. See for yourself:
506        // https://github.com/aws/aws-sdk-go-v2/blob/844ff45cdc76182229ad098c95bf3f5ab8c20e9f/aws/retry/adaptive_ratelimit_test.go#L97
507        for attempt in attempts {
508            rate_limiter.inner.lock().unwrap().calculate_time_window();
509            let calculated_rate = rate_limiter
510                .inner
511                .lock()
512                .unwrap()
513                .cubic_success(attempt.seconds_since_unix_epoch);
514
515            assert_relative_eq!(attempt.expected_calculated_rate, calculated_rate);
516        }
517    }
518
519    #[tokio::test]
520    async fn test_calculated_rate_with_throttles() {
521        let rate_limiter = ClientRateLimiter::builder()
522            .tokens_retrieved_per_second_at_time_of_last_throttle(10.0)
523            .time_of_last_throttle(5.0)
524            .build();
525
526        struct Attempt {
527            throttled: bool,
528            seconds_since_unix_epoch: f64,
529            expected_calculated_rate: f64,
530        }
531
532        let attempts = [
533            Attempt {
534                throttled: false,
535                seconds_since_unix_epoch: 5.0,
536                expected_calculated_rate: 7.0,
537            },
538            Attempt {
539                throttled: false,
540                seconds_since_unix_epoch: 6.0,
541                expected_calculated_rate: 9.64893600966,
542            },
543            Attempt {
544                throttled: true,
545                seconds_since_unix_epoch: 7.0,
546                expected_calculated_rate: 6.754255206761999,
547            },
548            Attempt {
549                throttled: true,
550                seconds_since_unix_epoch: 8.0,
551                expected_calculated_rate: 4.727978644733399,
552            },
553            Attempt {
554                throttled: false,
555                seconds_since_unix_epoch: 9.0,
556                expected_calculated_rate: 4.670125557970046,
557            },
558            Attempt {
559                throttled: false,
560                seconds_since_unix_epoch: 10.0,
561                expected_calculated_rate: 4.770870456867401,
562            },
563            Attempt {
564                throttled: false,
565                seconds_since_unix_epoch: 11.0,
566                expected_calculated_rate: 6.011819748005445,
567            },
568            Attempt {
569                throttled: false,
570                seconds_since_unix_epoch: 12.0,
571                expected_calculated_rate: 10.792973431384178,
572            },
573        ];
574
575        // Think this test is a little strange? I ported the test from Go v2, and this is how it
576        // was implemented. See for yourself:
577        // https://github.com/aws/aws-sdk-go-v2/blob/844ff45cdc76182229ad098c95bf3f5ab8c20e9f/aws/retry/adaptive_ratelimit_test.go#L97
578        let mut calculated_rate = 0.0;
579        for attempt in attempts {
580            let mut inner = rate_limiter.inner.lock().unwrap();
581            inner.calculate_time_window();
582            if attempt.throttled {
583                calculated_rate = cubic_throttle(calculated_rate);
584                inner.time_of_last_throttle = attempt.seconds_since_unix_epoch;
585                inner.last_max_rate = calculated_rate;
586            } else {
587                calculated_rate = inner.cubic_success(attempt.seconds_since_unix_epoch);
588            };
589
590            assert_relative_eq!(attempt.expected_calculated_rate, calculated_rate);
591        }
592    }
593
594    #[tokio::test]
595    async fn test_client_sending_rates() {
596        let (_, sleep_impl) = instant_time_and_sleep(SystemTime::UNIX_EPOCH);
597        let rate_limiter = ClientRateLimiter::builder().build();
598
599        struct Attempt {
600            throttled: bool,
601            seconds_since_unix_epoch: f64,
602            expected_tokens_retrieved_per_second: f64,
603            expected_token_refill_rate: f64,
604        }
605
606        let attempts = [
607            Attempt {
608                throttled: false,
609                seconds_since_unix_epoch: 0.2,
610                expected_tokens_retrieved_per_second: 0.000000,
611                expected_token_refill_rate: 0.500000,
612            },
613            Attempt {
614                throttled: false,
615                seconds_since_unix_epoch: 0.4,
616                expected_tokens_retrieved_per_second: 0.000000,
617                expected_token_refill_rate: 0.500000,
618            },
619            Attempt {
620                throttled: false,
621                seconds_since_unix_epoch: 0.6,
622                expected_tokens_retrieved_per_second: 4.800000000000001,
623                expected_token_refill_rate: 0.500000,
624            },
625            Attempt {
626                throttled: false,
627                seconds_since_unix_epoch: 0.8,
628                expected_tokens_retrieved_per_second: 4.800000000000001,
629                expected_token_refill_rate: 0.500000,
630            },
631            Attempt {
632                throttled: false,
633                seconds_since_unix_epoch: 1.0,
634                expected_tokens_retrieved_per_second: 4.160000,
635                expected_token_refill_rate: 0.500000,
636            },
637            Attempt {
638                throttled: false,
639                seconds_since_unix_epoch: 1.2,
640                expected_tokens_retrieved_per_second: 4.160000,
641                expected_token_refill_rate: 0.691200,
642            },
643            Attempt {
644                throttled: false,
645                seconds_since_unix_epoch: 1.4,
646                expected_tokens_retrieved_per_second: 4.160000,
647                expected_token_refill_rate: 1.0975999999999997,
648            },
649            Attempt {
650                throttled: false,
651                seconds_since_unix_epoch: 1.6,
652                expected_tokens_retrieved_per_second: 5.632000000000001,
653                expected_token_refill_rate: 1.6384000000000005,
654            },
655            Attempt {
656                throttled: false,
657                seconds_since_unix_epoch: 1.8,
658                expected_tokens_retrieved_per_second: 5.632000000000001,
659                expected_token_refill_rate: 2.332800,
660            },
661            Attempt {
662                throttled: true,
663                seconds_since_unix_epoch: 2.0,
664                expected_tokens_retrieved_per_second: 4.326400,
665                expected_token_refill_rate: 3.0284799999999996,
666            },
667            Attempt {
668                throttled: false,
669                seconds_since_unix_epoch: 2.2,
670                expected_tokens_retrieved_per_second: 4.326400,
671                expected_token_refill_rate: 3.48663917347026,
672            },
673            Attempt {
674                throttled: false,
675                seconds_since_unix_epoch: 2.4,
676                expected_tokens_retrieved_per_second: 4.326400,
677                expected_token_refill_rate: 3.821874416040255,
678            },
679            Attempt {
680                throttled: false,
681                seconds_since_unix_epoch: 2.6,
682                expected_tokens_retrieved_per_second: 5.665280,
683                expected_token_refill_rate: 4.053385727709987,
684            },
685            Attempt {
686                throttled: false,
687                seconds_since_unix_epoch: 2.8,
688                expected_tokens_retrieved_per_second: 5.665280,
689                expected_token_refill_rate: 4.200373108479454,
690            },
691            Attempt {
692                throttled: false,
693                seconds_since_unix_epoch: 3.0,
694                expected_tokens_retrieved_per_second: 4.333056,
695                expected_token_refill_rate: 4.282036558348658,
696            },
697            Attempt {
698                throttled: true,
699                seconds_since_unix_epoch: 3.2,
700                expected_tokens_retrieved_per_second: 4.333056,
701                expected_token_refill_rate: 2.99742559084406,
702            },
703            Attempt {
704                throttled: false,
705                seconds_since_unix_epoch: 3.4,
706                expected_tokens_retrieved_per_second: 4.333056,
707                expected_token_refill_rate: 3.4522263943863463,
708            },
709        ];
710
711        for attempt in attempts {
712            sleep_impl.sleep(TWO_HUNDRED_MILLISECONDS).await;
713            assert_eq!(
714                attempt.seconds_since_unix_epoch,
715                sleep_impl.total_duration().as_secs_f64()
716            );
717
718            rate_limiter.update_rate_limiter(attempt.seconds_since_unix_epoch, attempt.throttled);
719            assert_relative_eq!(
720                attempt.expected_tokens_retrieved_per_second,
721                rate_limiter.inner.lock().unwrap().measured_tx_rate
722            );
723            assert_relative_eq!(
724                attempt.expected_token_refill_rate,
725                rate_limiter.inner.lock().unwrap().fill_rate
726            );
727        }
728    }
729
730    // Regression test for the multi-thread negative capacity bug.
731    // See: https://github.com/smithy-lang/smithy-rs/blob/main/rust-runtime/aws-smithy-runtime/src/client/retries/client_rate_limiter.rs#L135
732    // Failing test showing the bug: https://github.com/smithy-lang/smithy-rs/commit/786b6d07e17d39ae0a6c040a49664169149c2fdf
733    //
734    // Previously, capacity was deducted unconditionally (even on Err),
735    // allowing it to go to -0.95 in this scenario. Now capacity is only
736    // deducted when a token is actually granted (Ok), so it stays at 0.05.
737    #[tokio::test]
738    async fn test_capacity_never_goes_negative() {
739        let rate_limiter = ClientRateLimiter::builder()
740            .time_of_last_throttle(0.0)
741            .previous_time_bucket(0.0)
742            .build();
743
744        rate_limiter.update_rate_limiter(0.0, true);
745
746        // fill_rate = 0.5, 0.1s elapsed => refill adds 0.05 tokens.
747        // Cost of InitialRequest is 1.0, so capacity (0.05) is insufficient.
748        let result =
749            rate_limiter.acquire_permission_to_send_a_request(0.1, RequestReason::InitialRequest);
750
751        assert!(result.is_err(), "should require waiting for capacity");
752
753        let inner = rate_limiter.inner.lock().unwrap();
754        assert_relative_eq!(inner.current_capacity, 0.05, epsilon = 0.01);
755        assert_relative_eq!(result.unwrap_err().as_secs_f64(), 1.9, epsilon = 0.01);
756    }
757
758    #[tokio::test]
759    async fn test_concurrent_acquires_no_cascading_delays() {
760        let rate_limiter = ClientRateLimiter::builder()
761            .time_of_last_throttle(0.0)
762            .previous_time_bucket(0.0)
763            .build();
764
765        rate_limiter.update_rate_limiter(0.0, true);
766
767        let mut delays = Vec::with_capacity(10);
768        for _ in 0..10 {
769            let result = rate_limiter
770                .acquire_permission_to_send_a_request(0.1, RequestReason::InitialRequest);
771            assert!(result.is_err());
772            delays.push(result.unwrap_err());
773        }
774
775        // All delays must be identical — no cascading
776        let first = delays[0];
777        for delay in &delays {
778            assert_eq!(*delay, first, "all tasks should get the same delay");
779        }
780
781        let inner = rate_limiter.inner.lock().unwrap();
782        assert!(
783            inner.current_capacity >= 0.0,
784            "capacity must never be negative"
785        );
786    }
787
788    #[tokio::test]
789    async fn test_acquire_succeeds_after_sufficient_refill() {
790        let rate_limiter = ClientRateLimiter::builder()
791            .time_of_last_throttle(0.0)
792            .previous_time_bucket(0.0)
793            .build();
794
795        rate_limiter.update_rate_limiter(0.0, true);
796
797        let result =
798            rate_limiter.acquire_permission_to_send_a_request(0.1, RequestReason::InitialRequest);
799        assert!(result.is_err());
800
801        // At 2.1s: refill adds 1.0 token, capped at max_capacity (1.0).
802        let result =
803            rate_limiter.acquire_permission_to_send_a_request(2.1, RequestReason::InitialRequest);
804        assert!(result.is_ok(), "should succeed after sufficient refill");
805
806        let inner = rate_limiter.inner.lock().unwrap();
807        assert_relative_eq!(inner.current_capacity, 0.0, epsilon = 0.01);
808    }
809
810    // This test is only testing that we don't fail basic math and panic. It does include an
811    // element of randomness, but no duration between >= 0.0s and <= 1.0s will ever cause a panic.
812    //
813    // Because the cost of sending an individual request is 1.0, and because the minimum capacity is
814    // also 1.0, we will never encounter a situation where we run out of tokens.
815    #[tokio::test]
816    async fn test_when_throttling_is_enabled_requests_can_still_be_sent() {
817        let (time_source, sleep_impl) = instant_time_and_sleep(SystemTime::UNIX_EPOCH);
818        let crl = ClientRateLimiter::builder()
819            .time_of_last_throttle(0.0)
820            .previous_time_bucket(0.0)
821            .build();
822
823        // Start by recording a throttling error
824        crl.update_rate_limiter(0.0, true);
825
826        for _i in 0..100 {
827            // advance time by a random amount (up to 1s) each iteration
828            let duration = Duration::from_secs_f64(fastrand::f64());
829            sleep_impl.sleep(duration).await;
830            if let Err(delay) = crl.acquire_permission_to_send_a_request(
831                time_source.seconds_since_unix_epoch(),
832                RequestReason::InitialRequest,
833            ) {
834                sleep_impl.sleep(delay).await;
835            }
836
837            // Assume all further requests succeed on the first try
838            crl.update_rate_limiter(time_source.seconds_since_unix_epoch(), false);
839        }
840
841        let inner = crl.inner.lock().unwrap();
842        assert!(inner.enabled, "the rate limiter should still be enabled");
843        // Assert that the rate limiter respects the passage of time.
844        assert_relative_eq!(
845            inner.last_timestamp.unwrap(),
846            sleep_impl.total_duration().as_secs_f64(),
847            max_relative = 0.0001
848        );
849    }
850
851    #[tokio::test]
852    async fn test_multi_task_recovery_after_throttle_blip() {
853        // Simulates the transient throttle blip scenario: 50 tasks share a
854        // rate limiter, all get throttled, then throttle lifts. Verifies
855        // that tasks recover and can acquire tokens again.
856        let crl = ClientRateLimiter::builder()
857            .time_of_last_throttle(0.0)
858            .previous_time_bucket(0.0)
859            .build();
860
861        let num_tasks = 50;
862        let mut time = 0.0;
863
864        // All tasks get throttled
865        for _ in 0..num_tasks {
866            time += 0.001;
867            crl.update_rate_limiter(time, true);
868        }
869
870        assert_relative_eq!(crl.inner.lock().unwrap().fill_rate, 0.5, epsilon = 0.01);
871
872        // Simulate recovery over 20 seconds (200 rounds * 100ms).
873        // Tasks that acquire successfully get a success response,
874        // which increases the fill rate.
875        let mut total_acquired = 0;
876        let rounds = 200;
877        for _ in 0..rounds {
878            time += 0.1;
879            let mut acquired_this_round = 0;
880
881            for task in 0..num_tasks {
882                let task_time = time + (task as f64) * 0.0001;
883                if crl
884                    .acquire_permission_to_send_a_request(task_time, RequestReason::InitialRequest)
885                    .is_ok()
886                {
887                    acquired_this_round += 1;
888                }
889            }
890
891            for _ in 0..acquired_this_round {
892                time += 0.001;
893                crl.update_rate_limiter(time, false);
894            }
895
896            total_acquired += acquired_this_round;
897        }
898
899        assert!(
900            total_acquired > 100,
901            "expected recovery after throttle blip, but only acquired {total_acquired} tokens in {rounds} rounds"
902        );
903
904        let inner = crl.inner.lock().unwrap();
905        assert!(
906            inner.current_capacity >= 0.0,
907            "capacity must never be negative, got: {}",
908            inner.current_capacity
909        );
910    }
911}