1 + | /*
|
2 + | * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
3 + | * SPDX-License-Identifier: Apache-2.0
|
4 + | */
|
5 + |
|
6 + | use super::{BoxError, Error, MinimumThroughputDownloadBody};
|
7 + | use crate::client::http::body::minimum_throughput::throughput::DownloadReport;
|
8 + | use crate::client::http::body::minimum_throughput::ThroughputReadingBody;
|
9 + | use aws_smithy_async::rt::sleep::AsyncSleep;
|
10 + | use http_body_1x::Frame;
|
11 + | use std::future::Future;
|
12 + | use std::pin::{pin, Pin};
|
13 + | use std::task::{Context, Poll};
|
14 + |
|
15 + | impl<B> http_body_1x::Body for MinimumThroughputDownloadBody<B>
|
16 + | where
|
17 + | B: http_body_1x::Body<Data = bytes::Bytes, Error = BoxError>,
|
18 + | {
|
19 + | type Data = bytes::Bytes;
|
20 + | type Error = BoxError;
|
21 + |
|
22 + | fn poll_frame(
|
23 + | mut self: Pin<&mut Self>,
|
24 + | cx: &mut Context<'_>,
|
25 + | ) -> Poll<Option<Result<http_body_1x::Frame<Self::Data>, Self::Error>>> {
|
26 + | #[allow(unused_imports)]
|
27 + | use crate::client::http::body::minimum_throughput::throughput::ThroughputReport;
|
28 + | // this code is called quite frequently in production—one every millisecond or so when downloading
|
29 + | // a stream. However, SystemTime::now is on the order of nanoseconds
|
30 + | let now = self.time_source.now();
|
31 + | // Attempt to read the data from the inner body, then update the
|
32 + | // throughput logs.
|
33 + | let mut this = self.as_mut().project();
|
34 + | let poll_res = match this.inner.poll_frame(cx) {
|
35 + | Poll::Ready(Some(Ok(frame))) => {
|
36 + | if frame.is_data() {
|
37 + | let bytes = frame.into_data().expect("Is data frame");
|
38 + | tracing::trace!("received data: {}", bytes.len());
|
39 + | this.throughput_logs
|
40 + | .push_bytes_transferred(now, bytes.len() as u64);
|
41 + | Poll::Ready(Some(Ok(Frame::data(bytes))))
|
42 + | } else {
|
43 + | tracing::trace!("received trailer");
|
44 + | Poll::Ready(Some(Ok(frame)))
|
45 + | }
|
46 + | }
|
47 + | Poll::Pending => {
|
48 + | tracing::trace!("received poll pending");
|
49 + | this.throughput_logs.push_pending(now);
|
50 + | Poll::Pending
|
51 + | }
|
52 + | // If we've read all the data or an error occurred, then return that result.
|
53 + | res => return res,
|
54 + | };
|
55 + |
|
56 + | // Check the sleep future to see if it needs refreshing.
|
57 + | let mut sleep_fut = this
|
58 + | .sleep_fut
|
59 + | .take()
|
60 + | .unwrap_or_else(|| this.async_sleep.sleep(*this.resolution));
|
61 + | if let Poll::Ready(()) = pin!(&mut sleep_fut).poll(cx) {
|
62 + | tracing::trace!("sleep future triggered—triggering a wakeup");
|
63 + | // Whenever the sleep future expires, we replace it.
|
64 + | sleep_fut = this.async_sleep.sleep(*this.resolution);
|
65 + |
|
66 + | // We also schedule a wake up for current task to ensure that
|
67 + | // it gets polled at least one more time.
|
68 + | cx.waker().wake_by_ref();
|
69 + | };
|
70 + | this.sleep_fut.replace(sleep_fut);
|
71 + |
|
72 + | // Calculate the current throughput and emit an error if it's too low and
|
73 + | // the grace period has elapsed.
|
74 + | let report = this.throughput_logs.report(now);
|
75 + | let (violated, current_throughput) =
|
76 + | report.minimum_throughput_violated(this.options.minimum_throughput());
|
77 + | if violated {
|
78 + | if this.grace_period_fut.is_none() {
|
79 + | tracing::debug!("entering minimum throughput grace period");
|
80 + | }
|
81 + | let mut grace_period_fut = this
|
82 + | .grace_period_fut
|
83 + | .take()
|
84 + | .unwrap_or_else(|| this.async_sleep.sleep(this.options.grace_period()));
|
85 + | if let Poll::Ready(()) = pin!(&mut grace_period_fut).poll(cx) {
|
86 + | // The grace period has ended!
|
87 + | return Poll::Ready(Some(Err(Box::new(Error::ThroughputBelowMinimum {
|
88 + | expected: self.options.minimum_throughput(),
|
89 + | actual: current_throughput,
|
90 + | }))));
|
91 + | };
|
92 + | this.grace_period_fut.replace(grace_period_fut);
|
93 + | } else {
|
94 + | // Ensure we don't have an active grace period future if we're not
|
95 + | // currently below the minimum throughput.
|
96 + | if this.grace_period_fut.is_some() {
|
97 + | tracing::debug!("throughput recovered; exiting grace period");
|
98 + | }
|
99 + | let _ = this.grace_period_fut.take();
|
100 + | }
|
101 + |
|
102 + | poll_res
|
103 + | }
|
104 + |
|
105 + | fn is_end_stream(&self) -> bool {
|
106 + | self.inner.is_end_stream()
|
107 + | }
|
108 + |
|
109 + | fn size_hint(&self) -> http_body_1x::SizeHint {
|
110 + | self.inner.size_hint()
|
111 + | }
|
112 + | }
|
113 + |
|
114 + | impl<B> http_body_1x::Body for ThroughputReadingBody<B>
|
115 + | where
|
116 + | B: http_body_1x::Body<Data = bytes::Bytes, Error = BoxError>,
|
117 + | {
|
118 + | type Data = bytes::Bytes;
|
119 + | type Error = BoxError;
|
120 + |
|
121 + | fn poll_frame(
|
122 + | mut self: Pin<&mut Self>,
|
123 + | cx: &mut Context<'_>,
|
124 + | ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
|
125 + | // this code is called quite frequently in production—one every millisecond or so when downloading
|
126 + | // a stream. However, SystemTime::now is on the order of nanoseconds
|
127 + | let now = self.time_source.now();
|
128 + | // Attempt to read the data from the inner body, then update the
|
129 + | // throughput logs.
|
130 + | let this = self.as_mut().project();
|
131 + | match this.inner.poll_frame(cx) {
|
132 + | Poll::Ready(Some(Ok(frame))) => {
|
133 + | if frame.is_data() {
|
134 + | let bytes = frame.into_data().expect("Is data frame");
|
135 + | tracing::trace!("received data: {}", bytes.len());
|
136 + | this.throughput
|
137 + | .push_bytes_transferred(now, bytes.len() as u64);
|
138 + |
|
139 + | // hyper will optimistically stop polling when end of stream is reported
|
140 + | // (e.g. when content-length amount of data has been consumed) which means
|
141 + | // we may never get to `Poll:Ready(None)`. Check for same condition and
|
142 + | // attempt to stop checking throughput violations _now_ as we may never
|
143 + | // get polled again. The caveat here is that it depends on `Body` implementations
|
144 + | // implementing `is_end_stream()` correctly. Users can also disable SSP as an
|
145 + | // alternative for such fringe use cases.
|
146 + | if self.is_end_stream() {
|
147 + | tracing::trace!("stream reported end of stream before Poll::Ready(None) reached; marking stream complete");
|
148 + | self.throughput.mark_complete();
|
149 + | }
|
150 + | Poll::Ready(Some(Ok(Frame::data(bytes))))
|
151 + | } else {
|
152 + | Poll::Ready(Some(Ok(frame)))
|
153 + | }
|
154 + | }
|
155 + | Poll::Pending => {
|
156 + | tracing::trace!("received poll pending");
|
157 + | this.throughput.push_pending(now);
|
158 + | Poll::Pending
|
159 + | }
|
160 + | // If we've read all the data or an error occurred, then return that result.
|
161 + | res => {
|
162 + | if this.throughput.mark_complete() {
|
163 + | tracing::trace!("stream completed: {:?}", res);
|
164 + | }
|
165 + | res
|
166 + | }
|
167 + | }
|
168 + | }
|
169 + |
|
170 + | fn is_end_stream(&self) -> bool {
|
171 + | self.inner.is_end_stream()
|
172 + | }
|
173 + |
|
174 + | fn size_hint(&self) -> http_body_1x::SizeHint {
|
175 + | self.inner.size_hint()
|
176 + | }
|
177 + | }
|