aws_smithy_runtime/client/http/body/
minimum_throughput.rs1pub mod http_body_0_4_x;
12
13pub mod http_body_1_x;
15
16pub mod options;
18pub use throughput::Throughput;
19mod throughput;
20
21use crate::client::http::body::minimum_throughput::throughput::ThroughputReport;
22use aws_smithy_async::rt::sleep::Sleep;
23use aws_smithy_async::rt::sleep::{AsyncSleep, SharedAsyncSleep};
24use aws_smithy_async::time::{SharedTimeSource, TimeSource};
25use aws_smithy_runtime_api::{
26 box_error::BoxError,
27 client::{
28 http::HttpConnectorFuture, result::ConnectorError, runtime_components::RuntimeComponents,
29 stalled_stream_protection::StalledStreamProtectionConfig,
30 },
31};
32use aws_smithy_runtime_api::{client::orchestrator::HttpResponse, shared::IntoShared};
33use aws_smithy_types::config_bag::{ConfigBag, Storable, StoreReplace};
34use options::MinimumThroughputBodyOptions;
35use std::{
36 fmt,
37 sync::{Arc, Mutex},
38 task::Poll,
39};
40use std::{future::Future, pin::Pin};
41use std::{
42 task::Context,
43 time::{Duration, SystemTime},
44};
45use throughput::ThroughputLogs;
46
47#[deprecated(note = "Renamed to MinimumThroughputDownloadBody since it doesn't work for uploads")]
49pub type MinimumThroughputBody<B> = MinimumThroughputDownloadBody<B>;
50
51pin_project_lite::pin_project! {
52 pub struct MinimumThroughputDownloadBody<B> {
56 async_sleep: SharedAsyncSleep,
57 time_source: SharedTimeSource,
58 options: MinimumThroughputBodyOptions,
59 throughput_logs: ThroughputLogs,
60 resolution: Duration,
61 #[pin]
62 sleep_fut: Option<Sleep>,
63 #[pin]
64 grace_period_fut: Option<Sleep>,
65 #[pin]
66 inner: B,
67 }
68}
69
70impl<B> MinimumThroughputDownloadBody<B> {
71 pub fn new(
73 time_source: impl TimeSource + 'static,
74 async_sleep: impl AsyncSleep + 'static,
75 body: B,
76 options: MinimumThroughputBodyOptions,
77 ) -> Self {
78 let time_source: SharedTimeSource = time_source.into_shared();
79 let now = time_source.now();
80 let throughput_logs = ThroughputLogs::new(options.check_window(), now);
81 let resolution = throughput_logs.resolution();
82 Self {
83 throughput_logs,
84 resolution,
85 async_sleep: async_sleep.into_shared(),
86 time_source,
87 inner: body,
88 sleep_fut: None,
89 grace_period_fut: None,
90 options,
91 }
92 }
93}
94
95#[derive(Debug, PartialEq)]
96enum Error {
97 ThroughputBelowMinimum {
98 expected: Throughput,
99 actual: Throughput,
100 },
101}
102
103impl fmt::Display for Error {
104 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
105 match self {
106 Self::ThroughputBelowMinimum { expected, actual } => {
107 write!(
108 f,
109 "minimum throughput was specified at {expected}, but throughput of {actual} was observed",
110 )
111 }
112 }
113 }
114}
115
116impl std::error::Error for Error {}
117
118#[derive(Clone, Debug)]
120pub(crate) struct UploadThroughput {
121 logs: Arc<Mutex<ThroughputLogs>>,
122}
123
124impl UploadThroughput {
125 pub(crate) fn new(time_window: Duration, now: SystemTime) -> Self {
126 Self {
127 logs: Arc::new(Mutex::new(ThroughputLogs::new(time_window, now))),
128 }
129 }
130
131 pub(crate) fn resolution(&self) -> Duration {
132 self.logs.lock().unwrap().resolution()
133 }
134
135 pub(crate) fn push_pending(&self, now: SystemTime) {
136 self.logs.lock().unwrap().push_pending(now);
137 }
138 pub(crate) fn push_bytes_transferred(&self, now: SystemTime, bytes: u64) {
139 self.logs.lock().unwrap().push_bytes_transferred(now, bytes);
140 }
141
142 pub(crate) fn mark_complete(&self) -> bool {
143 self.logs.lock().unwrap().mark_complete()
144 }
145
146 pub(crate) fn report(&self, now: SystemTime) -> ThroughputReport {
147 self.logs.lock().unwrap().report(now)
148 }
149}
150
151impl Storable for UploadThroughput {
152 type Storer = StoreReplace<Self>;
153}
154
155pin_project_lite::pin_project! {
156 pub(crate) struct ThroughputReadingBody<B> {
157 time_source: SharedTimeSource,
158 throughput: UploadThroughput,
159 #[pin]
160 inner: B,
161 }
162}
163
164impl<B> ThroughputReadingBody<B> {
165 pub(crate) fn new(
166 time_source: SharedTimeSource,
167 throughput: UploadThroughput,
168 body: B,
169 ) -> Self {
170 Self {
171 time_source,
172 throughput,
173 inner: body,
174 }
175 }
176}
177
178const ZERO_THROUGHPUT: Throughput = Throughput::new_bytes_per_second(0);
179
180trait UploadReport {
182 fn minimum_throughput_violated(self, minimum_throughput: Throughput) -> (bool, Throughput);
183}
184impl UploadReport for ThroughputReport {
185 fn minimum_throughput_violated(self, minimum_throughput: Throughput) -> (bool, Throughput) {
186 let throughput = match self {
187 ThroughputReport::Complete => return (false, ZERO_THROUGHPUT),
189 ThroughputReport::Incomplete => {
192 tracing::trace!(
193 "not enough data to decide if minimum throughput has been violated"
194 );
195 return (false, ZERO_THROUGHPUT);
196 }
197 ThroughputReport::Pending => {
200 tracing::debug!(
201 "the user has stalled; this will not become a minimum throughput violation"
202 );
203 return (false, ZERO_THROUGHPUT);
204 }
205 ThroughputReport::NoPolling => ZERO_THROUGHPUT,
209 ThroughputReport::Transferred(tp) => tp,
210 };
211 if throughput < minimum_throughput {
212 tracing::debug!(
213 "current throughput: {throughput} is below minimum: {minimum_throughput}"
214 );
215 (true, throughput)
216 } else {
217 (false, throughput)
218 }
219 }
220}
221
222pin_project_lite::pin_project! {
223 pub(crate) struct UploadThroughputCheckFuture {
226 #[pin]
227 response: HttpConnectorFuture,
228 #[pin]
229 check_interval: Option<Sleep>,
230 #[pin]
231 grace_period: Option<Sleep>,
232
233 time_source: SharedTimeSource,
234 sleep_impl: SharedAsyncSleep,
235 upload_throughput: UploadThroughput,
236 resolution: Duration,
237 options: MinimumThroughputBodyOptions,
238
239 failing_throughput: Option<Throughput>,
240 }
241}
242
243impl UploadThroughputCheckFuture {
244 fn new(
245 response: HttpConnectorFuture,
246 time_source: SharedTimeSource,
247 sleep_impl: SharedAsyncSleep,
248 upload_throughput: UploadThroughput,
249 options: MinimumThroughputBodyOptions,
250 ) -> Self {
251 let resolution = upload_throughput.resolution();
252 Self {
253 response,
254 check_interval: Some(sleep_impl.sleep(resolution)),
255 grace_period: None,
256 time_source,
257 sleep_impl,
258 upload_throughput,
259 resolution,
260 options,
261 failing_throughput: None,
262 }
263 }
264}
265
266impl Future for UploadThroughputCheckFuture {
267 type Output = Result<HttpResponse, ConnectorError>;
268
269 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
270 let mut this = self.project();
271
272 if let Poll::Ready(output) = this.response.poll(cx) {
273 return Poll::Ready(output);
274 } else {
275 let mut below_minimum_throughput = false;
276 let check_interval_expired = this
277 .check_interval
278 .as_mut()
279 .as_pin_mut()
280 .expect("always set")
281 .poll(cx)
282 .is_ready();
283 if check_interval_expired {
284 *this.check_interval = Some(this.sleep_impl.sleep(*this.resolution));
286
287 cx.waker().wake_by_ref();
291 }
292
293 let should_check = check_interval_expired || this.grace_period.is_some();
294 if should_check {
295 let now = this.time_source.now();
296 let report = this.upload_throughput.report(now);
297 let (violated, current_throughput) =
298 report.minimum_throughput_violated(this.options.minimum_throughput());
299 below_minimum_throughput = violated;
300 if below_minimum_throughput && !this.failing_throughput.is_some() {
301 *this.failing_throughput = Some(current_throughput);
302 } else if !below_minimum_throughput {
303 *this.failing_throughput = None;
304 }
305 }
306
307 if !below_minimum_throughput && this.grace_period.is_some() {
309 tracing::debug!("upload minimum throughput recovered during grace period");
310 *this.grace_period = None;
311 }
312 if below_minimum_throughput {
313 if this.grace_period.is_none() {
315 tracing::debug!(
316 grace_period=?this.options.grace_period(),
317 "upload minimum throughput below configured minimum; starting grace period"
318 );
319 *this.grace_period = Some(this.sleep_impl.sleep(this.options.grace_period()));
320 }
321 if let Some(grace_period) = this.grace_period.as_pin_mut() {
323 if grace_period.poll(cx).is_ready() {
324 tracing::debug!("grace period ended; timing out request");
325 return Poll::Ready(Err(ConnectorError::timeout(
326 Error::ThroughputBelowMinimum {
327 expected: this.options.minimum_throughput(),
328 actual: this
329 .failing_throughput
330 .expect("always set if there's a grace period"),
331 }
332 .into(),
333 )));
334 }
335 }
336 }
337 }
338 Poll::Pending
339 }
340}
341
342pin_project_lite::pin_project! {
343 #[project = EnumProj]
344 pub(crate) enum MaybeUploadThroughputCheckFuture {
345 Direct { #[pin] future: HttpConnectorFuture },
346 Checked { #[pin] future: UploadThroughputCheckFuture },
347 }
348}
349
350impl MaybeUploadThroughputCheckFuture {
351 pub(crate) fn new(
352 cfg: &mut ConfigBag,
353 components: &RuntimeComponents,
354 connector_future: HttpConnectorFuture,
355 ) -> Self {
356 if let Some(sspcfg) = cfg.load::<StalledStreamProtectionConfig>().cloned() {
357 if sspcfg.is_enabled() {
358 let options = MinimumThroughputBodyOptions::from(sspcfg);
359 return Self::new_inner(
360 connector_future,
361 components.time_source(),
362 components.sleep_impl(),
363 cfg.interceptor_state().load::<UploadThroughput>().cloned(),
364 Some(options),
365 );
366 }
367 }
368 tracing::debug!("no minimum upload throughput checks");
369 Self::new_inner(connector_future, None, None, None, None)
370 }
371
372 fn new_inner(
373 response: HttpConnectorFuture,
374 time_source: Option<SharedTimeSource>,
375 sleep_impl: Option<SharedAsyncSleep>,
376 upload_throughput: Option<UploadThroughput>,
377 options: Option<MinimumThroughputBodyOptions>,
378 ) -> Self {
379 match (time_source, sleep_impl, upload_throughput, options) {
380 (Some(time_source), Some(sleep_impl), Some(upload_throughput), Some(options)) => {
381 tracing::debug!(options=?options, "applying minimum upload throughput check future");
382 Self::Checked {
383 future: UploadThroughputCheckFuture::new(
384 response,
385 time_source,
386 sleep_impl,
387 upload_throughput,
388 options,
389 ),
390 }
391 }
392 _ => Self::Direct { future: response },
393 }
394 }
395}
396
397impl Future for MaybeUploadThroughputCheckFuture {
398 type Output = Result<HttpResponse, ConnectorError>;
399
400 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
401 match self.project() {
402 EnumProj::Direct { future } => future.poll(cx),
403 EnumProj::Checked { future } => future.poll(cx),
404 }
405 }
406}