1#![allow(dead_code)]
10
11use crate::client::retries::RetryPartition;
12use std::sync::{Arc, Mutex};
13use std::time::Duration;
14use tracing::debug;
15
16#[non_exhaustive]
18#[derive(Clone, Debug, Hash, PartialEq, Eq)]
19pub struct ClientRateLimiterPartition {
20 retry_partition: RetryPartition,
21}
22
23impl ClientRateLimiterPartition {
24 pub fn new(retry_partition: RetryPartition) -> Self {
26 Self { retry_partition }
27 }
28}
29
30const RETRY_COST: f64 = 5.0;
31const RETRY_TIMEOUT_COST: f64 = RETRY_COST * 2.0;
32const INITIAL_REQUEST_COST: f64 = 1.0;
33
34const MIN_FILL_RATE: f64 = 0.5;
35const MIN_CAPACITY: f64 = 1.0;
36const SMOOTH: f64 = 0.8;
37const BETA: f64 = 0.7;
39const SCALE_CONSTANT: f64 = 0.4;
41
42#[derive(Clone, Debug)]
44pub struct ClientRateLimiter {
45 pub(crate) inner: Arc<Mutex<Inner>>,
46}
47
48#[derive(Debug)]
49pub(crate) struct Inner {
50 fill_rate: f64,
52 max_capacity: f64,
54 current_capacity: f64,
56 last_timestamp: Option<f64>,
58 enabled: bool,
62 measured_tx_rate: f64,
64 last_tx_rate_bucket: f64,
66 request_count: u64,
68 last_max_rate: f64,
70 time_of_last_throttle: f64,
72}
73
74pub(crate) enum RequestReason {
75 Retry,
76 RetryTimeout,
77 InitialRequest,
78}
79
80impl Default for ClientRateLimiter {
81 fn default() -> Self {
82 Self::builder().build()
83 }
84}
85
86impl ClientRateLimiter {
87 pub fn new(seconds_since_unix_epoch: f64) -> Self {
89 Self::builder()
90 .tokens_retrieved_per_second(MIN_FILL_RATE)
91 .time_of_last_throttle(seconds_since_unix_epoch)
92 .previous_time_bucket(seconds_since_unix_epoch.floor())
93 .build()
94 }
95
96 pub fn builder() -> ClientRateLimiterBuilder {
98 ClientRateLimiterBuilder::new()
99 }
100
101 pub(crate) fn acquire_permission_to_send_a_request(
102 &self,
103 seconds_since_unix_epoch: f64,
104 kind: RequestReason,
105 ) -> Result<(), Duration> {
106 let mut it = self.inner.lock().unwrap();
107
108 if !it.enabled {
109 return Ok(());
111 }
112 let amount = match kind {
113 RequestReason::Retry => RETRY_COST,
114 RequestReason::RetryTimeout => RETRY_TIMEOUT_COST,
115 RequestReason::InitialRequest => INITIAL_REQUEST_COST,
116 };
117
118 it.refill(seconds_since_unix_epoch);
119
120 if amount > it.current_capacity {
121 let sleep_time = (amount - it.current_capacity) / it.fill_rate;
122 debug!(
123 amount,
124 it.current_capacity,
125 it.fill_rate,
126 sleep_time,
127 "client rate limiter delayed a request"
128 );
129 Err(Duration::from_secs_f64(sleep_time))
131 } else {
132 it.current_capacity -= amount;
133 Ok(())
134 }
135 }
136
137 pub(crate) fn update_rate_limiter(
138 &self,
139 seconds_since_unix_epoch: f64,
140 is_throttling_error: bool,
141 ) {
142 let mut it = self.inner.lock().unwrap();
143 it.update_tokens_retrieved_per_second(seconds_since_unix_epoch);
144
145 let calculated_rate;
146 if is_throttling_error {
147 let rate_to_use = if it.enabled {
148 f64::min(it.measured_tx_rate, it.fill_rate)
149 } else {
150 it.measured_tx_rate
151 };
152
153 it.last_max_rate = rate_to_use;
155 it.calculate_time_window();
156 it.time_of_last_throttle = seconds_since_unix_epoch;
157 calculated_rate = cubic_throttle(rate_to_use);
158 it.enable_token_bucket();
159 } else {
160 it.calculate_time_window();
161 calculated_rate = it.cubic_success(seconds_since_unix_epoch);
162 }
163
164 let new_rate = f64::min(calculated_rate, 2.0 * it.measured_tx_rate);
165 it.update_bucket_refill_rate(seconds_since_unix_epoch, new_rate);
166 }
167}
168
169impl Inner {
170 fn refill(&mut self, seconds_since_unix_epoch: f64) {
171 if let Some(last_timestamp) = self.last_timestamp {
172 let fill_amount = (seconds_since_unix_epoch - last_timestamp) * self.fill_rate;
173 self.current_capacity =
174 f64::min(self.max_capacity, self.current_capacity + fill_amount);
175 debug!(
176 fill_amount,
177 self.current_capacity, self.max_capacity, "refilling client rate limiter tokens"
178 );
179 }
180 self.last_timestamp = Some(seconds_since_unix_epoch);
181 }
182
183 fn update_bucket_refill_rate(&mut self, seconds_since_unix_epoch: f64, new_fill_rate: f64) {
184 self.refill(seconds_since_unix_epoch);
186
187 self.fill_rate = f64::max(new_fill_rate, MIN_FILL_RATE);
188 self.max_capacity = f64::max(new_fill_rate, MIN_CAPACITY);
189
190 debug!(
191 fill_rate = self.fill_rate,
192 max_capacity = self.max_capacity,
193 current_capacity = self.current_capacity,
194 measured_tx_rate = self.measured_tx_rate,
195 "client rate limiter state has been updated"
196 );
197
198 self.current_capacity = f64::min(self.current_capacity, self.max_capacity);
200 }
201
202 fn enable_token_bucket(&mut self) {
203 if !self.enabled {
205 debug!("client rate limiting has been enabled");
206 }
207 self.enabled = true;
208 }
209
210 fn update_tokens_retrieved_per_second(&mut self, seconds_since_unix_epoch: f64) {
211 let next_time_bucket = (seconds_since_unix_epoch * 2.0).floor() / 2.0;
212 self.request_count += 1;
213
214 if next_time_bucket > self.last_tx_rate_bucket {
215 let current_rate =
216 self.request_count as f64 / (next_time_bucket - self.last_tx_rate_bucket);
217 self.measured_tx_rate = current_rate * SMOOTH + self.measured_tx_rate * (1.0 - SMOOTH);
218 self.request_count = 0;
219 self.last_tx_rate_bucket = next_time_bucket;
220 }
221 }
222
223 fn calculate_time_window(&self) -> f64 {
224 let base = (self.last_max_rate * (1.0 - BETA)) / SCALE_CONSTANT;
225 base.powf(1.0 / 3.0)
226 }
227
228 fn cubic_success(&self, seconds_since_unix_epoch: f64) -> f64 {
229 let dt =
230 seconds_since_unix_epoch - self.time_of_last_throttle - self.calculate_time_window();
231 (SCALE_CONSTANT * dt.powi(3)) + self.last_max_rate
232 }
233}
234
235fn cubic_throttle(rate_to_use: f64) -> f64 {
236 rate_to_use * BETA
237}
238
239#[derive(Clone, Debug, Default)]
241pub struct ClientRateLimiterBuilder {
242 token_refill_rate: Option<f64>,
244 maximum_bucket_capacity: Option<f64>,
246 current_bucket_capacity: Option<f64>,
248 time_of_last_refill: Option<f64>,
250 tokens_retrieved_per_second: Option<f64>,
252 previous_time_bucket: Option<f64>,
254 request_count: Option<u64>,
256 enable_throttling: Option<bool>,
258 tokens_retrieved_per_second_at_time_of_last_throttle: Option<f64>,
260 time_of_last_throttle: Option<f64>,
262}
263
264impl ClientRateLimiterBuilder {
265 pub fn new() -> Self {
267 ClientRateLimiterBuilder::default()
268 }
269 pub fn token_refill_rate(mut self, token_refill_rate: f64) -> Self {
271 self.set_token_refill_rate(Some(token_refill_rate));
272 self
273 }
274 pub fn set_token_refill_rate(&mut self, token_refill_rate: Option<f64>) -> &mut Self {
276 self.token_refill_rate = token_refill_rate;
277 self
278 }
279 pub fn maximum_bucket_capacity(mut self, maximum_bucket_capacity: f64) -> Self {
283 self.set_maximum_bucket_capacity(Some(maximum_bucket_capacity));
284 self
285 }
286 pub fn set_maximum_bucket_capacity(
290 &mut self,
291 maximum_bucket_capacity: Option<f64>,
292 ) -> &mut Self {
293 self.maximum_bucket_capacity = maximum_bucket_capacity;
294 self
295 }
296 pub fn current_bucket_capacity(mut self, current_bucket_capacity: f64) -> Self {
300 self.set_current_bucket_capacity(Some(current_bucket_capacity));
301 self
302 }
303 pub fn set_current_bucket_capacity(
307 &mut self,
308 current_bucket_capacity: Option<f64>,
309 ) -> &mut Self {
310 self.current_bucket_capacity = current_bucket_capacity;
311 self
312 }
313 fn time_of_last_refill(mut self, time_of_last_refill: f64) -> Self {
315 self.set_time_of_last_refill(Some(time_of_last_refill));
316 self
317 }
318 fn set_time_of_last_refill(&mut self, time_of_last_refill: Option<f64>) -> &mut Self {
320 self.time_of_last_refill = time_of_last_refill;
321 self
322 }
323 pub fn tokens_retrieved_per_second(mut self, tokens_retrieved_per_second: f64) -> Self {
325 self.set_tokens_retrieved_per_second(Some(tokens_retrieved_per_second));
326 self
327 }
328 pub fn set_tokens_retrieved_per_second(
330 &mut self,
331 tokens_retrieved_per_second: Option<f64>,
332 ) -> &mut Self {
333 self.tokens_retrieved_per_second = tokens_retrieved_per_second;
334 self
335 }
336 fn previous_time_bucket(mut self, previous_time_bucket: f64) -> Self {
338 self.set_previous_time_bucket(Some(previous_time_bucket));
339 self
340 }
341 fn set_previous_time_bucket(&mut self, previous_time_bucket: Option<f64>) -> &mut Self {
343 self.previous_time_bucket = previous_time_bucket;
344 self
345 }
346 fn request_count(mut self, request_count: u64) -> Self {
348 self.set_request_count(Some(request_count));
349 self
350 }
351 fn set_request_count(&mut self, request_count: Option<u64>) -> &mut Self {
353 self.request_count = request_count;
354 self
355 }
356 fn enable_throttling(mut self, enable_throttling: bool) -> Self {
358 self.set_enable_throttling(Some(enable_throttling));
359 self
360 }
361 fn set_enable_throttling(&mut self, enable_throttling: Option<bool>) -> &mut Self {
363 self.enable_throttling = enable_throttling;
364 self
365 }
366 fn tokens_retrieved_per_second_at_time_of_last_throttle(
368 mut self,
369 tokens_retrieved_per_second_at_time_of_last_throttle: f64,
370 ) -> Self {
371 self.set_tokens_retrieved_per_second_at_time_of_last_throttle(Some(
372 tokens_retrieved_per_second_at_time_of_last_throttle,
373 ));
374 self
375 }
376 fn set_tokens_retrieved_per_second_at_time_of_last_throttle(
378 &mut self,
379 tokens_retrieved_per_second_at_time_of_last_throttle: Option<f64>,
380 ) -> &mut Self {
381 self.tokens_retrieved_per_second_at_time_of_last_throttle =
382 tokens_retrieved_per_second_at_time_of_last_throttle;
383 self
384 }
385 fn time_of_last_throttle(mut self, time_of_last_throttle: f64) -> Self {
387 self.set_time_of_last_throttle(Some(time_of_last_throttle));
388 self
389 }
390 fn set_time_of_last_throttle(&mut self, time_of_last_throttle: Option<f64>) -> &mut Self {
392 self.time_of_last_throttle = time_of_last_throttle;
393 self
394 }
395 pub fn build(self) -> ClientRateLimiter {
397 ClientRateLimiter {
398 inner: Arc::new(Mutex::new(Inner {
399 fill_rate: self.token_refill_rate.unwrap_or_default(),
400 max_capacity: self.maximum_bucket_capacity.unwrap_or(f64::MAX),
401 current_capacity: self.current_bucket_capacity.unwrap_or_default(),
402 last_timestamp: self.time_of_last_refill,
403 enabled: self.enable_throttling.unwrap_or_default(),
404 measured_tx_rate: self.tokens_retrieved_per_second.unwrap_or_default(),
405 last_tx_rate_bucket: self.previous_time_bucket.unwrap_or_default(),
406 request_count: self.request_count.unwrap_or_default(),
407 last_max_rate: self
408 .tokens_retrieved_per_second_at_time_of_last_throttle
409 .unwrap_or_default(),
410 time_of_last_throttle: self.time_of_last_throttle.unwrap_or_default(),
411 })),
412 }
413 }
414}
415
416#[cfg(test)]
417mod tests {
418 use super::{cubic_throttle, ClientRateLimiter};
419 use crate::client::retries::client_rate_limiter::RequestReason;
420 use approx::assert_relative_eq;
421 use aws_smithy_async::rt::sleep::AsyncSleep;
422 use aws_smithy_async::test_util::instant_time_and_sleep;
423 use std::time::{Duration, SystemTime};
424
425 const ONE_SECOND: Duration = Duration::from_secs(1);
426 const TWO_HUNDRED_MILLISECONDS: Duration = Duration::from_millis(200);
427
428 #[test]
429 fn should_match_beta_decrease() {
430 let new_rate = cubic_throttle(10.0);
431 assert_relative_eq!(new_rate, 7.0);
432
433 let rate_limiter = ClientRateLimiter::builder()
434 .tokens_retrieved_per_second_at_time_of_last_throttle(10.0)
435 .time_of_last_throttle(1.0)
436 .build();
437
438 rate_limiter.inner.lock().unwrap().calculate_time_window();
439 let new_rate = rate_limiter.inner.lock().unwrap().cubic_success(1.0);
440 assert_relative_eq!(new_rate, 7.0);
441 }
442
443 #[tokio::test]
444 async fn throttling_is_enabled_once_throttling_error_is_received() {
445 let rate_limiter = ClientRateLimiter::builder()
446 .previous_time_bucket(0.0)
447 .time_of_last_throttle(0.0)
448 .build();
449
450 assert!(
451 !rate_limiter.inner.lock().unwrap().enabled,
452 "rate_limiter should be disabled by default"
453 );
454 rate_limiter.update_rate_limiter(0.0, true);
455 assert!(
456 rate_limiter.inner.lock().unwrap().enabled,
457 "rate_limiter should be enabled after throttling error"
458 );
459 }
460
461 #[tokio::test]
462 async fn test_calculated_rate_with_successes() {
463 let rate_limiter = ClientRateLimiter::builder()
464 .time_of_last_throttle(5.0)
465 .tokens_retrieved_per_second_at_time_of_last_throttle(10.0)
466 .build();
467
468 struct Attempt {
469 seconds_since_unix_epoch: f64,
470 expected_calculated_rate: f64,
471 }
472
473 let attempts = [
474 Attempt {
475 seconds_since_unix_epoch: 5.0,
476 expected_calculated_rate: 7.0,
477 },
478 Attempt {
479 seconds_since_unix_epoch: 6.0,
480 expected_calculated_rate: 9.64893600966,
481 },
482 Attempt {
483 seconds_since_unix_epoch: 7.0,
484 expected_calculated_rate: 10.000030849917364,
485 },
486 Attempt {
487 seconds_since_unix_epoch: 8.0,
488 expected_calculated_rate: 10.453284520772092,
489 },
490 Attempt {
491 seconds_since_unix_epoch: 9.0,
492 expected_calculated_rate: 13.408697022224185,
493 },
494 Attempt {
495 seconds_since_unix_epoch: 10.0,
496 expected_calculated_rate: 21.26626835427364,
497 },
498 Attempt {
499 seconds_since_unix_epoch: 11.0,
500 expected_calculated_rate: 36.425998516920465,
501 },
502 ];
503
504 for attempt in attempts {
508 rate_limiter.inner.lock().unwrap().calculate_time_window();
509 let calculated_rate = rate_limiter
510 .inner
511 .lock()
512 .unwrap()
513 .cubic_success(attempt.seconds_since_unix_epoch);
514
515 assert_relative_eq!(attempt.expected_calculated_rate, calculated_rate);
516 }
517 }
518
519 #[tokio::test]
520 async fn test_calculated_rate_with_throttles() {
521 let rate_limiter = ClientRateLimiter::builder()
522 .tokens_retrieved_per_second_at_time_of_last_throttle(10.0)
523 .time_of_last_throttle(5.0)
524 .build();
525
526 struct Attempt {
527 throttled: bool,
528 seconds_since_unix_epoch: f64,
529 expected_calculated_rate: f64,
530 }
531
532 let attempts = [
533 Attempt {
534 throttled: false,
535 seconds_since_unix_epoch: 5.0,
536 expected_calculated_rate: 7.0,
537 },
538 Attempt {
539 throttled: false,
540 seconds_since_unix_epoch: 6.0,
541 expected_calculated_rate: 9.64893600966,
542 },
543 Attempt {
544 throttled: true,
545 seconds_since_unix_epoch: 7.0,
546 expected_calculated_rate: 6.754255206761999,
547 },
548 Attempt {
549 throttled: true,
550 seconds_since_unix_epoch: 8.0,
551 expected_calculated_rate: 4.727978644733399,
552 },
553 Attempt {
554 throttled: false,
555 seconds_since_unix_epoch: 9.0,
556 expected_calculated_rate: 4.670125557970046,
557 },
558 Attempt {
559 throttled: false,
560 seconds_since_unix_epoch: 10.0,
561 expected_calculated_rate: 4.770870456867401,
562 },
563 Attempt {
564 throttled: false,
565 seconds_since_unix_epoch: 11.0,
566 expected_calculated_rate: 6.011819748005445,
567 },
568 Attempt {
569 throttled: false,
570 seconds_since_unix_epoch: 12.0,
571 expected_calculated_rate: 10.792973431384178,
572 },
573 ];
574
575 let mut calculated_rate = 0.0;
579 for attempt in attempts {
580 let mut inner = rate_limiter.inner.lock().unwrap();
581 inner.calculate_time_window();
582 if attempt.throttled {
583 calculated_rate = cubic_throttle(calculated_rate);
584 inner.time_of_last_throttle = attempt.seconds_since_unix_epoch;
585 inner.last_max_rate = calculated_rate;
586 } else {
587 calculated_rate = inner.cubic_success(attempt.seconds_since_unix_epoch);
588 };
589
590 assert_relative_eq!(attempt.expected_calculated_rate, calculated_rate);
591 }
592 }
593
594 #[tokio::test]
595 async fn test_client_sending_rates() {
596 let (_, sleep_impl) = instant_time_and_sleep(SystemTime::UNIX_EPOCH);
597 let rate_limiter = ClientRateLimiter::builder().build();
598
599 struct Attempt {
600 throttled: bool,
601 seconds_since_unix_epoch: f64,
602 expected_tokens_retrieved_per_second: f64,
603 expected_token_refill_rate: f64,
604 }
605
606 let attempts = [
607 Attempt {
608 throttled: false,
609 seconds_since_unix_epoch: 0.2,
610 expected_tokens_retrieved_per_second: 0.000000,
611 expected_token_refill_rate: 0.500000,
612 },
613 Attempt {
614 throttled: false,
615 seconds_since_unix_epoch: 0.4,
616 expected_tokens_retrieved_per_second: 0.000000,
617 expected_token_refill_rate: 0.500000,
618 },
619 Attempt {
620 throttled: false,
621 seconds_since_unix_epoch: 0.6,
622 expected_tokens_retrieved_per_second: 4.800000000000001,
623 expected_token_refill_rate: 0.500000,
624 },
625 Attempt {
626 throttled: false,
627 seconds_since_unix_epoch: 0.8,
628 expected_tokens_retrieved_per_second: 4.800000000000001,
629 expected_token_refill_rate: 0.500000,
630 },
631 Attempt {
632 throttled: false,
633 seconds_since_unix_epoch: 1.0,
634 expected_tokens_retrieved_per_second: 4.160000,
635 expected_token_refill_rate: 0.500000,
636 },
637 Attempt {
638 throttled: false,
639 seconds_since_unix_epoch: 1.2,
640 expected_tokens_retrieved_per_second: 4.160000,
641 expected_token_refill_rate: 0.691200,
642 },
643 Attempt {
644 throttled: false,
645 seconds_since_unix_epoch: 1.4,
646 expected_tokens_retrieved_per_second: 4.160000,
647 expected_token_refill_rate: 1.0975999999999997,
648 },
649 Attempt {
650 throttled: false,
651 seconds_since_unix_epoch: 1.6,
652 expected_tokens_retrieved_per_second: 5.632000000000001,
653 expected_token_refill_rate: 1.6384000000000005,
654 },
655 Attempt {
656 throttled: false,
657 seconds_since_unix_epoch: 1.8,
658 expected_tokens_retrieved_per_second: 5.632000000000001,
659 expected_token_refill_rate: 2.332800,
660 },
661 Attempt {
662 throttled: true,
663 seconds_since_unix_epoch: 2.0,
664 expected_tokens_retrieved_per_second: 4.326400,
665 expected_token_refill_rate: 3.0284799999999996,
666 },
667 Attempt {
668 throttled: false,
669 seconds_since_unix_epoch: 2.2,
670 expected_tokens_retrieved_per_second: 4.326400,
671 expected_token_refill_rate: 3.48663917347026,
672 },
673 Attempt {
674 throttled: false,
675 seconds_since_unix_epoch: 2.4,
676 expected_tokens_retrieved_per_second: 4.326400,
677 expected_token_refill_rate: 3.821874416040255,
678 },
679 Attempt {
680 throttled: false,
681 seconds_since_unix_epoch: 2.6,
682 expected_tokens_retrieved_per_second: 5.665280,
683 expected_token_refill_rate: 4.053385727709987,
684 },
685 Attempt {
686 throttled: false,
687 seconds_since_unix_epoch: 2.8,
688 expected_tokens_retrieved_per_second: 5.665280,
689 expected_token_refill_rate: 4.200373108479454,
690 },
691 Attempt {
692 throttled: false,
693 seconds_since_unix_epoch: 3.0,
694 expected_tokens_retrieved_per_second: 4.333056,
695 expected_token_refill_rate: 4.282036558348658,
696 },
697 Attempt {
698 throttled: true,
699 seconds_since_unix_epoch: 3.2,
700 expected_tokens_retrieved_per_second: 4.333056,
701 expected_token_refill_rate: 2.99742559084406,
702 },
703 Attempt {
704 throttled: false,
705 seconds_since_unix_epoch: 3.4,
706 expected_tokens_retrieved_per_second: 4.333056,
707 expected_token_refill_rate: 3.4522263943863463,
708 },
709 ];
710
711 for attempt in attempts {
712 sleep_impl.sleep(TWO_HUNDRED_MILLISECONDS).await;
713 assert_eq!(
714 attempt.seconds_since_unix_epoch,
715 sleep_impl.total_duration().as_secs_f64()
716 );
717
718 rate_limiter.update_rate_limiter(attempt.seconds_since_unix_epoch, attempt.throttled);
719 assert_relative_eq!(
720 attempt.expected_tokens_retrieved_per_second,
721 rate_limiter.inner.lock().unwrap().measured_tx_rate
722 );
723 assert_relative_eq!(
724 attempt.expected_token_refill_rate,
725 rate_limiter.inner.lock().unwrap().fill_rate
726 );
727 }
728 }
729
730 #[tokio::test]
738 async fn test_capacity_never_goes_negative() {
739 let rate_limiter = ClientRateLimiter::builder()
740 .time_of_last_throttle(0.0)
741 .previous_time_bucket(0.0)
742 .build();
743
744 rate_limiter.update_rate_limiter(0.0, true);
745
746 let result =
749 rate_limiter.acquire_permission_to_send_a_request(0.1, RequestReason::InitialRequest);
750
751 assert!(result.is_err(), "should require waiting for capacity");
752
753 let inner = rate_limiter.inner.lock().unwrap();
754 assert_relative_eq!(inner.current_capacity, 0.05, epsilon = 0.01);
755 assert_relative_eq!(result.unwrap_err().as_secs_f64(), 1.9, epsilon = 0.01);
756 }
757
758 #[tokio::test]
759 async fn test_concurrent_acquires_no_cascading_delays() {
760 let rate_limiter = ClientRateLimiter::builder()
761 .time_of_last_throttle(0.0)
762 .previous_time_bucket(0.0)
763 .build();
764
765 rate_limiter.update_rate_limiter(0.0, true);
766
767 let mut delays = Vec::with_capacity(10);
768 for _ in 0..10 {
769 let result = rate_limiter
770 .acquire_permission_to_send_a_request(0.1, RequestReason::InitialRequest);
771 assert!(result.is_err());
772 delays.push(result.unwrap_err());
773 }
774
775 let first = delays[0];
777 for delay in &delays {
778 assert_eq!(*delay, first, "all tasks should get the same delay");
779 }
780
781 let inner = rate_limiter.inner.lock().unwrap();
782 assert!(
783 inner.current_capacity >= 0.0,
784 "capacity must never be negative"
785 );
786 }
787
788 #[tokio::test]
789 async fn test_acquire_succeeds_after_sufficient_refill() {
790 let rate_limiter = ClientRateLimiter::builder()
791 .time_of_last_throttle(0.0)
792 .previous_time_bucket(0.0)
793 .build();
794
795 rate_limiter.update_rate_limiter(0.0, true);
796
797 let result =
798 rate_limiter.acquire_permission_to_send_a_request(0.1, RequestReason::InitialRequest);
799 assert!(result.is_err());
800
801 let result =
803 rate_limiter.acquire_permission_to_send_a_request(2.1, RequestReason::InitialRequest);
804 assert!(result.is_ok(), "should succeed after sufficient refill");
805
806 let inner = rate_limiter.inner.lock().unwrap();
807 assert_relative_eq!(inner.current_capacity, 0.0, epsilon = 0.01);
808 }
809
810 #[tokio::test]
816 async fn test_when_throttling_is_enabled_requests_can_still_be_sent() {
817 let (time_source, sleep_impl) = instant_time_and_sleep(SystemTime::UNIX_EPOCH);
818 let crl = ClientRateLimiter::builder()
819 .time_of_last_throttle(0.0)
820 .previous_time_bucket(0.0)
821 .build();
822
823 crl.update_rate_limiter(0.0, true);
825
826 for _i in 0..100 {
827 let duration = Duration::from_secs_f64(fastrand::f64());
829 sleep_impl.sleep(duration).await;
830 if let Err(delay) = crl.acquire_permission_to_send_a_request(
831 time_source.seconds_since_unix_epoch(),
832 RequestReason::InitialRequest,
833 ) {
834 sleep_impl.sleep(delay).await;
835 }
836
837 crl.update_rate_limiter(time_source.seconds_since_unix_epoch(), false);
839 }
840
841 let inner = crl.inner.lock().unwrap();
842 assert!(inner.enabled, "the rate limiter should still be enabled");
843 assert_relative_eq!(
845 inner.last_timestamp.unwrap(),
846 sleep_impl.total_duration().as_secs_f64(),
847 max_relative = 0.0001
848 );
849 }
850
851 #[tokio::test]
852 async fn test_multi_task_recovery_after_throttle_blip() {
853 let crl = ClientRateLimiter::builder()
857 .time_of_last_throttle(0.0)
858 .previous_time_bucket(0.0)
859 .build();
860
861 let num_tasks = 50;
862 let mut time = 0.0;
863
864 for _ in 0..num_tasks {
866 time += 0.001;
867 crl.update_rate_limiter(time, true);
868 }
869
870 assert_relative_eq!(crl.inner.lock().unwrap().fill_rate, 0.5, epsilon = 0.01);
871
872 let mut total_acquired = 0;
876 let rounds = 200;
877 for _ in 0..rounds {
878 time += 0.1;
879 let mut acquired_this_round = 0;
880
881 for task in 0..num_tasks {
882 let task_time = time + (task as f64) * 0.0001;
883 if crl
884 .acquire_permission_to_send_a_request(task_time, RequestReason::InitialRequest)
885 .is_ok()
886 {
887 acquired_this_round += 1;
888 }
889 }
890
891 for _ in 0..acquired_this_round {
892 time += 0.001;
893 crl.update_rate_limiter(time, false);
894 }
895
896 total_acquired += acquired_this_round;
897 }
898
899 assert!(
900 total_acquired > 100,
901 "expected recovery after throttle blip, but only acquired {total_acquired} tokens in {rounds} rounds"
902 );
903
904 let inner = crl.inner.lock().unwrap();
905 assert!(
906 inner.current_capacity >= 0.0,
907 "capacity must never be negative, got: {}",
908 inner.current_capacity
909 );
910 }
911}