aws_smithy_runtime/client/retries/
client_rate_limiter.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6//! A rate limiter for controlling the rate at which AWS requests are made. The rate changes based
7//! on the number of throttling errors encountered.
8
9#![allow(dead_code)]
10
11use crate::client::retries::RetryPartition;
12use std::sync::{Arc, Mutex};
13use std::time::Duration;
14use tracing::debug;
15
16/// Represents a partition for the rate limiter, e.g. an endpoint, a region
17#[non_exhaustive]
18#[derive(Clone, Debug, Hash, PartialEq, Eq)]
19pub struct ClientRateLimiterPartition {
20    retry_partition: RetryPartition,
21}
22
23impl ClientRateLimiterPartition {
24    /// Creates a `ClientRateLimiterPartition` from the given [`RetryPartition`]
25    pub fn new(retry_partition: RetryPartition) -> Self {
26        Self { retry_partition }
27    }
28}
29
/// Tokens charged for a [`RequestReason::Retry`] request.
const RETRY_COST: f64 = 5.0;
/// Tokens charged for a [`RequestReason::RetryTimeout`] request; twice the regular retry cost.
const RETRY_TIMEOUT_COST: f64 = 2.0 * RETRY_COST;
/// Tokens charged for a [`RequestReason::InitialRequest`] request.
const INITIAL_REQUEST_COST: f64 = 1.0;

/// Lower bound applied to the token bucket's fill rate.
const MIN_FILL_RATE: f64 = 0.5;
/// Lower bound applied to the token bucket's maximum capacity.
const MIN_CAPACITY: f64 = 1.0;
/// Weight given to the newest sample when smoothing the measured request rate.
const SMOOTH: f64 = 0.8;
/// How much to scale back after receiving a throttling response
const BETA: f64 = 0.7;
/// Controls how aggressively we scale up after being throttled
const SCALE_CONSTANT: f64 = 0.4;
41
42/// Rate limiter for adaptive retry.
43#[derive(Clone, Debug)]
44pub struct ClientRateLimiter {
45    pub(crate) inner: Arc<Mutex<Inner>>,
46}
47
/// Token bucket and rate-measurement state behind a [`ClientRateLimiter`].
#[derive(Debug)]
pub(crate) struct Inner {
    /// The rate at which tokens are replenished.
    fill_rate: f64,
    /// The maximum number of tokens the bucket is allowed to hold.
    max_capacity: f64,
    /// The number of tokens currently in the bucket.
    current_capacity: f64,
    /// The last time the token bucket was refilled, or `None` if it has not been refilled yet.
    last_timestamp: Option<f64>,
    /// Whether the token bucket is enabled.
    /// The token bucket is initially disabled.
    /// It is enabled once a throttling error is encountered.
    enabled: bool,
    /// The smoothed rate at which tokens are being retrieved.
    measured_tx_rate: f64,
    /// The last half-second time bucket used for measuring the request rate.
    last_tx_rate_bucket: f64,
    /// The number of requests seen within the current time bucket.
    request_count: u64,
    /// The maximum rate when the client was last throttled.
    last_max_rate: f64,
    /// The last time the client was throttled.
    time_of_last_throttle: f64,
}
73
/// The reason a request is asking the rate limiter for permission to be sent.
///
/// The reason determines how many tokens the request is charged.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum RequestReason {
    /// A retry; charged `RETRY_COST` tokens.
    Retry,
    /// A retry following a timeout; charged `RETRY_TIMEOUT_COST` tokens.
    RetryTimeout,
    /// An initial request; charged `INITIAL_REQUEST_COST` tokens.
    InitialRequest,
}
79
80impl Default for ClientRateLimiter {
81    fn default() -> Self {
82        Self::builder().build()
83    }
84}
85
86impl ClientRateLimiter {
87    /// Creates a new `ClientRateLimiter`
88    pub fn new(seconds_since_unix_epoch: f64) -> Self {
89        Self::builder()
90            .tokens_retrieved_per_second(MIN_FILL_RATE)
91            .time_of_last_throttle(seconds_since_unix_epoch)
92            .previous_time_bucket(seconds_since_unix_epoch.floor())
93            .build()
94    }
95
96    /// Creates a new `ClientRateLimiterBuilder`
97    pub fn builder() -> ClientRateLimiterBuilder {
98        ClientRateLimiterBuilder::new()
99    }
100
101    pub(crate) fn acquire_permission_to_send_a_request(
102        &self,
103        seconds_since_unix_epoch: f64,
104        kind: RequestReason,
105    ) -> Result<(), Duration> {
106        let mut it = self.inner.lock().unwrap();
107
108        if !it.enabled {
109            // return early if we haven't encountered a throttling error yet
110            return Ok(());
111        }
112        let amount = match kind {
113            RequestReason::Retry => RETRY_COST,
114            RequestReason::RetryTimeout => RETRY_TIMEOUT_COST,
115            RequestReason::InitialRequest => INITIAL_REQUEST_COST,
116        };
117
118        it.refill(seconds_since_unix_epoch);
119
120        let res = if amount > it.current_capacity {
121            let sleep_time = (amount - it.current_capacity) / it.fill_rate;
122            debug!(
123                amount,
124                it.current_capacity,
125                it.fill_rate,
126                sleep_time,
127                "client rate limiter delayed a request"
128            );
129
130            Err(Duration::from_secs_f64(sleep_time))
131        } else {
132            Ok(())
133        };
134
135        it.current_capacity -= amount;
136        res
137    }
138
139    pub(crate) fn update_rate_limiter(
140        &self,
141        seconds_since_unix_epoch: f64,
142        is_throttling_error: bool,
143    ) {
144        let mut it = self.inner.lock().unwrap();
145        it.update_tokens_retrieved_per_second(seconds_since_unix_epoch);
146
147        let calculated_rate;
148        if is_throttling_error {
149            let rate_to_use = if it.enabled {
150                f64::min(it.measured_tx_rate, it.fill_rate)
151            } else {
152                it.measured_tx_rate
153            };
154
155            // The fill_rate is from the token bucket
156            it.last_max_rate = rate_to_use;
157            it.calculate_time_window();
158            it.time_of_last_throttle = seconds_since_unix_epoch;
159            calculated_rate = cubic_throttle(rate_to_use);
160            it.enable_token_bucket();
161        } else {
162            it.calculate_time_window();
163            calculated_rate = it.cubic_success(seconds_since_unix_epoch);
164        }
165
166        let new_rate = f64::min(calculated_rate, 2.0 * it.measured_tx_rate);
167        it.update_bucket_refill_rate(seconds_since_unix_epoch, new_rate);
168    }
169}
170
171impl Inner {
172    fn refill(&mut self, seconds_since_unix_epoch: f64) {
173        if let Some(last_timestamp) = self.last_timestamp {
174            let fill_amount = (seconds_since_unix_epoch - last_timestamp) * self.fill_rate;
175            self.current_capacity =
176                f64::min(self.max_capacity, self.current_capacity + fill_amount);
177            debug!(
178                fill_amount,
179                self.current_capacity, self.max_capacity, "refilling client rate limiter tokens"
180            );
181        }
182        self.last_timestamp = Some(seconds_since_unix_epoch);
183    }
184
185    fn update_bucket_refill_rate(&mut self, seconds_since_unix_epoch: f64, new_fill_rate: f64) {
186        // Refill based on our current rate before we update to the new fill rate.
187        self.refill(seconds_since_unix_epoch);
188
189        self.fill_rate = f64::max(new_fill_rate, MIN_FILL_RATE);
190        self.max_capacity = f64::max(new_fill_rate, MIN_CAPACITY);
191
192        debug!(
193            fill_rate = self.fill_rate,
194            max_capacity = self.max_capacity,
195            current_capacity = self.current_capacity,
196            measured_tx_rate = self.measured_tx_rate,
197            "client rate limiter state has been updated"
198        );
199
200        // When we scale down we can't have a current capacity that exceeds our max_capacity.
201        self.current_capacity = f64::min(self.current_capacity, self.max_capacity);
202    }
203
204    fn enable_token_bucket(&mut self) {
205        // If throttling wasn't already enabled, note that we're now enabling it.
206        if !self.enabled {
207            debug!("client rate limiting has been enabled");
208        }
209        self.enabled = true;
210    }
211
212    fn update_tokens_retrieved_per_second(&mut self, seconds_since_unix_epoch: f64) {
213        let next_time_bucket = (seconds_since_unix_epoch * 2.0).floor() / 2.0;
214        self.request_count += 1;
215
216        if next_time_bucket > self.last_tx_rate_bucket {
217            let current_rate =
218                self.request_count as f64 / (next_time_bucket - self.last_tx_rate_bucket);
219            self.measured_tx_rate = current_rate * SMOOTH + self.measured_tx_rate * (1.0 - SMOOTH);
220            self.request_count = 0;
221            self.last_tx_rate_bucket = next_time_bucket;
222        }
223    }
224
225    fn calculate_time_window(&self) -> f64 {
226        let base = (self.last_max_rate * (1.0 - BETA)) / SCALE_CONSTANT;
227        base.powf(1.0 / 3.0)
228    }
229
230    fn cubic_success(&self, seconds_since_unix_epoch: f64) -> f64 {
231        let dt =
232            seconds_since_unix_epoch - self.time_of_last_throttle - self.calculate_time_window();
233        (SCALE_CONSTANT * dt.powi(3)) + self.last_max_rate
234    }
235}
236
237fn cubic_throttle(rate_to_use: f64) -> f64 {
238    rate_to_use * BETA
239}
240
/// Builder for `ClientRateLimiter`.
///
/// Every value is optional; a default-constructed builder has nothing set.
#[derive(Debug, Default, Clone)]
pub struct ClientRateLimiterBuilder {
    /// The rate at which tokens are replenished.
    token_refill_rate: Option<f64>,
    /// The maximum capacity allowed in the token bucket.
    maximum_bucket_capacity: Option<f64>,
    /// The current capacity of the token bucket.
    current_bucket_capacity: Option<f64>,
    /// The last time the token bucket was refilled.
    time_of_last_refill: Option<f64>,
    /// The smoothed rate at which tokens are being retrieved.
    tokens_retrieved_per_second: Option<f64>,
    /// The last half-second time bucket used.
    previous_time_bucket: Option<f64>,
    /// The number of requests seen within the current time bucket.
    request_count: Option<u64>,
    /// Whether the token bucket is enabled. The token bucket is initially disabled and is
    /// enabled once a throttling error is encountered.
    enable_throttling: Option<bool>,
    /// The maximum rate when the client was last throttled.
    tokens_retrieved_per_second_at_time_of_last_throttle: Option<f64>,
    /// The last time the client was throttled.
    time_of_last_throttle: Option<f64>,
}
265
266impl ClientRateLimiterBuilder {
267    /// Create a new `ClientRateLimiterBuilder`.
268    pub fn new() -> Self {
269        ClientRateLimiterBuilder::default()
270    }
271    /// The rate at which token are replenished.
272    pub fn token_refill_rate(mut self, token_refill_rate: f64) -> Self {
273        self.set_token_refill_rate(Some(token_refill_rate));
274        self
275    }
276    /// The rate at which token are replenished.
277    pub fn set_token_refill_rate(&mut self, token_refill_rate: Option<f64>) -> &mut Self {
278        self.token_refill_rate = token_refill_rate;
279        self
280    }
281    /// The maximum capacity allowed in the token bucket
282    ///
283    /// The implementation of [`ClientRateLimiter`] guarantees that `current_capacity` never exceeds this value.
284    pub fn maximum_bucket_capacity(mut self, maximum_bucket_capacity: f64) -> Self {
285        self.set_maximum_bucket_capacity(Some(maximum_bucket_capacity));
286        self
287    }
288    /// The maximum capacity allowed in the token bucket
289    ///
290    /// The implementation of [`ClientRateLimiter`] guarantees that `current_capacity` never exceeds this value.
291    pub fn set_maximum_bucket_capacity(
292        &mut self,
293        maximum_bucket_capacity: Option<f64>,
294    ) -> &mut Self {
295        self.maximum_bucket_capacity = maximum_bucket_capacity;
296        self
297    }
298    /// The current capacity of the token bucket
299    ///
300    /// The implementation of [`ClientRateLimiter`] guarantees that this value is always at least `1.0` when it's enabled.
301    pub fn current_bucket_capacity(mut self, current_bucket_capacity: f64) -> Self {
302        self.set_current_bucket_capacity(Some(current_bucket_capacity));
303        self
304    }
305    /// The current capacity of the token bucket
306    ///
307    /// The implementation of [`ClientRateLimiter`] guarantees that this value is always at least `1.0` when it's enabled.
308    pub fn set_current_bucket_capacity(
309        &mut self,
310        current_bucket_capacity: Option<f64>,
311    ) -> &mut Self {
312        self.current_bucket_capacity = current_bucket_capacity;
313        self
314    }
315    // The last time the token bucket was refilled.
316    fn time_of_last_refill(mut self, time_of_last_refill: f64) -> Self {
317        self.set_time_of_last_refill(Some(time_of_last_refill));
318        self
319    }
320    // The last time the token bucket was refilled.
321    fn set_time_of_last_refill(&mut self, time_of_last_refill: Option<f64>) -> &mut Self {
322        self.time_of_last_refill = time_of_last_refill;
323        self
324    }
325    /// The smoothed rate which tokens are being retrieved.
326    pub fn tokens_retrieved_per_second(mut self, tokens_retrieved_per_second: f64) -> Self {
327        self.set_tokens_retrieved_per_second(Some(tokens_retrieved_per_second));
328        self
329    }
330    /// The smoothed rate which tokens are being retrieved.
331    pub fn set_tokens_retrieved_per_second(
332        &mut self,
333        tokens_retrieved_per_second: Option<f64>,
334    ) -> &mut Self {
335        self.tokens_retrieved_per_second = tokens_retrieved_per_second;
336        self
337    }
338    // The last half second time bucket used.
339    fn previous_time_bucket(mut self, previous_time_bucket: f64) -> Self {
340        self.set_previous_time_bucket(Some(previous_time_bucket));
341        self
342    }
343    // The last half second time bucket used.
344    fn set_previous_time_bucket(&mut self, previous_time_bucket: Option<f64>) -> &mut Self {
345        self.previous_time_bucket = previous_time_bucket;
346        self
347    }
348    // The number of requests seen within the current time bucket.
349    fn request_count(mut self, request_count: u64) -> Self {
350        self.set_request_count(Some(request_count));
351        self
352    }
353    // The number of requests seen within the current time bucket.
354    fn set_request_count(&mut self, request_count: Option<u64>) -> &mut Self {
355        self.request_count = request_count;
356        self
357    }
358    // Boolean indicating if the token bucket is enabled. The token bucket is initially disabled. When a throttling error is encountered it is enabled.
359    fn enable_throttling(mut self, enable_throttling: bool) -> Self {
360        self.set_enable_throttling(Some(enable_throttling));
361        self
362    }
363    // Boolean indicating if the token bucket is enabled. The token bucket is initially disabled. When a throttling error is encountered it is enabled.
364    fn set_enable_throttling(&mut self, enable_throttling: Option<bool>) -> &mut Self {
365        self.enable_throttling = enable_throttling;
366        self
367    }
368    // The maximum rate when the client was last throttled.
369    fn tokens_retrieved_per_second_at_time_of_last_throttle(
370        mut self,
371        tokens_retrieved_per_second_at_time_of_last_throttle: f64,
372    ) -> Self {
373        self.set_tokens_retrieved_per_second_at_time_of_last_throttle(Some(
374            tokens_retrieved_per_second_at_time_of_last_throttle,
375        ));
376        self
377    }
378    // The maximum rate when the client was last throttled.
379    fn set_tokens_retrieved_per_second_at_time_of_last_throttle(
380        &mut self,
381        tokens_retrieved_per_second_at_time_of_last_throttle: Option<f64>,
382    ) -> &mut Self {
383        self.tokens_retrieved_per_second_at_time_of_last_throttle =
384            tokens_retrieved_per_second_at_time_of_last_throttle;
385        self
386    }
387    // The last time when the client was throttled.
388    fn time_of_last_throttle(mut self, time_of_last_throttle: f64) -> Self {
389        self.set_time_of_last_throttle(Some(time_of_last_throttle));
390        self
391    }
392    // The last time when the client was throttled.
393    fn set_time_of_last_throttle(&mut self, time_of_last_throttle: Option<f64>) -> &mut Self {
394        self.time_of_last_throttle = time_of_last_throttle;
395        self
396    }
397    /// Build the ClientRateLimiter.
398    pub fn build(self) -> ClientRateLimiter {
399        ClientRateLimiter {
400            inner: Arc::new(Mutex::new(Inner {
401                fill_rate: self.token_refill_rate.unwrap_or_default(),
402                max_capacity: self.maximum_bucket_capacity.unwrap_or(f64::MAX),
403                current_capacity: self.current_bucket_capacity.unwrap_or_default(),
404                last_timestamp: self.time_of_last_refill,
405                enabled: self.enable_throttling.unwrap_or_default(),
406                measured_tx_rate: self.tokens_retrieved_per_second.unwrap_or_default(),
407                last_tx_rate_bucket: self.previous_time_bucket.unwrap_or_default(),
408                request_count: self.request_count.unwrap_or_default(),
409                last_max_rate: self
410                    .tokens_retrieved_per_second_at_time_of_last_throttle
411                    .unwrap_or_default(),
412                time_of_last_throttle: self.time_of_last_throttle.unwrap_or_default(),
413            })),
414        }
415    }
416}
417
#[cfg(test)]
mod tests {
    use super::{cubic_throttle, ClientRateLimiter};
    use crate::client::retries::client_rate_limiter::RequestReason;
    use approx::assert_relative_eq;
    use aws_smithy_async::rt::sleep::AsyncSleep;
    use aws_smithy_async::test_util::instant_time_and_sleep;
    use std::time::{Duration, SystemTime};

    const TWO_HUNDRED_MILLISECONDS: Duration = Duration::from_millis(200);

    // A throttle scales the rate by BETA, and `cubic_success` evaluated at the moment of the
    // last throttle yields that same scaled-back rate.
    #[test]
    fn should_match_beta_decrease() {
        let new_rate = cubic_throttle(10.0);
        assert_relative_eq!(new_rate, 7.0);

        let rate_limiter = ClientRateLimiter::builder()
            .tokens_retrieved_per_second_at_time_of_last_throttle(10.0)
            .time_of_last_throttle(1.0)
            .build();

        let new_rate = rate_limiter.inner.lock().unwrap().cubic_success(1.0);
        assert_relative_eq!(new_rate, 7.0);
    }

    #[test]
    fn throttling_is_enabled_once_throttling_error_is_received() {
        let rate_limiter = ClientRateLimiter::builder()
            .previous_time_bucket(0.0)
            .time_of_last_throttle(0.0)
            .build();

        assert!(
            !rate_limiter.inner.lock().unwrap().enabled,
            "rate_limiter should be disabled by default"
        );
        rate_limiter.update_rate_limiter(0.0, true);
        assert!(
            rate_limiter.inner.lock().unwrap().enabled,
            "rate_limiter should be enabled after throttling error"
        );
    }

    #[test]
    fn test_calculated_rate_with_successes() {
        let rate_limiter = ClientRateLimiter::builder()
            .time_of_last_throttle(5.0)
            .tokens_retrieved_per_second_at_time_of_last_throttle(10.0)
            .build();

        // (seconds_since_unix_epoch, expected_calculated_rate)
        let attempts = [
            (5.0, 7.0),
            (6.0, 9.64893600966),
            (7.0, 10.000030849917364),
            (8.0, 10.453284520772092),
            (9.0, 13.408697022224185),
            (10.0, 21.26626835427364),
            (11.0, 36.425998516920465),
        ];

        // Think this test is a little strange? I ported the test from Go v2, and this is how it
        // was implemented. See for yourself:
        // https://github.com/aws/aws-sdk-go-v2/blob/844ff45cdc76182229ad098c95bf3f5ab8c20e9f/aws/retry/adaptive_ratelimit_test.go#L97
        for (seconds_since_unix_epoch, expected_calculated_rate) in attempts {
            let calculated_rate = rate_limiter
                .inner
                .lock()
                .unwrap()
                .cubic_success(seconds_since_unix_epoch);

            assert_relative_eq!(expected_calculated_rate, calculated_rate);
        }
    }

    #[test]
    fn test_calculated_rate_with_throttles() {
        let rate_limiter = ClientRateLimiter::builder()
            .tokens_retrieved_per_second_at_time_of_last_throttle(10.0)
            .time_of_last_throttle(5.0)
            .build();

        // (throttled, seconds_since_unix_epoch, expected_calculated_rate)
        let attempts = [
            (false, 5.0, 7.0),
            (false, 6.0, 9.64893600966),
            (true, 7.0, 6.754255206761999),
            (true, 8.0, 4.727978644733399),
            (false, 9.0, 4.670125557970046),
            (false, 10.0, 4.770870456867401),
            (false, 11.0, 6.011819748005445),
            (false, 12.0, 10.792973431384178),
        ];

        // Think this test is a little strange? I ported the test from Go v2, and this is how it
        // was implemented. See for yourself:
        // https://github.com/aws/aws-sdk-go-v2/blob/844ff45cdc76182229ad098c95bf3f5ab8c20e9f/aws/retry/adaptive_ratelimit_test.go#L97
        let mut calculated_rate = 0.0;
        for (throttled, seconds_since_unix_epoch, expected_calculated_rate) in attempts {
            let mut inner = rate_limiter.inner.lock().unwrap();
            if throttled {
                calculated_rate = cubic_throttle(calculated_rate);
                inner.time_of_last_throttle = seconds_since_unix_epoch;
                inner.last_max_rate = calculated_rate;
            } else {
                calculated_rate = inner.cubic_success(seconds_since_unix_epoch);
            }

            assert_relative_eq!(expected_calculated_rate, calculated_rate);
        }
    }

    #[tokio::test]
    async fn test_client_sending_rates() {
        let (_, sleep_impl) = instant_time_and_sleep(SystemTime::UNIX_EPOCH);
        let rate_limiter = ClientRateLimiter::builder().build();

        // (throttled, seconds_since_unix_epoch, expected_tokens_retrieved_per_second,
        //  expected_token_refill_rate)
        let attempts = [
            (false, 0.2, 0.000000, 0.500000),
            (false, 0.4, 0.000000, 0.500000),
            (false, 0.6, 4.800000000000001, 0.500000),
            (false, 0.8, 4.800000000000001, 0.500000),
            (false, 1.0, 4.160000, 0.500000),
            (false, 1.2, 4.160000, 0.691200),
            (false, 1.4, 4.160000, 1.0975999999999997),
            (false, 1.6, 5.632000000000001, 1.6384000000000005),
            (false, 1.8, 5.632000000000001, 2.332800),
            (true, 2.0, 4.326400, 3.0284799999999996),
            (false, 2.2, 4.326400, 3.48663917347026),
            (false, 2.4, 4.326400, 3.821874416040255),
            (false, 2.6, 5.665280, 4.053385727709987),
            (false, 2.8, 5.665280, 4.200373108479454),
            (false, 3.0, 4.333056, 4.282036558348658),
            (true, 3.2, 4.333056, 2.99742559084406),
            (false, 3.4, 4.333056, 3.4522263943863463),
        ];

        for (
            throttled,
            seconds_since_unix_epoch,
            expected_tokens_retrieved_per_second,
            expected_token_refill_rate,
        ) in attempts
        {
            // Each attempt happens 200ms after the previous one; the table's timestamps must
            // agree with the simulated clock.
            sleep_impl.sleep(TWO_HUNDRED_MILLISECONDS).await;
            assert_eq!(
                seconds_since_unix_epoch,
                sleep_impl.total_duration().as_secs_f64()
            );

            rate_limiter.update_rate_limiter(seconds_since_unix_epoch, throttled);
            assert_relative_eq!(
                expected_tokens_retrieved_per_second,
                rate_limiter.inner.lock().unwrap().measured_tx_rate
            );
            assert_relative_eq!(
                expected_token_refill_rate,
                rate_limiter.inner.lock().unwrap().fill_rate
            );
        }
    }

    // This test is only testing that we don't fail basic math and panic. It does include an
    // element of randomness, but no duration between >= 0.0s and <= 1.0s will ever cause a panic.
    //
    // Because the cost of sending an individual request is 1.0, and because the minimum capacity is
    // also 1.0, we will never encounter a situation where we run out of tokens.
    #[tokio::test]
    async fn test_when_throttling_is_enabled_requests_can_still_be_sent() {
        let (time_source, sleep_impl) = instant_time_and_sleep(SystemTime::UNIX_EPOCH);
        let crl = ClientRateLimiter::builder()
            .time_of_last_throttle(0.0)
            .previous_time_bucket(0.0)
            .build();

        // Start by recording a throttling error
        crl.update_rate_limiter(0.0, true);

        for _i in 0..100 {
            // advance time by a random amount (up to 1s) each iteration
            let duration = Duration::from_secs_f64(fastrand::f64());
            sleep_impl.sleep(duration).await;
            if let Err(delay) = crl.acquire_permission_to_send_a_request(
                time_source.seconds_since_unix_epoch(),
                RequestReason::InitialRequest,
            ) {
                sleep_impl.sleep(delay).await;
            }

            // Assume all further requests succeed on the first try
            crl.update_rate_limiter(time_source.seconds_since_unix_epoch(), false);
        }

        let inner = crl.inner.lock().unwrap();
        assert!(inner.enabled, "the rate limiter should still be enabled");
        // Assert that the rate limiter respects the passage of time.
        assert_relative_eq!(
            inner.last_timestamp.unwrap(),
            sleep_impl.total_duration().as_secs_f64(),
            max_relative = 0.0001
        );
    }
}