// aws_smithy_runtime/client/http/body/minimum_throughput/throughput.rs
use std::fmt;
use std::time::{Duration, SystemTime};
8
9#[derive(Debug, Clone, Copy)]
11#[cfg_attr(test, derive(Eq))]
12pub struct Throughput {
13 pub(super) bytes_read: u64,
14 pub(super) per_time_elapsed: Duration,
15}
16
17impl Throughput {
18 pub fn new(bytes_read: u64, per_time_elapsed: Duration) -> Self {
20 debug_assert!(
21 !per_time_elapsed.is_zero(),
22 "cannot create a throughput if per_time_elapsed == 0"
23 );
24
25 Self {
26 bytes_read,
27 per_time_elapsed,
28 }
29 }
30
31 pub const fn new_bytes_per_second(bytes: u64) -> Self {
33 Self {
34 bytes_read: bytes,
35 per_time_elapsed: Duration::from_secs(1),
36 }
37 }
38
39 pub const fn new_kilobytes_per_second(kilobytes: u64) -> Self {
41 Self {
42 bytes_read: kilobytes * 1000,
43 per_time_elapsed: Duration::from_secs(1),
44 }
45 }
46
47 pub const fn new_megabytes_per_second(megabytes: u64) -> Self {
49 Self {
50 bytes_read: megabytes * 1000 * 1000,
51 per_time_elapsed: Duration::from_secs(1),
52 }
53 }
54
55 pub(super) fn bytes_per_second(&self) -> f64 {
56 let per_time_elapsed_secs = self.per_time_elapsed.as_secs_f64();
57 if per_time_elapsed_secs == 0.0 {
58 return 0.0; };
60
61 self.bytes_read as f64 / per_time_elapsed_secs
62 }
63}
64
65impl PartialEq for Throughput {
66 fn eq(&self, other: &Self) -> bool {
67 self.bytes_per_second() == other.bytes_per_second()
68 }
69}
70
71impl PartialOrd for Throughput {
72 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
73 self.bytes_per_second()
74 .partial_cmp(&other.bytes_per_second())
75 }
76}
77
78impl fmt::Display for Throughput {
79 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
80 let pretty_bytes_per_second = (self.bytes_per_second() * 1000.0).round() / 1000.0;
86
87 write!(f, "{pretty_bytes_per_second} B/s")
88 }
89}
90
91impl From<(u64, Duration)> for Throughput {
92 fn from(value: (u64, Duration)) -> Self {
93 Self {
94 bytes_read: value.0,
95 per_time_elapsed: value.1,
96 }
97 }
98}
99
/// What happened during the slice of time covered by one bin.
///
/// The declaration order is significant: the derived `Ord` ranks later
/// variants higher, and `Bin::merge` keeps the higher-ranked label.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum BinLabel {
    /// Initial label of a freshly created bin.
    Empty,

    /// Assigned by `LogBuffer::fill_gaps` to bins that were still `Empty`.
    NoPolling,

    /// Recorded via `ThroughputLogs::push_pending`.
    Pending,

    /// Recorded via `ThroughputLogs::push_bytes_transferred`.
    TransferredBytes,
}
118
119#[derive(Copy, Clone, Debug)]
121struct Bin {
122 label: BinLabel,
123 bytes: u64,
124}
125
126impl Bin {
127 const fn new(label: BinLabel, bytes: u64) -> Self {
128 Self { label, bytes }
129 }
130 const fn empty() -> Self {
131 Self::new(BinLabel::Empty, 0)
132 }
133
134 fn is_empty(&self) -> bool {
135 matches!(self.label, BinLabel::Empty)
136 }
137
138 fn merge(&mut self, other: Bin) -> &mut Self {
139 self.label = if other.label > self.label {
145 other.label
146 } else {
147 self.label
148 };
149 self.bytes += other.bytes;
150 self
151 }
152
153 fn bytes(&self) -> u64 {
155 self.bytes
156 }
157}
158
/// Number of bins carrying each `BinLabel`, as tallied by `LogBuffer::counts`.
#[derive(Clone, Copy, Debug, Default)]
struct BinCounts {
    /// Bins labelled `Empty`.
    empty: usize,
    /// Bins labelled `NoPolling`.
    no_polling: usize,
    /// Bins labelled `TransferredBytes`.
    transferred: usize,
    /// Bins labelled `Pending`.
    pending: usize,
}
170
171#[derive(Copy, Clone, Debug)]
174struct LogBuffer<const N: usize> {
175 entries: [Bin; N],
176 length: usize,
181}
182
183impl<const N: usize> LogBuffer<N> {
184 fn new() -> Self {
185 Self {
186 entries: [Bin::empty(); N],
187 length: 0,
188 }
189 }
190
191 fn tail_mut(&mut self) -> &mut Bin {
197 debug_assert!(self.length > 0);
198 &mut self.entries[self.length - 1]
199 }
200
201 fn push(&mut self, bin: Bin) {
204 if self.filled() {
205 self.entries.rotate_left(1);
206 self.entries[N - 1] = bin;
207 } else {
208 self.entries[self.length] = bin;
209 self.length += 1;
210 }
211 }
212
213 fn bytes_transferred(&self) -> u64 {
215 self.entries.iter().take(self.length).map(Bin::bytes).sum()
216 }
217
218 #[inline]
219 fn filled(&self) -> bool {
220 self.length == N
221 }
222
223 fn fill_gaps(&mut self) {
229 for entry in self.entries.iter_mut().take(self.length) {
230 if entry.is_empty() {
231 *entry = Bin::new(BinLabel::NoPolling, 0);
232 }
233 }
234 }
235
236 fn counts(&self) -> BinCounts {
238 let mut counts = BinCounts::default();
239 for entry in &self.entries {
240 match entry.label {
241 BinLabel::Empty => counts.empty += 1,
242 BinLabel::NoPolling => counts.no_polling += 1,
243 BinLabel::TransferredBytes => counts.transferred += 1,
244 BinLabel::Pending => counts.pending += 1,
245 }
246 }
247 counts
248 }
249
250 fn is_empty(&self) -> bool {
252 self.length == 0
253 }
254}
255
256#[cfg_attr(test, derive(Debug, Eq, PartialEq))]
258pub(crate) enum ThroughputReport {
259 Incomplete,
261 NoPolling,
263 Pending,
265 Transferred(Throughput),
267 Complete,
269}
270
/// Number of bins the throughput time window is divided into.
const BIN_COUNT: usize = 10;
272
273#[derive(Clone, Debug)]
290pub(super) struct ThroughputLogs {
291 resolution: Duration,
292 current_tail: SystemTime,
293 buffer: LogBuffer<BIN_COUNT>,
294 stream_complete: bool,
295}
296
297impl ThroughputLogs {
298 pub(super) fn new(time_window: Duration, now: SystemTime) -> Self {
306 assert!(!time_window.is_zero());
307 let resolution = time_window.div_f64(BIN_COUNT as f64);
308 Self {
309 resolution,
310 current_tail: now,
311 buffer: LogBuffer::new(),
312 stream_complete: false,
313 }
314 }
315
316 pub(super) fn resolution(&self) -> Duration {
320 self.resolution
321 }
322
323 pub(super) fn push_pending(&mut self, time: SystemTime) {
329 self.push(time, Bin::new(BinLabel::Pending, 0));
330 }
331
332 pub(super) fn push_bytes_transferred(&mut self, time: SystemTime, bytes: u64) {
336 self.push(time, Bin::new(BinLabel::TransferredBytes, bytes));
337 }
338
339 fn push(&mut self, now: SystemTime, value: Bin) {
340 self.catch_up(now);
341 if self.buffer.is_empty() {
342 self.buffer.push(value)
343 } else {
344 self.buffer.tail_mut().merge(value);
345 }
346 self.buffer.fill_gaps();
347 }
348
349 fn catch_up(&mut self, now: SystemTime) {
351 while now >= self.current_tail {
352 self.current_tail += self.resolution;
353 self.buffer.push(Bin::empty());
354 }
355 assert!(self.current_tail >= now);
356 }
357
358 pub(super) fn mark_complete(&mut self) -> bool {
365 let prev = self.stream_complete;
366 self.stream_complete = true;
367 !prev
368 }
369
370 pub(super) fn report(&mut self, now: SystemTime) -> ThroughputReport {
372 if self.stream_complete {
373 return ThroughputReport::Complete;
374 }
375
376 self.catch_up(now);
377 self.buffer.fill_gaps();
378
379 let BinCounts {
380 empty,
381 no_polling,
382 transferred,
383 pending,
384 } = self.buffer.counts();
385
386 if empty > 0 {
389 return ThroughputReport::Incomplete;
390 }
391
392 let bytes = self.buffer.bytes_transferred();
393 let time = self.resolution * (BIN_COUNT - empty) as u32;
394 let throughput = Throughput::new(bytes, time);
395
396 let half = BIN_COUNT / 2;
397 match (transferred > 0, no_polling >= half, pending >= half) {
398 (true, _, _) => ThroughputReport::Transferred(throughput),
399 (_, true, _) => ThroughputReport::NoPolling,
400 (_, _, true) => ThroughputReport::Pending,
401 _ => ThroughputReport::Incomplete,
402 }
403 }
404}
405
#[cfg(test)]
mod test {
    use super::*;
    use std::time::Duration;

    /// `Bin::merge` relies on this ordering to decide which label wins.
    #[test]
    fn test_log_buffer_bin_label_priority() {
        use BinLabel::*;
        assert!(Empty < NoPolling);
        assert!(NoPolling < Pending);
        assert!(Pending < TransferredBytes);
    }

    /// Throughputs with the same rate compare equal even when their fields differ.
    #[test]
    fn test_throughput_eq() {
        let t1 = Throughput::new(1, Duration::from_secs(1));
        let t2 = Throughput::new(25, Duration::from_secs(25));
        let t3 = Throughput::new(100, Duration::from_secs(100));

        assert_eq!(t1, t2);
        assert_eq!(t2, t3);
    }

    #[test]
    fn incomplete_no_entries() {
        let start = SystemTime::UNIX_EPOCH;
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);
        let report = logs.report(start);
        assert_eq!(ThroughputReport::Incomplete, report);
    }

    #[test]
    fn incomplete_with_entries() {
        let start = SystemTime::UNIX_EPOCH;
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);
        logs.push_pending(start);

        let report = logs.report(start + Duration::from_millis(300));
        assert_eq!(ThroughputReport::Incomplete, report);
    }

    #[test]
    fn incomplete_with_transferred() {
        let start = SystemTime::UNIX_EPOCH;
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);
        logs.push_pending(start);
        logs.push_bytes_transferred(start + Duration::from_millis(100), 10);

        let report = logs.report(start + Duration::from_millis(300));
        assert_eq!(ThroughputReport::Incomplete, report);
    }

    #[test]
    fn push_pending_at_the_beginning_of_each_tick() {
        let start = SystemTime::UNIX_EPOCH;
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);

        let mut now = start;
        for i in 1..=BIN_COUNT {
            logs.push_pending(now);
            now += logs.resolution();

            assert_eq!(i, logs.buffer.counts().pending);
        }

        let report = logs.report(now);
        assert_eq!(ThroughputReport::Pending, report);
    }

    #[test]
    fn push_pending_at_the_end_of_each_tick() {
        let start = SystemTime::UNIX_EPOCH;
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);

        let mut now = start;
        for i in 1..BIN_COUNT {
            now += logs.resolution();
            logs.push_pending(now);

            assert_eq!(i, logs.buffer.counts().pending);
            assert_eq!(0, logs.buffer.counts().transferred);
            // The very first bin never received an event.
            assert_eq!(1, logs.buffer.counts().no_polling);
        }
        // One more tick rotates the initial no-polling bin out of the buffer.
        now += logs.resolution();
        logs.push_pending(now);
        assert_eq!(0, logs.buffer.counts().no_polling);

        let report = logs.report(now);
        assert_eq!(ThroughputReport::Pending, report);
    }

    #[test]
    fn push_transferred_at_the_beginning_of_each_tick() {
        let start = SystemTime::UNIX_EPOCH;
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);

        let mut now = start;
        for i in 1..=BIN_COUNT {
            logs.push_bytes_transferred(now, 10);
            // Stay inside the last bin so that `report` does not open a new one.
            if i != BIN_COUNT {
                now += logs.resolution();
            }

            assert_eq!(i, logs.buffer.counts().transferred);
            assert_eq!(0, logs.buffer.counts().pending);
            assert_eq!(0, logs.buffer.counts().no_polling);
        }

        let report = logs.report(now);
        assert_eq!(
            ThroughputReport::Transferred(Throughput::new(100, Duration::from_secs(1))),
            report
        );
    }

    #[test]
    fn no_polling() {
        let start = SystemTime::UNIX_EPOCH;
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);
        let report = logs.report(start + Duration::from_secs(2));
        assert_eq!(ThroughputReport::NoPolling, report);
    }

    /// Any transferred bytes take precedence, even when most bins are pending.
    #[test]
    fn mixed_bag_mostly_pending() {
        let start = SystemTime::UNIX_EPOCH;
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);

        logs.push_bytes_transferred(start + Duration::from_millis(50), 10);
        logs.push_pending(start + Duration::from_millis(150));
        logs.push_pending(start + Duration::from_millis(250));
        logs.push_bytes_transferred(start + Duration::from_millis(350), 10);
        logs.push_pending(start + Duration::from_millis(450));
        // Nothing is logged for the 500-600 ms bin.
        logs.push_pending(start + Duration::from_millis(650));
        logs.push_pending(start + Duration::from_millis(750));
        logs.push_pending(start + Duration::from_millis(850));

        let report = logs.report(start + Duration::from_millis(999));
        assert_eq!(
            ThroughputReport::Transferred(Throughput::new_bytes_per_second(20)),
            report
        );
    }

    #[test]
    fn mixed_bag_mostly_pending_no_transferred() {
        let start = SystemTime::UNIX_EPOCH;
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);

        logs.push_pending(start + Duration::from_millis(50));
        logs.push_pending(start + Duration::from_millis(150));
        logs.push_pending(start + Duration::from_millis(250));
        // Nothing is logged for the 300-400 ms bin.
        logs.push_pending(start + Duration::from_millis(450));
        // Nothing is logged for the 500-600 ms bin.
        logs.push_pending(start + Duration::from_millis(650));
        logs.push_pending(start + Duration::from_millis(750));
        logs.push_pending(start + Duration::from_millis(850));

        let report = logs.report(start + Duration::from_millis(999));
        assert_eq!(ThroughputReport::Pending, report);
    }

    /// Pushing at a time earlier than the creation time must not panic.
    #[test]
    fn test_first_push_succeeds_although_time_window_has_not_elapsed() {
        let t0 = SystemTime::UNIX_EPOCH;
        let t1 = t0 + Duration::from_secs(1);
        let mut tl = ThroughputLogs::new(Duration::from_secs(1), t1);

        tl.push_pending(t0);
    }

    #[test]
    fn test_label_transferred_bytes_should_not_be_overwritten_by_pending() {
        let start = SystemTime::UNIX_EPOCH;
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);

        // Both events land in the same (first) bin.
        logs.push_bytes_transferred(start + Duration::from_millis(10), 10);
        logs.push_pending(start + Duration::from_millis(20));

        let BinCounts {
            empty,
            no_polling,
            transferred,
            pending,
        } = logs.buffer.counts();

        assert_eq!(9, empty);
        assert_eq!(0, no_polling);
        assert_eq!(1, transferred);
        assert_eq!(0, pending);
    }
}