aws_smithy_runtime/client/http/body/minimum_throughput/
throughput.rs1use std::fmt;
7use std::time::{Duration, SystemTime};
8
9#[derive(Debug, Clone, Copy)]
11#[cfg_attr(test, derive(Eq))]
12pub struct Throughput {
13    pub(super) bytes_read: u64,
14    pub(super) per_time_elapsed: Duration,
15}
16
17impl Throughput {
18    pub fn new(bytes_read: u64, per_time_elapsed: Duration) -> Self {
20        debug_assert!(
21            !per_time_elapsed.is_zero(),
22            "cannot create a throughput if per_time_elapsed == 0"
23        );
24
25        Self {
26            bytes_read,
27            per_time_elapsed,
28        }
29    }
30
31    pub const fn new_bytes_per_second(bytes: u64) -> Self {
33        Self {
34            bytes_read: bytes,
35            per_time_elapsed: Duration::from_secs(1),
36        }
37    }
38
39    pub const fn new_kilobytes_per_second(kilobytes: u64) -> Self {
41        Self {
42            bytes_read: kilobytes * 1000,
43            per_time_elapsed: Duration::from_secs(1),
44        }
45    }
46
47    pub const fn new_megabytes_per_second(megabytes: u64) -> Self {
49        Self {
50            bytes_read: megabytes * 1000 * 1000,
51            per_time_elapsed: Duration::from_secs(1),
52        }
53    }
54
55    pub(super) fn bytes_per_second(&self) -> f64 {
56        let per_time_elapsed_secs = self.per_time_elapsed.as_secs_f64();
57        if per_time_elapsed_secs == 0.0 {
58            return 0.0; };
60
61        self.bytes_read as f64 / per_time_elapsed_secs
62    }
63}
64
65impl PartialEq for Throughput {
66    fn eq(&self, other: &Self) -> bool {
67        self.bytes_per_second() == other.bytes_per_second()
68    }
69}
70
71impl PartialOrd for Throughput {
72    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
73        self.bytes_per_second()
74            .partial_cmp(&other.bytes_per_second())
75    }
76}
77
78impl fmt::Display for Throughput {
79    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
80        let pretty_bytes_per_second = (self.bytes_per_second() * 1000.0).round() / 1000.0;
86
87        write!(f, "{pretty_bytes_per_second} B/s")
88    }
89}
90
91impl From<(u64, Duration)> for Throughput {
92    fn from(value: (u64, Duration)) -> Self {
93        Self {
94            bytes_read: value.0,
95            per_time_elapsed: value.1,
96        }
97    }
98}
99
/// What was observed during the slice of time covered by one `Bin`.
///
/// Variant order is significant: the derived `Ord` ranks later variants
/// higher, and `Bin::merge` keeps whichever label ranks highest.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum BinLabel {
    /// Label carried by `Bin::empty()`; nothing has been recorded yet.
    Empty,

    /// Label that `LogBuffer::fill_gaps` assigns to pushed bins that are
    /// still `Empty`.
    NoPolling,

    /// Label recorded by `ThroughputLogs::push_pending`.
    Pending,

    /// Label recorded by `ThroughputLogs::push_bytes_transferred`.
    TransferredBytes,
}
118
119#[derive(Copy, Clone, Debug)]
121struct Bin {
122    label: BinLabel,
123    bytes: u64,
124}
125
126impl Bin {
127    const fn new(label: BinLabel, bytes: u64) -> Self {
128        Self { label, bytes }
129    }
130    const fn empty() -> Self {
131        Self::new(BinLabel::Empty, 0)
132    }
133
134    fn is_empty(&self) -> bool {
135        matches!(self.label, BinLabel::Empty)
136    }
137
138    fn merge(&mut self, other: Bin) -> &mut Self {
139        self.label = if other.label > self.label {
145            other.label
146        } else {
147            self.label
148        };
149        self.bytes += other.bytes;
150        self
151    }
152
153    fn bytes(&self) -> u64 {
155        self.bytes
156    }
157}
158
/// Per-label tally of the bins in a `LogBuffer`, as produced by
/// `LogBuffer::counts`.
#[derive(Clone, Copy, Debug, Default)]
struct BinCounts {
    /// Number of bins labelled `BinLabel::Empty`.
    empty: usize,
    /// Number of bins labelled `BinLabel::NoPolling`.
    no_polling: usize,
    /// Number of bins labelled `BinLabel::TransferredBytes`.
    transferred: usize,
    /// Number of bins labelled `BinLabel::Pending`.
    pending: usize,
}
170
171#[derive(Copy, Clone, Debug)]
174struct LogBuffer<const N: usize> {
175    entries: [Bin; N],
176    length: usize,
181}
182
183impl<const N: usize> LogBuffer<N> {
184    fn new() -> Self {
185        Self {
186            entries: [Bin::empty(); N],
187            length: 0,
188        }
189    }
190
191    fn tail_mut(&mut self) -> &mut Bin {
197        debug_assert!(self.length > 0);
198        &mut self.entries[self.length - 1]
199    }
200
201    fn push(&mut self, bin: Bin) {
204        if self.filled() {
205            self.entries.rotate_left(1);
206            self.entries[N - 1] = bin;
207        } else {
208            self.entries[self.length] = bin;
209            self.length += 1;
210        }
211    }
212
213    fn bytes_transferred(&self) -> u64 {
215        self.entries.iter().take(self.length).map(Bin::bytes).sum()
216    }
217
218    #[inline]
219    fn filled(&self) -> bool {
220        self.length == N
221    }
222
223    fn fill_gaps(&mut self) {
229        for entry in self.entries.iter_mut().take(self.length) {
230            if entry.is_empty() {
231                *entry = Bin::new(BinLabel::NoPolling, 0);
232            }
233        }
234    }
235
236    fn counts(&self) -> BinCounts {
238        let mut counts = BinCounts::default();
239        for entry in &self.entries {
240            match entry.label {
241                BinLabel::Empty => counts.empty += 1,
242                BinLabel::NoPolling => counts.no_polling += 1,
243                BinLabel::TransferredBytes => counts.transferred += 1,
244                BinLabel::Pending => counts.pending += 1,
245            }
246        }
247        counts
248    }
249
250    fn is_empty(&self) -> bool {
252        self.length == 0
253    }
254}
255
256#[cfg_attr(test, derive(Debug, Eq, PartialEq))]
258pub(crate) enum ThroughputReport {
259    Incomplete,
261    NoPolling,
263    Pending,
265    Transferred(Throughput),
267    Complete,
269}
270
/// Number of bins in the `LogBuffer` owned by `ThroughputLogs`.
///
/// `ThroughputLogs::new` divides its time window by this count to obtain the
/// span of time covered by each bin.
const BIN_COUNT: usize = 10;
272
273#[derive(Clone, Debug)]
290pub(super) struct ThroughputLogs {
291    resolution: Duration,
292    current_tail: SystemTime,
293    buffer: LogBuffer<BIN_COUNT>,
294    stream_complete: bool,
295}
296
297impl ThroughputLogs {
298    pub(super) fn new(time_window: Duration, now: SystemTime) -> Self {
306        assert!(!time_window.is_zero());
307        let resolution = time_window.div_f64(BIN_COUNT as f64);
308        Self {
309            resolution,
310            current_tail: now,
311            buffer: LogBuffer::new(),
312            stream_complete: false,
313        }
314    }
315
316    pub(super) fn resolution(&self) -> Duration {
320        self.resolution
321    }
322
323    pub(super) fn push_pending(&mut self, time: SystemTime) {
329        self.push(time, Bin::new(BinLabel::Pending, 0));
330    }
331
332    pub(super) fn push_bytes_transferred(&mut self, time: SystemTime, bytes: u64) {
336        self.push(time, Bin::new(BinLabel::TransferredBytes, bytes));
337    }
338
339    fn push(&mut self, now: SystemTime, value: Bin) {
340        self.catch_up(now);
341        if self.buffer.is_empty() {
342            self.buffer.push(value)
343        } else {
344            self.buffer.tail_mut().merge(value);
345        }
346        self.buffer.fill_gaps();
347    }
348
349    fn catch_up(&mut self, now: SystemTime) {
351        while now >= self.current_tail {
352            self.current_tail += self.resolution;
353            self.buffer.push(Bin::empty());
354        }
355        assert!(self.current_tail >= now);
356    }
357
358    pub(super) fn mark_complete(&mut self) -> bool {
365        let prev = self.stream_complete;
366        self.stream_complete = true;
367        !prev
368    }
369
370    pub(super) fn report(&mut self, now: SystemTime) -> ThroughputReport {
372        if self.stream_complete {
373            return ThroughputReport::Complete;
374        }
375
376        self.catch_up(now);
377        self.buffer.fill_gaps();
378
379        let BinCounts {
380            empty,
381            no_polling,
382            transferred,
383            pending,
384        } = self.buffer.counts();
385
386        if empty > 0 {
389            return ThroughputReport::Incomplete;
390        }
391
392        let bytes = self.buffer.bytes_transferred();
393        let time = self.resolution * (BIN_COUNT - empty) as u32;
394        let throughput = Throughput::new(bytes, time);
395
396        let half = BIN_COUNT / 2;
397        match (transferred > 0, no_polling >= half, pending >= half) {
398            (true, _, _) => ThroughputReport::Transferred(throughput),
399            (_, true, _) => ThroughputReport::NoPolling,
400            (_, _, true) => ThroughputReport::Pending,
401            _ => ThroughputReport::Incomplete,
402        }
403    }
404}
405
#[cfg(test)]
mod test {
    use super::*;
    use std::time::Duration;

    // `Bin::merge` relies on this ordering to decide which label wins.
    #[test]
    fn test_log_buffer_bin_label_priority() {
        use BinLabel::*;
        assert!(Empty < NoPolling);
        assert!(NoPolling < Pending);
        assert!(Pending < TransferredBytes);
    }

    // Throughputs with the same bytes-per-second rate compare equal.
    #[test]
    fn test_throughput_eq() {
        let t1 = Throughput::new(1, Duration::from_secs(1));
        let t2 = Throughput::new(25, Duration::from_secs(25));
        let t3 = Throughput::new(100, Duration::from_secs(100));

        assert_eq!(t1, t2);
        assert_eq!(t2, t3);
    }

    #[test]
    fn incomplete_no_entries() {
        let start = SystemTime::UNIX_EPOCH;
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);
        let report = logs.report(start);
        assert_eq!(ThroughputReport::Incomplete, report);
    }

    #[test]
    fn incomplete_with_entries() {
        let start = SystemTime::UNIX_EPOCH;
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);
        logs.push_pending(start);

        let report = logs.report(start + Duration::from_millis(300));
        assert_eq!(ThroughputReport::Incomplete, report);
    }

    #[test]
    fn incomplete_with_transferred() {
        let start = SystemTime::UNIX_EPOCH;
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);
        logs.push_pending(start);
        logs.push_bytes_transferred(start + Duration::from_millis(100), 10);

        let report = logs.report(start + Duration::from_millis(300));
        assert_eq!(ThroughputReport::Incomplete, report);
    }

    #[test]
    fn push_pending_at_the_beginning_of_each_tick() {
        let start = SystemTime::UNIX_EPOCH;
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);

        let mut now = start;
        for i in 1..=BIN_COUNT {
            logs.push_pending(now);
            now += logs.resolution();

            assert_eq!(i, logs.buffer.counts().pending);
        }

        let report = logs.report(now);
        assert_eq!(ThroughputReport::Pending, report);
    }

    #[test]
    fn push_pending_at_the_end_of_each_tick() {
        let start = SystemTime::UNIX_EPOCH;
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);

        let mut now = start;
        for i in 1..BIN_COUNT {
            now += logs.resolution();
            logs.push_pending(now);

            assert_eq!(i, logs.buffer.counts().pending);
            assert_eq!(0, logs.buffer.counts().transferred);
            assert_eq!(1, logs.buffer.counts().no_polling);
        }
        // One more push rotates the initial no-polling bin out of the buffer.
        now += logs.resolution();
        logs.push_pending(now);
        assert_eq!(0, logs.buffer.counts().no_polling);

        let report = logs.report(now);
        assert_eq!(ThroughputReport::Pending, report);
    }

    #[test]
    fn push_transferred_at_the_beginning_of_each_tick() {
        let start = SystemTime::UNIX_EPOCH;
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);

        let mut now = start;
        for i in 1..=BIN_COUNT {
            logs.push_bytes_transferred(now, 10);
            // Do not advance past the last bin, so the report below does not
            // push a new bin.
            if i != BIN_COUNT {
                now += logs.resolution();
            }

            assert_eq!(i, logs.buffer.counts().transferred);
            assert_eq!(0, logs.buffer.counts().pending);
            assert_eq!(0, logs.buffer.counts().no_polling);
        }

        let report = logs.report(now);
        assert_eq!(
            ThroughputReport::Transferred(Throughput::new(100, Duration::from_secs(1))),
            report
        );
    }

    #[test]
    fn no_polling() {
        let start = SystemTime::UNIX_EPOCH;
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);
        let report = logs.report(start + Duration::from_secs(2));
        assert_eq!(ThroughputReport::NoPolling, report);
    }

    // Any transferred bytes take precedence over pending bins in the report.
    #[test]
    fn mixed_bag_mostly_pending() {
        let start = SystemTime::UNIX_EPOCH;
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);

        logs.push_bytes_transferred(start + Duration::from_millis(50), 10);
        logs.push_pending(start + Duration::from_millis(150));
        logs.push_pending(start + Duration::from_millis(250));
        logs.push_bytes_transferred(start + Duration::from_millis(350), 10);
        logs.push_pending(start + Duration::from_millis(450));
        // Nothing is pushed between 450ms and 650ms, leaving a gap bin.
        logs.push_pending(start + Duration::from_millis(650));
        logs.push_pending(start + Duration::from_millis(750));
        logs.push_pending(start + Duration::from_millis(850));

        let report = logs.report(start + Duration::from_millis(999));
        assert_eq!(
            ThroughputReport::Transferred(Throughput::new_bytes_per_second(20)),
            report
        );
    }

    #[test]
    fn mixed_bag_mostly_pending_no_transferred() {
        let start = SystemTime::UNIX_EPOCH;
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);

        logs.push_pending(start + Duration::from_millis(50));
        logs.push_pending(start + Duration::from_millis(150));
        logs.push_pending(start + Duration::from_millis(250));
        // Nothing is pushed between 250ms and 450ms, leaving a gap bin.
        logs.push_pending(start + Duration::from_millis(450));
        // Nothing is pushed between 450ms and 650ms, leaving a gap bin.
        logs.push_pending(start + Duration::from_millis(650));
        logs.push_pending(start + Duration::from_millis(750));
        logs.push_pending(start + Duration::from_millis(850));

        let report = logs.report(start + Duration::from_millis(999));
        assert_eq!(ThroughputReport::Pending, report);
    }

    // Pushing with a timestamp earlier than the log's start must not panic.
    #[test]
    fn test_first_push_succeeds_although_time_window_has_not_elapsed() {
        let t0 = SystemTime::UNIX_EPOCH;
        let t1 = t0 + Duration::from_secs(1);
        let mut tl = ThroughputLogs::new(Duration::from_secs(1), t1);

        tl.push_pending(t0);
    }

    #[test]
    fn test_label_transferred_bytes_should_not_be_overwritten_by_pending() {
        let start = SystemTime::UNIX_EPOCH;
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);

        // Both pushes land in the same bin; the higher-ranked label must win.
        logs.push_bytes_transferred(start + Duration::from_millis(10), 10);
        logs.push_pending(start + Duration::from_millis(20));

        let BinCounts {
            empty,
            no_polling,
            transferred,
            pending,
        } = logs.buffer.counts();

        assert_eq!(9, empty);
        assert_eq!(0, no_polling);
        assert_eq!(1, transferred);
        assert_eq!(0, pending);
    }
}