aws_smithy_types/
body.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6//! Types for representing the body of an HTTP request or response
7
8use bytes::Bytes;
9use pin_project_lite::pin_project;
10use std::collections::VecDeque;
11use std::error::Error as StdError;
12use std::fmt::{self, Debug, Formatter};
13use std::future::poll_fn;
14use std::pin::Pin;
15use std::sync::Arc;
16use std::task::{Context, Poll};
17
18/// This module is named after the `http-body` version number since we anticipate
19/// needing to provide equivalent functionality for 1.x of that crate in the future.
20/// The name has a suffix `_x` to avoid name collision with a third-party `http-body-0-4`.
21#[cfg(feature = "http-body-0-4-x")]
22pub mod http_body_0_4_x;
23#[cfg(feature = "http-body-1-x")]
24pub mod http_body_1_x;
25
26/// A generic, boxed error that's `Send` and `Sync`
27pub type Error = Box<dyn StdError + Send + Sync>;
28
29pin_project! {
30    /// SdkBody type
31    ///
32    /// This is the Body used for dispatching all HTTP Requests.
33    /// For handling responses, the type of the body will be controlled
34    /// by the HTTP stack.
35    ///
36    pub struct SdkBody {
37        #[pin]
38        inner: Inner,
39        // An optional function to recreate the inner body
40        //
41        // In the event of retry, this function will be called to generate a new body. See
42        // [`try_clone()`](SdkBody::try_clone)
43        rebuild: Option<Arc<dyn (Fn() -> Inner) + Send + Sync>>,
44        bytes_contents: Option<Bytes>,
45        // Here the optionality indicates whether we have started streaming trailers, and the
46        // VecDeque serves as a buffer for trailer frames that are polled by poll_next instead
47        // of poll_next_trailers
48        trailers: Option<VecDeque<http_1x::HeaderMap>>,
49    }
50}
51
52impl Debug for SdkBody {
53    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
54        f.debug_struct("SdkBody")
55            .field("inner", &self.inner)
56            .field("retryable", &self.rebuild.is_some())
57            .finish()
58    }
59}
60
61/// A boxed generic HTTP body that, when consumed, will result in [`Bytes`] or an [`Error`].
62#[allow(dead_code)]
63enum BoxBody {
64    // This is enabled by the **dependency**, not the feature. This allows us to construct it
65    // whenever we have the dependency and keep the APIs private
66    #[cfg(any(
67        feature = "http-body-0-4-x",
68        feature = "http-body-1-x",
69        feature = "rt-tokio"
70    ))]
71    // will be dead code with `--no-default-features --features rt-tokio`
72    HttpBody04(#[allow(dead_code)] http_body_0_4::combinators::BoxBody<Bytes, Error>),
73
74    #[cfg(feature = "http-body-1-x")]
75    HttpBody1(#[allow(dead_code)] http_body_util::combinators::BoxBody<Bytes, Error>),
76}
77
78pin_project! {
79    #[project = InnerProj]
80    enum Inner {
81        // An in-memory body
82        Once {
83            inner: Option<Bytes>
84        },
85        // A streaming body
86        Dyn {
87            #[pin]
88            inner: BoxBody,
89        },
90
91        /// When a streaming body is transferred out to a stream parser, the body is replaced with
92        /// `Taken`. This will return an Error when polled. Attempting to read data out of a `Taken`
93        /// Body is a bug.
94        Taken,
95    }
96}
97
98impl Debug for Inner {
99    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
100        match &self {
101            Inner::Once { inner: once } => f.debug_tuple("Once").field(once).finish(),
102            Inner::Dyn { .. } => write!(f, "BoxBody"),
103            Inner::Taken => f.debug_tuple("Taken").finish(),
104        }
105    }
106}
107
108impl SdkBody {
109    /// Construct an explicitly retryable SDK body
110    ///
111    /// _Note: This is probably not what you want_
112    ///
113    /// All bodies constructed from in-memory data (`String`, `Vec<u8>`, `Bytes`, etc.) will be
114    /// retryable out of the box. If you want to read data from a file, you should use
115    /// [`ByteStream::from_path`](crate::byte_stream::ByteStream::from_path). This function
116    /// is only necessary when you need to enable retries for your own streaming container.
117    pub fn retryable(f: impl Fn() -> SdkBody + Send + Sync + 'static) -> Self {
118        let initial = f();
119        SdkBody {
120            inner: initial.inner,
121            rebuild: Some(Arc::new(move || f().inner)),
122            bytes_contents: initial.bytes_contents,
123            trailers: None,
124        }
125    }
126
127    /// When an SdkBody is read, the inner data must be consumed. In order to do this, the SdkBody
128    /// is swapped with a "taken" body. This "taken" body cannot be read but aids in debugging.
129    pub fn taken() -> Self {
130        Self {
131            inner: Inner::Taken,
132            rebuild: None,
133            bytes_contents: None,
134            trailers: None,
135        }
136    }
137
138    /// Create an empty SdkBody for requests and responses that don't transfer any data in the body.
139    pub fn empty() -> Self {
140        Self {
141            inner: Inner::Once { inner: None },
142            rebuild: Some(Arc::new(|| Inner::Once { inner: None })),
143            bytes_contents: Some(Bytes::new()),
144            trailers: None,
145        }
146    }
147
148    pub(crate) async fn next(&mut self) -> Option<Result<Bytes, Error>> {
149        let mut me = Pin::new(self);
150        poll_fn(|cx| me.as_mut().poll_next(cx)).await
151    }
152
153    pub(crate) fn poll_next(
154        self: Pin<&mut Self>,
155        #[allow(unused)] cx: &mut Context<'_>,
156    ) -> Poll<Option<Result<Bytes, Error>>> {
157        let this = self.project();
158        match this.inner.project() {
159            InnerProj::Once { ref mut inner } => {
160                let data = inner.take();
161                match data {
162                    Some(bytes) if bytes.is_empty() => Poll::Ready(None),
163                    Some(bytes) => Poll::Ready(Some(Ok(bytes))),
164                    None => Poll::Ready(None),
165                }
166            }
167            InnerProj::Dyn { inner: body } => match body.get_mut() {
168                #[cfg(feature = "http-body-0-4-x")]
169                BoxBody::HttpBody04(box_body) => {
170                    use http_body_0_4::Body;
171                    Pin::new(box_body).poll_data(cx)
172                }
173                #[cfg(feature = "http-body-1-x")]
174                BoxBody::HttpBody1(box_body) => {
175                    // If this is polled after the trailers have been cached end early
176                    if this.trailers.is_some() {
177                        return Poll::Ready(None);
178                    }
179                    use http_body_1_0::Body;
180                    let maybe_data = Pin::new(box_body).poll_frame(cx);
181                    match maybe_data {
182                        Poll::Ready(Some(Ok(frame))) => {
183                            if frame.is_data() {
184                                Poll::Ready(Some(Ok(frame
185                                    .into_data()
186                                    .expect("Confirmed data frame"))))
187                            } else if frame.is_trailers() {
188                                let trailers =
189                                    frame.into_trailers().expect("Confirmed trailer frame");
190                                // Buffer the trailers for the trailer poll
191                                this.trailers.get_or_insert_with(VecDeque::new).push_back(trailers);
192
193                                Poll::Ready(None)
194                            } else {
195                                unreachable!("Frame must be either data or trailers");
196                            }
197                        }
198                        Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
199                        Poll::Ready(None) => Poll::Ready(None),
200                        Poll::Pending => Poll::Pending,
201                    }
202                }
203                #[allow(unreachable_patterns)]
204                _ => unreachable!(
205                    "enabling `http-body-0-4-x` or `http-body-1-x` is the only way to create the `Dyn` variant"
206                ),
207            },
208            InnerProj::Taken => {
209                Poll::Ready(Some(Err("A `Taken` body should never be polled".into())))
210            }
211        }
212    }
213
214    #[allow(dead_code)]
215    #[cfg(any(
216        feature = "http-body-0-4-x",
217        feature = "http-body-1-x",
218        feature = "rt-tokio"
219    ))]
220    pub(crate) fn from_body_0_4_internal<T, E>(body: T) -> Self
221    where
222        T: http_body_0_4::Body<Data = Bytes, Error = E> + Send + Sync + 'static,
223        E: Into<Error> + 'static,
224    {
225        Self {
226            inner: Inner::Dyn {
227                inner: BoxBody::HttpBody04(http_body_0_4::combinators::BoxBody::new(
228                    body.map_err(Into::into),
229                )),
230            },
231            rebuild: None,
232            bytes_contents: None,
233            trailers: None,
234        }
235    }
236
237    #[cfg(feature = "http-body-1-x")]
238    pub(crate) fn from_body_1_x_internal<T, E>(body: T) -> Self
239    where
240        T: http_body_1_0::Body<Data = Bytes, Error = E> + Send + Sync + 'static,
241        E: Into<Error> + 'static,
242    {
243        use http_body_util::BodyExt;
244        Self {
245            inner: Inner::Dyn {
246                inner: BoxBody::HttpBody1(http_body_util::combinators::BoxBody::new(
247                    body.map_err(Into::into),
248                )),
249            },
250            rebuild: None,
251            bytes_contents: None,
252            trailers: None,
253        }
254    }
255
256    #[cfg(any(feature = "http-body-0-4-x", feature = "http-body-1-x",))]
257    pub(crate) fn poll_next_trailers(
258        self: Pin<&mut Self>,
259        cx: &mut Context<'_>,
260    ) -> Poll<Result<Option<http_1x::HeaderMap<http_1x::HeaderValue>>, Error>> {
261        // Three cases that matter here:
262        // 1) Both http-body features disabled, doesn't matter because this func won't compile
263        // 2) http-body-0-4-x enabled but 1-x disabled, we use the http_body_0_4_x conversion
264        // 3) http-body-1-x enabled (and 0-4-x is enabled or disabled), we use the 1-x conversion
265        // as our default whenever it is available
266        #[cfg(all(feature = "http-body-0-4-x", not(feature = "http-body-1-x")))]
267        use crate::body::http_body_0_4_x::convert_headers_0x_1x;
268        #[cfg(feature = "http-body-1-x")]
269        use crate::body::http_body_1_x::convert_headers_0x_1x;
270
271        let this = self.project();
272        match this.inner.project() {
273            InnerProj::Once { .. } => Poll::Ready(Ok(None)),
274            InnerProj::Dyn { inner } => match inner.get_mut() {
275                BoxBody::HttpBody04(box_body) => {
276                    use http_body_0_4::Body;
277                    let polled = Pin::new(box_body).poll_trailers(cx);
278
279                    match polled {
280                        Poll::Ready(Ok(maybe_trailers)) => {
281                            let http_1x_trailers = maybe_trailers.map(convert_headers_0x_1x);
282                            Poll::Ready(Ok(http_1x_trailers))
283                        }
284                        Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
285                        Poll::Pending => Poll::Pending,
286                    }
287                }
288                #[cfg(feature = "http-body-1-x")]
289                BoxBody::HttpBody1(box_body) => {
290                    use http_body_1_0::Body;
291                    // Return the cached trailers without polling
292                    if let Some(trailer_buf) = this.trailers {
293                        if let Some(next_trailer) = trailer_buf.pop_front() {
294                            return Poll::Ready(Ok(Some(next_trailer)));
295                        }
296                    }
297
298                    let polled = Pin::new(box_body).poll_frame(cx);
299                    match polled {
300                        Poll::Ready(Some(Ok(maybe_trailers))) => {
301                            if maybe_trailers.is_data() {
302                                Poll::Ready(Err("Trailers polled while body still has data".into()))
303                            } else {
304                                let trailers = maybe_trailers
305                                    .into_trailers()
306                                    .expect("Frame must be trailers because it is not data");
307                                Poll::Ready(Ok(Some(trailers)))
308                            }
309                        }
310                        Poll::Ready(None) => Poll::Ready(Ok(None)),
311                        Poll::Ready(Some(Err(err))) => Poll::Ready(Err(err)),
312                        Poll::Pending => Poll::Pending,
313                    }
314                }
315            },
316            InnerProj::Taken => Poll::Ready(Err(
317                "A `Taken` body should never be polled for trailers".into(),
318            )),
319        }
320    }
321
322    /// If possible, return a reference to this body as `&[u8]`
323    ///
324    /// If this SdkBody is NOT streaming, this will return the byte slab
325    /// If this SdkBody is streaming, this will return `None`
326    pub fn bytes(&self) -> Option<&[u8]> {
327        match &self.bytes_contents {
328            Some(b) => Some(b),
329            None => None,
330        }
331    }
332
333    /// Attempt to clone this SdkBody. This will fail if the inner data is not cloneable, such as when
334    /// it is a single-use stream that can't be recreated.
335    pub fn try_clone(&self) -> Option<Self> {
336        self.rebuild.as_ref().map(|rebuild| {
337            let next = rebuild();
338            Self {
339                inner: next,
340                rebuild: self.rebuild.clone(),
341                bytes_contents: self.bytes_contents.clone(),
342                trailers: self.trailers.clone(),
343            }
344        })
345    }
346
347    /// Return `true` if this SdkBody is streaming, `false` if it is in-memory.
348    pub fn is_streaming(&self) -> bool {
349        matches!(self.inner, Inner::Dyn { .. })
350    }
351
352    /// Return the length, in bytes, of this SdkBody. If this returns `None`, then the body does not
353    /// have a known length.
354    pub fn content_length(&self) -> Option<u64> {
355        match self.bounds_on_remaining_length() {
356            (lo, Some(hi)) if lo == hi => Some(lo),
357            _ => None,
358        }
359    }
360
361    #[allow(dead_code)] // used by a feature-gated `http-body`'s trait method
362    pub(crate) fn is_end_stream(&self) -> bool {
363        match &self.inner {
364            Inner::Once { inner: None } => true,
365            Inner::Once { inner: Some(bytes) } => bytes.is_empty(),
366            Inner::Dyn { inner: box_body } => match box_body {
367                #[cfg(feature = "http-body-0-4-x")]
368                BoxBody::HttpBody04(box_body) => {
369                    use http_body_0_4::Body;
370                    box_body.is_end_stream()
371                }
372                #[cfg(feature = "http-body-1-x")]
373                BoxBody::HttpBody1(box_body) => {
374                    use http_body_1_0::Body;
375                    box_body.is_end_stream()
376                }
377                #[allow(unreachable_patterns)]
378                _ => unreachable!(
379                    "enabling `http-body-0-4-x` or `http-body-1-x` is the only way to create the `Dyn` variant"
380                ),
381            },
382            Inner::Taken => true,
383        }
384    }
385
386    pub(crate) fn bounds_on_remaining_length(&self) -> (u64, Option<u64>) {
387        match &self.inner {
388            Inner::Once { inner: None } => (0, Some(0)),
389            Inner::Once { inner: Some(bytes) } => {
390                let len = bytes.len() as u64;
391                (len, Some(len))
392            }
393            Inner::Dyn { inner: box_body } => match box_body {
394                #[cfg(feature = "http-body-0-4-x")]
395                BoxBody::HttpBody04(box_body) => {
396                    use http_body_0_4::Body;
397                    let hint = box_body.size_hint();
398                    (hint.lower(), hint.upper())
399                }
400                #[cfg(feature = "http-body-1-x")]
401                BoxBody::HttpBody1(box_body) => {
402                    use http_body_1_0::Body;
403                    let hint = box_body.size_hint();
404                    (hint.lower(), hint.upper())
405                }
406                #[allow(unreachable_patterns)]
407                _ => unreachable!(
408                    "enabling `http-body-0-4-x` or `http-body-1-x` is the only way to create the `Dyn` variant"
409                ),
410            },
411            Inner::Taken => (0, Some(0)),
412        }
413    }
414
415    /// Given a function to modify an `SdkBody`, run that function against this `SdkBody` before
416    /// returning the result.
417    pub fn map(self, f: impl Fn(SdkBody) -> SdkBody + Sync + Send + 'static) -> SdkBody {
418        if self.rebuild.is_some() {
419            SdkBody::retryable(move || f(self.try_clone().unwrap()))
420        } else {
421            f(self)
422        }
423    }
424
425    /// Update this `SdkBody` with `map`. **This function MUST NOT alter the data of the body.**
426    ///
427    /// This function is useful for adding metadata like progress tracking to an [`SdkBody`] that
428    /// does not alter the actual byte data. If your mapper alters the contents of the body, use [`SdkBody::map`]
429    /// instead.
430    pub fn map_preserve_contents(
431        self,
432        f: impl Fn(SdkBody) -> SdkBody + Sync + Send + 'static,
433    ) -> SdkBody {
434        let contents = self.bytes_contents.clone();
435        let mut out = if self.rebuild.is_some() {
436            SdkBody::retryable(move || f(self.try_clone().unwrap()))
437        } else {
438            f(self)
439        };
440        out.bytes_contents = contents;
441        out
442    }
443}
444
445impl From<&str> for SdkBody {
446    fn from(s: &str) -> Self {
447        Self::from(s.as_bytes())
448    }
449}
450
451impl From<Bytes> for SdkBody {
452    fn from(bytes: Bytes) -> Self {
453        let b = bytes.clone();
454        SdkBody {
455            inner: Inner::Once {
456                inner: Some(bytes.clone()),
457            },
458            rebuild: Some(Arc::new(move || Inner::Once {
459                inner: Some(bytes.clone()),
460            })),
461            bytes_contents: Some(b),
462            trailers: None,
463        }
464    }
465}
466
467impl From<Vec<u8>> for SdkBody {
468    fn from(data: Vec<u8>) -> Self {
469        Self::from(Bytes::from(data))
470    }
471}
472
473impl From<String> for SdkBody {
474    fn from(s: String) -> Self {
475        Self::from(s.into_bytes())
476    }
477}
478
479impl From<&[u8]> for SdkBody {
480    fn from(data: &[u8]) -> Self {
481        Self::from(Bytes::copy_from_slice(data))
482    }
483}
484
485#[cfg(test)]
486mod test {
487    use crate::body::SdkBody;
488    use std::pin::Pin;
489
490    #[test]
491    fn valid_size_hint() {
492        assert_eq!(SdkBody::from("hello").content_length(), Some(5));
493        assert_eq!(SdkBody::from("").content_length(), Some(0));
494    }
495
496    #[allow(clippy::bool_assert_comparison)]
497    #[test]
498    fn valid_eos() {
499        assert_eq!(SdkBody::from("hello").is_end_stream(), false);
500        assert_eq!(SdkBody::from("").is_end_stream(), true);
501    }
502
503    #[tokio::test]
504    async fn http_body_consumes_data() {
505        let mut body = SdkBody::from("hello!");
506        let mut body = Pin::new(&mut body);
507        assert!(!body.is_end_stream());
508        let data = body.next().await;
509        assert!(data.is_some());
510        let data = body.next().await;
511        assert!(data.is_none());
512        assert!(body.is_end_stream());
513    }
514
515    #[tokio::test]
516    async fn empty_body_returns_none() {
517        // Its important to avoid sending empty chunks of data to avoid H2 data frame problems
518        let mut body = SdkBody::from("");
519        let mut body = Pin::new(&mut body);
520        let data = body.next().await;
521        assert!(data.is_none());
522    }
523
524    #[test]
525    fn sdkbody_debug_once() {
526        let body = SdkBody::from("123");
527        assert!(format!("{:?}", body).contains("Once"));
528    }
529
530    #[test]
531    fn sdk_body_is_send() {
532        fn is_send<T: Send>() {}
533        is_send::<SdkBody>()
534    }
535}