aws_smithy_runtime/client/http/body/
minimum_throughput.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6//! A body-wrapping type that ensures data is being streamed faster than some lower limit.
7//!
8//! If data is being streamed too slowly, this body type will emit an error next time it's polled.
9
10/// An implementation of v0.4 `http_body::Body` for `MinimumThroughputBody` and related code.
11pub mod http_body_0_4_x;
12
13/// An implementation of v1.0 `http_body::Body` for `MinimumThroughputBody` and related code.
14pub mod http_body_1_x;
15
16/// Options for a [`MinimumThroughputBody`].
17pub mod options;
18pub use throughput::Throughput;
19mod throughput;
20
21use crate::client::http::body::minimum_throughput::throughput::ThroughputReport;
22use aws_smithy_async::rt::sleep::Sleep;
23use aws_smithy_async::rt::sleep::{AsyncSleep, SharedAsyncSleep};
24use aws_smithy_async::time::{SharedTimeSource, TimeSource};
25use aws_smithy_runtime_api::{
26    box_error::BoxError,
27    client::{
28        http::HttpConnectorFuture, result::ConnectorError, runtime_components::RuntimeComponents,
29        stalled_stream_protection::StalledStreamProtectionConfig,
30    },
31};
32use aws_smithy_runtime_api::{client::orchestrator::HttpResponse, shared::IntoShared};
33use aws_smithy_types::config_bag::{ConfigBag, Storable, StoreReplace};
34use options::MinimumThroughputBodyOptions;
35use std::{
36    fmt,
37    sync::{Arc, Mutex},
38    task::Poll,
39};
40use std::{future::Future, pin::Pin};
41use std::{
42    task::Context,
43    time::{Duration, SystemTime},
44};
45use throughput::ThroughputLogs;
46
47/// Use [`MinimumThroughputDownloadBody`] instead.
48#[deprecated(note = "Renamed to MinimumThroughputDownloadBody since it doesn't work for uploads")]
49pub type MinimumThroughputBody<B> = MinimumThroughputDownloadBody<B>;
50
51pin_project_lite::pin_project! {
52    /// A body-wrapping type that ensures data is being streamed faster than some lower limit.
53    ///
54    /// If data is being streamed too slowly, this body type will emit an error next time it's polled.
55    pub struct MinimumThroughputDownloadBody<B> {
56        async_sleep: SharedAsyncSleep,
57        time_source: SharedTimeSource,
58        options: MinimumThroughputBodyOptions,
59        throughput_logs: ThroughputLogs,
60        resolution: Duration,
61        #[pin]
62        sleep_fut: Option<Sleep>,
63        #[pin]
64        grace_period_fut: Option<Sleep>,
65        #[pin]
66        inner: B,
67    }
68}
69
70impl<B> MinimumThroughputDownloadBody<B> {
71    /// Create a new minimum throughput body.
72    pub fn new(
73        time_source: impl TimeSource + 'static,
74        async_sleep: impl AsyncSleep + 'static,
75        body: B,
76        options: MinimumThroughputBodyOptions,
77    ) -> Self {
78        let time_source: SharedTimeSource = time_source.into_shared();
79        let now = time_source.now();
80        let throughput_logs = ThroughputLogs::new(options.check_window(), now);
81        let resolution = throughput_logs.resolution();
82        Self {
83            throughput_logs,
84            resolution,
85            async_sleep: async_sleep.into_shared(),
86            time_source,
87            inner: body,
88            sleep_fut: None,
89            grace_period_fut: None,
90            options,
91        }
92    }
93}
94
95#[derive(Debug, PartialEq)]
96enum Error {
97    ThroughputBelowMinimum {
98        expected: Throughput,
99        actual: Throughput,
100    },
101}
102
103impl fmt::Display for Error {
104    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
105        match self {
106            Self::ThroughputBelowMinimum { expected, actual } => {
107                write!(
108                    f,
109                    "minimum throughput was specified at {expected}, but throughput of {actual} was observed",
110                )
111            }
112        }
113    }
114}
115
116impl std::error::Error for Error {}
117
118/// Used to store the upload throughput in the interceptor context.
119#[derive(Clone, Debug)]
120pub(crate) struct UploadThroughput {
121    logs: Arc<Mutex<ThroughputLogs>>,
122}
123
124impl UploadThroughput {
125    pub(crate) fn new(time_window: Duration, now: SystemTime) -> Self {
126        Self {
127            logs: Arc::new(Mutex::new(ThroughputLogs::new(time_window, now))),
128        }
129    }
130
131    pub(crate) fn resolution(&self) -> Duration {
132        self.logs.lock().unwrap().resolution()
133    }
134
135    pub(crate) fn push_pending(&self, now: SystemTime) {
136        self.logs.lock().unwrap().push_pending(now);
137    }
138    pub(crate) fn push_bytes_transferred(&self, now: SystemTime, bytes: u64) {
139        self.logs.lock().unwrap().push_bytes_transferred(now, bytes);
140    }
141
142    pub(crate) fn mark_complete(&self) -> bool {
143        self.logs.lock().unwrap().mark_complete()
144    }
145
146    pub(crate) fn report(&self, now: SystemTime) -> ThroughputReport {
147        self.logs.lock().unwrap().report(now)
148    }
149}
150
151impl Storable for UploadThroughput {
152    type Storer = StoreReplace<Self>;
153}
154
155pin_project_lite::pin_project! {
156    pub(crate) struct ThroughputReadingBody<B> {
157        time_source: SharedTimeSource,
158        throughput: UploadThroughput,
159        #[pin]
160        inner: B,
161    }
162}
163
164impl<B> ThroughputReadingBody<B> {
165    pub(crate) fn new(
166        time_source: SharedTimeSource,
167        throughput: UploadThroughput,
168        body: B,
169    ) -> Self {
170        Self {
171            time_source,
172            throughput,
173            inner: body,
174        }
175    }
176}
177
178const ZERO_THROUGHPUT: Throughput = Throughput::new_bytes_per_second(0);
179
180// Helper trait for interpretting the throughput report.
181trait UploadReport {
182    fn minimum_throughput_violated(self, minimum_throughput: Throughput) -> (bool, Throughput);
183}
184impl UploadReport for ThroughputReport {
185    fn minimum_throughput_violated(self, minimum_throughput: Throughput) -> (bool, Throughput) {
186        let throughput = match self {
187            // stream has been exhausted, stop tracking violations
188            ThroughputReport::Complete => return (false, ZERO_THROUGHPUT),
189            // If the report is incomplete, then we don't have enough data yet to
190            // decide if minimum throughput was violated.
191            ThroughputReport::Incomplete => {
192                tracing::trace!(
193                    "not enough data to decide if minimum throughput has been violated"
194                );
195                return (false, ZERO_THROUGHPUT);
196            }
197            // If most of the datapoints are Poll::Pending, then the user has stalled.
198            // In this case, we don't want to say minimum throughput was violated.
199            ThroughputReport::Pending => {
200                tracing::debug!(
201                    "the user has stalled; this will not become a minimum throughput violation"
202                );
203                return (false, ZERO_THROUGHPUT);
204            }
205            // If there has been no polling, then the server has stalled. Alternatively,
206            // if we're transferring data, but it's too slow, then we also want to say
207            // that the minimum throughput has been violated.
208            ThroughputReport::NoPolling => ZERO_THROUGHPUT,
209            ThroughputReport::Transferred(tp) => tp,
210        };
211        if throughput < minimum_throughput {
212            tracing::debug!(
213                "current throughput: {throughput} is below minimum: {minimum_throughput}"
214            );
215            (true, throughput)
216        } else {
217            (false, throughput)
218        }
219    }
220}
221
222pin_project_lite::pin_project! {
223    /// Future that pairs with [`UploadThroughput`] to add a minimum throughput
224    /// requirement to a request upload stream.
225    pub(crate) struct UploadThroughputCheckFuture {
226        #[pin]
227        response: HttpConnectorFuture,
228        #[pin]
229        check_interval: Option<Sleep>,
230        #[pin]
231        grace_period: Option<Sleep>,
232
233        time_source: SharedTimeSource,
234        sleep_impl: SharedAsyncSleep,
235        upload_throughput: UploadThroughput,
236        resolution: Duration,
237        options: MinimumThroughputBodyOptions,
238
239        failing_throughput: Option<Throughput>,
240    }
241}
242
243impl UploadThroughputCheckFuture {
244    fn new(
245        response: HttpConnectorFuture,
246        time_source: SharedTimeSource,
247        sleep_impl: SharedAsyncSleep,
248        upload_throughput: UploadThroughput,
249        options: MinimumThroughputBodyOptions,
250    ) -> Self {
251        let resolution = upload_throughput.resolution();
252        Self {
253            response,
254            check_interval: Some(sleep_impl.sleep(resolution)),
255            grace_period: None,
256            time_source,
257            sleep_impl,
258            upload_throughput,
259            resolution,
260            options,
261            failing_throughput: None,
262        }
263    }
264}
265
266impl Future for UploadThroughputCheckFuture {
267    type Output = Result<HttpResponse, ConnectorError>;
268
269    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
270        let mut this = self.project();
271
272        if let Poll::Ready(output) = this.response.poll(cx) {
273            return Poll::Ready(output);
274        } else {
275            let mut below_minimum_throughput = false;
276            let check_interval_expired = this
277                .check_interval
278                .as_mut()
279                .as_pin_mut()
280                .expect("always set")
281                .poll(cx)
282                .is_ready();
283            if check_interval_expired {
284                // Set up the next check interval
285                *this.check_interval = Some(this.sleep_impl.sleep(*this.resolution));
286
287                // Wake so that the check interval future gets polled
288                // next time this poll method is called. If it never gets polled,
289                // then this task won't be woken to check again.
290                cx.waker().wake_by_ref();
291            }
292
293            let should_check = check_interval_expired || this.grace_period.is_some();
294            if should_check {
295                let now = this.time_source.now();
296                let report = this.upload_throughput.report(now);
297                let (violated, current_throughput) =
298                    report.minimum_throughput_violated(this.options.minimum_throughput());
299                below_minimum_throughput = violated;
300                if below_minimum_throughput && !this.failing_throughput.is_some() {
301                    *this.failing_throughput = Some(current_throughput);
302                } else if !below_minimum_throughput {
303                    *this.failing_throughput = None;
304                }
305            }
306
307            // If we kicked off a grace period and are now satisfied, clear out the grace period
308            if !below_minimum_throughput && this.grace_period.is_some() {
309                tracing::debug!("upload minimum throughput recovered during grace period");
310                *this.grace_period = None;
311            }
312            if below_minimum_throughput {
313                // Start a grace period if below minimum throughput
314                if this.grace_period.is_none() {
315                    tracing::debug!(
316                        grace_period=?this.options.grace_period(),
317                        "upload minimum throughput below configured minimum; starting grace period"
318                    );
319                    *this.grace_period = Some(this.sleep_impl.sleep(this.options.grace_period()));
320                }
321                // Check the grace period if one is already set and we're not satisfied
322                if let Some(grace_period) = this.grace_period.as_pin_mut() {
323                    if grace_period.poll(cx).is_ready() {
324                        tracing::debug!("grace period ended; timing out request");
325                        return Poll::Ready(Err(ConnectorError::timeout(
326                            Error::ThroughputBelowMinimum {
327                                expected: this.options.minimum_throughput(),
328                                actual: this
329                                    .failing_throughput
330                                    .expect("always set if there's a grace period"),
331                            }
332                            .into(),
333                        )));
334                    }
335                }
336            }
337        }
338        Poll::Pending
339    }
340}
341
342pin_project_lite::pin_project! {
343    #[project = EnumProj]
344    pub(crate) enum MaybeUploadThroughputCheckFuture {
345        Direct { #[pin] future: HttpConnectorFuture },
346        Checked { #[pin] future: UploadThroughputCheckFuture },
347    }
348}
349
350impl MaybeUploadThroughputCheckFuture {
351    pub(crate) fn new(
352        cfg: &mut ConfigBag,
353        components: &RuntimeComponents,
354        connector_future: HttpConnectorFuture,
355    ) -> Self {
356        if let Some(sspcfg) = cfg.load::<StalledStreamProtectionConfig>().cloned() {
357            if sspcfg.is_enabled() {
358                let options = MinimumThroughputBodyOptions::from(sspcfg);
359                return Self::new_inner(
360                    connector_future,
361                    components.time_source(),
362                    components.sleep_impl(),
363                    cfg.interceptor_state().load::<UploadThroughput>().cloned(),
364                    Some(options),
365                );
366            }
367        }
368        tracing::debug!("no minimum upload throughput checks");
369        Self::new_inner(connector_future, None, None, None, None)
370    }
371
372    fn new_inner(
373        response: HttpConnectorFuture,
374        time_source: Option<SharedTimeSource>,
375        sleep_impl: Option<SharedAsyncSleep>,
376        upload_throughput: Option<UploadThroughput>,
377        options: Option<MinimumThroughputBodyOptions>,
378    ) -> Self {
379        match (time_source, sleep_impl, upload_throughput, options) {
380            (Some(time_source), Some(sleep_impl), Some(upload_throughput), Some(options)) => {
381                tracing::debug!(options=?options, "applying minimum upload throughput check future");
382                Self::Checked {
383                    future: UploadThroughputCheckFuture::new(
384                        response,
385                        time_source,
386                        sleep_impl,
387                        upload_throughput,
388                        options,
389                    ),
390                }
391            }
392            _ => Self::Direct { future: response },
393        }
394    }
395}
396
397impl Future for MaybeUploadThroughputCheckFuture {
398    type Output = Result<HttpResponse, ConnectorError>;
399
400    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
401        match self.project() {
402            EnumProj::Direct { future } => future.poll(cx),
403            EnumProj::Checked { future } => future.poll(cx),
404        }
405    }
406}