1 1 | /*
|
2 2 | * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
3 3 | * SPDX-License-Identifier: Apache-2.0
|
4 4 | */
|
5 5 |
|
6 6 | //! Types for representing the body of an HTTP request or response
|
7 7 |
|
8 8 | use bytes::Bytes;
|
9 9 | use pin_project_lite::pin_project;
|
10 + | use std::collections::VecDeque;
|
10 11 | use std::error::Error as StdError;
|
11 12 | use std::fmt::{self, Debug, Formatter};
|
12 13 | use std::future::poll_fn;
|
13 14 | use std::pin::Pin;
|
14 15 | use std::sync::Arc;
|
15 16 | use std::task::{Context, Poll};
|
16 17 |
|
17 18 | /// This module is named after the `http-body` version number since we anticipate
|
18 19 | /// needing to provide equivalent functionality for 1.x of that crate in the future.
|
19 20 | /// The name has a suffix `_x` to avoid name collision with a third-party `http-body-0-4`.
|
20 21 | #[cfg(feature = "http-body-0-4-x")]
|
21 22 | pub mod http_body_0_4_x;
|
22 23 | #[cfg(feature = "http-body-1-x")]
|
23 24 | pub mod http_body_1_x;
|
24 25 |
|
25 26 | /// A generic, boxed error that's `Send` and `Sync`
|
26 27 | pub type Error = Box<dyn StdError + Send + Sync>;
|
27 28 |
|
28 29 | pin_project! {
|
29 30 | /// SdkBody type
|
30 31 | ///
|
31 32 | /// This is the Body used for dispatching all HTTP Requests.
|
32 33 | /// For handling responses, the type of the body will be controlled
|
33 34 | /// by the HTTP stack.
|
34 35 | ///
|
35 36 | pub struct SdkBody {
|
36 37 | #[pin]
|
37 38 | inner: Inner,
|
38 39 | // An optional function to recreate the inner body
|
39 40 | //
|
40 41 | // In the event of retry, this function will be called to generate a new body. See
|
41 42 | // [`try_clone()`](SdkBody::try_clone)
|
42 43 | rebuild: Option<Arc<dyn (Fn() -> Inner) + Send + Sync>>,
|
43 - | bytes_contents: Option<Bytes>
|
44 + | bytes_contents: Option<Bytes>,
|
45 + | // Here the optionality indicates whether we have started streaming trailers, and the
|
46 + | // VecDeque serves as a buffer for trailer frames that are polled by poll_next instead
|
47 + | // of poll_next_trailers
|
48 + | trailers: Option<VecDeque<http_1x::HeaderMap>>,
|
44 49 | }
|
45 50 | }
|
46 51 |
|
47 52 | impl Debug for SdkBody {
|
48 53 | fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
|
49 54 | f.debug_struct("SdkBody")
|
50 55 | .field("inner", &self.inner)
|
51 56 | .field("retryable", &self.rebuild.is_some())
|
52 57 | .finish()
|
53 58 | }
|
54 59 | }
|
55 60 |
|
/// A type-erased HTTP body that yields [`Bytes`] or an [`Error`] when it is consumed.
#[allow(dead_code)]
enum BoxBody {
    // Gated on the **dependency** being present rather than on the feature alone, so the
    // variant can be constructed whenever the dependency is available while the APIs stay
    // private.
    #[cfg(any(
        feature = "http-body-0-4-x",
        feature = "http-body-1-x",
        feature = "rt-tokio"
    ))]
    // will be dead code with `--no-default-features --features rt-tokio`
    HttpBody04(#[allow(dead_code)] http_body_0_4::combinators::BoxBody<Bytes, Error>),

    // Boxed body built on the 1.x `http-body` API.
    #[cfg(feature = "http-body-1-x")]
    HttpBody1(#[allow(dead_code)] http_body_util::combinators::BoxBody<Bytes, Error>),
}
|
68 77 |
|
69 78 | pin_project! {
|
70 79 | #[project = InnerProj]
|
71 80 | enum Inner {
|
72 81 | // An in-memory body
|
73 82 | Once {
|
74 83 | inner: Option<Bytes>
|
75 84 | },
|
76 85 | // A streaming body
|
77 86 | Dyn {
|
78 87 | #[pin]
|
79 88 | inner: BoxBody,
|
80 89 | },
|
81 90 |
|
82 91 | /// When a streaming body is transferred out to a stream parser, the body is replaced with
|
83 92 | /// `Taken`. This will return an Error when polled. Attempting to read data out of a `Taken`
|
84 93 | /// Body is a bug.
|
85 94 | Taken,
|
86 95 | }
|
87 96 | }
|
88 97 |
|
89 98 | impl Debug for Inner {
|
90 99 | fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
|
91 100 | match &self {
|
92 101 | Inner::Once { inner: once } => f.debug_tuple("Once").field(once).finish(),
|
93 102 | Inner::Dyn { .. } => write!(f, "BoxBody"),
|
94 103 | Inner::Taken => f.debug_tuple("Taken").finish(),
|
95 104 | }
|
96 105 | }
|
97 106 | }
|
98 107 |
|
99 108 | impl SdkBody {
|
100 109 | /// Construct an explicitly retryable SDK body
|
101 110 | ///
|
102 111 | /// _Note: This is probably not what you want_
|
103 112 | ///
|
104 113 | /// All bodies constructed from in-memory data (`String`, `Vec<u8>`, `Bytes`, etc.) will be
|
105 114 | /// retryable out of the box. If you want to read data from a file, you should use
|
106 115 | /// [`ByteStream::from_path`](crate::byte_stream::ByteStream::from_path). This function
|
107 116 | /// is only necessary when you need to enable retries for your own streaming container.
|
108 117 | pub fn retryable(f: impl Fn() -> SdkBody + Send + Sync + 'static) -> Self {
|
109 118 | let initial = f();
|
110 119 | SdkBody {
|
111 120 | inner: initial.inner,
|
112 121 | rebuild: Some(Arc::new(move || f().inner)),
|
113 122 | bytes_contents: initial.bytes_contents,
|
123 + | trailers: None,
|
114 124 | }
|
115 125 | }
|
116 126 |
|
117 127 | /// When an SdkBody is read, the inner data must be consumed. In order to do this, the SdkBody
|
118 128 | /// is swapped with a "taken" body. This "taken" body cannot be read but aids in debugging.
|
119 129 | pub fn taken() -> Self {
|
120 130 | Self {
|
121 131 | inner: Inner::Taken,
|
122 132 | rebuild: None,
|
123 133 | bytes_contents: None,
|
134 + | trailers: None,
|
124 135 | }
|
125 136 | }
|
126 137 |
|
127 138 | /// Create an empty SdkBody for requests and responses that don't transfer any data in the body.
|
128 139 | pub fn empty() -> Self {
|
129 140 | Self {
|
130 141 | inner: Inner::Once { inner: None },
|
131 142 | rebuild: Some(Arc::new(|| Inner::Once { inner: None })),
|
132 143 | bytes_contents: Some(Bytes::new()),
|
144 + | trailers: None,
|
133 145 | }
|
134 146 | }
|
135 147 |
|
136 148 | pub(crate) async fn next(&mut self) -> Option<Result<Bytes, Error>> {
|
137 149 | let mut me = Pin::new(self);
|
138 150 | poll_fn(|cx| me.as_mut().poll_next(cx)).await
|
139 151 | }
|
140 152 |
|
141 153 | pub(crate) fn poll_next(
|
142 154 | self: Pin<&mut Self>,
|
143 155 | #[allow(unused)] cx: &mut Context<'_>,
|
144 156 | ) -> Poll<Option<Result<Bytes, Error>>> {
|
145 157 | let this = self.project();
|
146 158 | match this.inner.project() {
|
147 159 | InnerProj::Once { ref mut inner } => {
|
148 160 | let data = inner.take();
|
149 161 | match data {
|
150 162 | Some(bytes) if bytes.is_empty() => Poll::Ready(None),
|
151 163 | Some(bytes) => Poll::Ready(Some(Ok(bytes))),
|
152 164 | None => Poll::Ready(None),
|
153 165 | }
|
154 166 | }
|
155 167 | InnerProj::Dyn { inner: body } => match body.get_mut() {
|
156 168 | #[cfg(feature = "http-body-0-4-x")]
|
157 169 | BoxBody::HttpBody04(box_body) => {
|
158 170 | use http_body_0_4::Body;
|
159 171 | Pin::new(box_body).poll_data(cx)
|
160 172 | }
|
173 + | #[cfg(feature = "http-body-1-x")]
|
174 + | BoxBody::HttpBody1(box_body) => {
|
175 + | // If this is polled after the trailers have been cached end early
|
176 + | if this.trailers.is_some() {
|
177 + | return Poll::Ready(None);
|
178 + | }
|
179 + | use http_body_1_0::Body;
|
180 + | let maybe_data = Pin::new(box_body).poll_frame(cx);
|
181 + | match maybe_data {
|
182 + | Poll::Ready(Some(Ok(frame))) => {
|
183 + | if frame.is_data() {
|
184 + | Poll::Ready(Some(Ok(frame
|
185 + | .into_data()
|
186 + | .expect("Confirmed data frame"))))
|
187 + | } else if frame.is_trailers() {
|
188 + | let trailers =
|
189 + | frame.into_trailers().expect("Confirmed trailer frame");
|
190 + | // Buffer the trailers for the trailer poll
|
191 + | this.trailers.get_or_insert_with(VecDeque::new).push_back(trailers);
|
192 + |
|
193 + | Poll::Ready(None)
|
194 + | } else {
|
195 + | unreachable!("Frame must be either data or trailers");
|
196 + | }
|
197 + | }
|
198 + | Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
|
199 + | Poll::Ready(None) => Poll::Ready(None),
|
200 + | Poll::Pending => Poll::Pending,
|
201 + | }
|
202 + | }
|
161 203 | #[allow(unreachable_patterns)]
|
162 204 | _ => unreachable!(
|
163 - | "enabling `http-body-0-4-x` is the only way to create the `Dyn` variant"
|
205 + | "enabling `http-body-0-4-x` or `http-body-1-x` is the only way to create the `Dyn` variant"
|
164 206 | ),
|
165 207 | },
|
166 208 | InnerProj::Taken => {
|
167 209 | Poll::Ready(Some(Err("A `Taken` body should never be polled".into())))
|
168 210 | }
|
169 211 | }
|
170 212 | }
|
171 213 |
|
/// Wrap an `http-body` 0.4 body in a streaming `SdkBody`.
///
/// The body's error type is converted into [`Error`] and the body is boxed. The result
/// has no rebuild function and no in-memory contents.
#[allow(dead_code)]
#[cfg(any(
    feature = "http-body-0-4-x",
    feature = "http-body-1-x",
    feature = "rt-tokio"
))]
pub(crate) fn from_body_0_4_internal<T, E>(body: T) -> Self
where
    T: http_body_0_4::Body<Data = Bytes, Error = E> + Send + Sync + 'static,
    E: Into<Error> + 'static,
{
    let boxed = http_body_0_4::combinators::BoxBody::new(body.map_err(Into::into));
    let inner = Inner::Dyn {
        inner: BoxBody::HttpBody04(boxed),
    };
    Self {
        inner,
        rebuild: None,
        bytes_contents: None,
        trailers: None,
    }
}
|
236 + |
|
/// Wrap an `http-body` 1.x body in a streaming `SdkBody`.
///
/// The body's error type is converted into [`Error`] and the body is boxed. The result
/// has no rebuild function and no in-memory contents.
#[cfg(feature = "http-body-1-x")]
pub(crate) fn from_body_1_x_internal<T, E>(body: T) -> Self
where
    T: http_body_1_0::Body<Data = Bytes, Error = E> + Send + Sync + 'static,
    E: Into<Error> + 'static,
{
    use http_body_util::BodyExt;

    let boxed = http_body_util::combinators::BoxBody::new(body.map_err(Into::into));
    let inner = Inner::Dyn {
        inner: BoxBody::HttpBody1(boxed),
    };
    Self {
        inner,
        rebuild: None,
        bytes_contents: None,
        trailers: None,
    }
}
|
192 255 |
|
/// Poll the body for trailers, expressed as `http` 1.x header maps.
///
/// * In-memory bodies never have trailers.
/// * A 0.4 streaming body is polled for trailers and the result is converted to the 1.x
///   header type.
/// * A 1.x streaming body first hands out any trailers buffered by `poll_next`, then polls
///   for the next frame; a data frame at this point is reported as an error.
/// * A `Taken` body yields an error.
#[cfg(any(feature = "http-body-0-4-x", feature = "http-body-1-x",))]
pub(crate) fn poll_next_trailers(
    self: Pin<&mut Self>,
    cx: &mut Context<'_>,
) -> Poll<Result<Option<http_1x::HeaderMap<http_1x::HeaderValue>>, Error>> {
    // Which header conversion is imported depends on the enabled features:
    // 1) Neither http-body feature: irrelevant, this function is not compiled at all
    // 2) Only http-body-0-4-x: use the conversion from the http_body_0_4_x module
    // 3) http-body-1-x (with or without 0-4-x): use the conversion from the http_body_1_x
    //    module, which is the default whenever it is available
    #[cfg(all(feature = "http-body-0-4-x", not(feature = "http-body-1-x")))]
    use crate::body::http_body_0_4_x::convert_headers_0x_1x;
    #[cfg(feature = "http-body-1-x")]
    use crate::body::http_body_1_x::convert_headers_0x_1x;

    let this = self.project();
    match this.inner.project() {
        InnerProj::Taken => Poll::Ready(Err(
            "A `Taken` body should never be polled for trailers".into(),
        )),
        InnerProj::Once { .. } => Poll::Ready(Ok(None)),
        InnerProj::Dyn { inner } => match inner.get_mut() {
            BoxBody::HttpBody04(body_04) => {
                use http_body_0_4::Body;
                Pin::new(body_04)
                    .poll_trailers(cx)
                    .map_ok(|maybe_trailers| maybe_trailers.map(convert_headers_0x_1x))
            }
            #[cfg(feature = "http-body-1-x")]
            BoxBody::HttpBody1(body_1x) => {
                use http_body_1_0::Body;
                // Return the cached trailers without polling
                let buffered = this.trailers.as_mut().and_then(|queue| queue.pop_front());
                if let Some(trailers) = buffered {
                    return Poll::Ready(Ok(Some(trailers)));
                }

                match Pin::new(body_1x).poll_frame(cx) {
                    Poll::Pending => Poll::Pending,
                    Poll::Ready(None) => Poll::Ready(Ok(None)),
                    Poll::Ready(Some(Err(err))) => Poll::Ready(Err(err)),
                    Poll::Ready(Some(Ok(frame))) if frame.is_data() => {
                        Poll::Ready(Err("Trailers polled while body still has data".into()))
                    }
                    Poll::Ready(Some(Ok(frame))) => {
                        let trailers = frame
                            .into_trailers()
                            .expect("Frame must be trailers because it is not data");
                        Poll::Ready(Ok(Some(trailers)))
                    }
                }
            }
        },
    }
}
|
212 321 |
|
213 322 | /// If possible, return a reference to this body as `&[u8]`
|
214 323 | ///
|
215 324 | /// If this SdkBody is NOT streaming, this will return the byte slab
|
216 325 | /// If this SdkBody is streaming, this will return `None`
|
217 326 | pub fn bytes(&self) -> Option<&[u8]> {
|
218 327 | match &self.bytes_contents {
|
219 328 | Some(b) => Some(b),
|
220 329 | None => None,
|
221 330 | }
|
222 331 | }
|
223 332 |
|
224 333 | /// Attempt to clone this SdkBody. This will fail if the inner data is not cloneable, such as when
|
225 334 | /// it is a single-use stream that can't be recreated.
|
226 335 | pub fn try_clone(&self) -> Option<Self> {
|
227 336 | self.rebuild.as_ref().map(|rebuild| {
|
228 337 | let next = rebuild();
|
229 338 | Self {
|
230 339 | inner: next,
|
231 340 | rebuild: self.rebuild.clone(),
|
232 341 | bytes_contents: self.bytes_contents.clone(),
|
342 + | trailers: self.trailers.clone(),
|
233 343 | }
|
234 344 | })
|
235 345 | }
|
236 346 |
|
237 347 | /// Return `true` if this SdkBody is streaming, `false` if it is in-memory.
|
238 348 | pub fn is_streaming(&self) -> bool {
|
239 349 | matches!(self.inner, Inner::Dyn { .. })
|
240 350 | }
|
241 351 |
|
242 352 | /// Return the length, in bytes, of this SdkBody. If this returns `None`, then the body does not
|
243 353 | /// have a known length.
|
244 354 | pub fn content_length(&self) -> Option<u64> {
|
245 355 | match self.bounds_on_remaining_length() {
|
246 356 | (lo, Some(hi)) if lo == hi => Some(lo),
|
247 357 | _ => None,
|
248 358 | }
|
249 359 | }
|
250 360 |
|
251 361 | #[allow(dead_code)] // used by a feature-gated `http-body`'s trait method
|
252 362 | pub(crate) fn is_end_stream(&self) -> bool {
|
253 363 | match &self.inner {
|
254 364 | Inner::Once { inner: None } => true,
|
255 365 | Inner::Once { inner: Some(bytes) } => bytes.is_empty(),
|
256 366 | Inner::Dyn { inner: box_body } => match box_body {
|
257 367 | #[cfg(feature = "http-body-0-4-x")]
|
258 368 | BoxBody::HttpBody04(box_body) => {
|
259 369 | use http_body_0_4::Body;
|
260 370 | box_body.is_end_stream()
|
261 371 | }
|
372 + | #[cfg(feature = "http-body-1-x")]
|
373 + | BoxBody::HttpBody1(box_body) => {
|
374 + | use http_body_1_0::Body;
|
375 + | box_body.is_end_stream()
|
376 + | }
|
262 377 | #[allow(unreachable_patterns)]
|
263 378 | _ => unreachable!(
|
264 - | "enabling `http-body-0-4-x` is the only way to create the `Dyn` variant"
|
379 + | "enabling `http-body-0-4-x` or `http-body-1-x` is the only way to create the `Dyn` variant"
|
265 380 | ),
|
266 381 | },
|
267 382 | Inner::Taken => true,
|
268 383 | }
|
269 384 | }
|
270 385 |
|
271 386 | pub(crate) fn bounds_on_remaining_length(&self) -> (u64, Option<u64>) {
|
272 387 | match &self.inner {
|
273 388 | Inner::Once { inner: None } => (0, Some(0)),
|
274 389 | Inner::Once { inner: Some(bytes) } => {
|
275 390 | let len = bytes.len() as u64;
|
276 391 | (len, Some(len))
|
277 392 | }
|
278 393 | Inner::Dyn { inner: box_body } => match box_body {
|
279 394 | #[cfg(feature = "http-body-0-4-x")]
|
280 395 | BoxBody::HttpBody04(box_body) => {
|
281 396 | use http_body_0_4::Body;
|
282 397 | let hint = box_body.size_hint();
|
283 398 | (hint.lower(), hint.upper())
|
284 399 | }
|
400 + | #[cfg(feature = "http-body-1-x")]
|
401 + | BoxBody::HttpBody1(box_body) => {
|
402 + | use http_body_1_0::Body;
|
403 + | let hint = box_body.size_hint();
|
404 + | (hint.lower(), hint.upper())
|
405 + | }
|
285 406 | #[allow(unreachable_patterns)]
|
286 407 | _ => unreachable!(
|
287 - | "enabling `http-body-0-4-x` is the only way to create the `Dyn` variant"
|
408 + | "enabling `http-body-0-4-x` or `http-body-1-x` is the only way to create the `Dyn` variant"
|
288 409 | ),
|
289 410 | },
|
290 411 | Inner::Taken => (0, Some(0)),
|
291 412 | }
|
292 413 | }
|
293 414 |
|
294 415 | /// Given a function to modify an `SdkBody`, run that function against this `SdkBody` before
|
295 416 | /// returning the result.
|
296 417 | pub fn map(self, f: impl Fn(SdkBody) -> SdkBody + Sync + Send + 'static) -> SdkBody {
|
297 418 | if self.rebuild.is_some() {
|
298 419 | SdkBody::retryable(move || f(self.try_clone().unwrap()))
|
299 420 | } else {
|
300 421 | f(self)
|
301 422 | }
|
302 423 | }
|
303 424 |
|
304 425 | /// Update this `SdkBody` with `map`. **This function MUST NOT alter the data of the body.**
|
305 426 | ///
|
306 427 | /// This function is useful for adding metadata like progress tracking to an [`SdkBody`] that
|
307 428 | /// does not alter the actual byte data. If your mapper alters the contents of the body, use [`SdkBody::map`]
|
308 429 | /// instead.
|
309 430 | pub fn map_preserve_contents(
|
310 431 | self,
|
311 432 | f: impl Fn(SdkBody) -> SdkBody + Sync + Send + 'static,
|
312 433 | ) -> SdkBody {
|
313 434 | let contents = self.bytes_contents.clone();
|
314 435 | let mut out = if self.rebuild.is_some() {
|
315 436 | SdkBody::retryable(move || f(self.try_clone().unwrap()))
|
316 437 | } else {
|
317 438 | f(self)
|
318 439 | };
|
319 440 | out.bytes_contents = contents;
|
320 441 | out
|
321 442 | }
|
322 443 | }
|
323 444 |
|
324 445 | impl From<&str> for SdkBody {
|
325 446 | fn from(s: &str) -> Self {
|
326 447 | Self::from(s.as_bytes())
|
327 448 | }
|
328 449 | }
|
329 450 |
|
330 451 | impl From<Bytes> for SdkBody {
|
331 452 | fn from(bytes: Bytes) -> Self {
|
332 453 | let b = bytes.clone();
|
333 454 | SdkBody {
|
334 455 | inner: Inner::Once {
|
335 456 | inner: Some(bytes.clone()),
|
336 457 | },
|
337 458 | rebuild: Some(Arc::new(move || Inner::Once {
|
338 459 | inner: Some(bytes.clone()),
|
339 460 | })),
|
340 461 | bytes_contents: Some(b),
|
462 + | trailers: None,
|
341 463 | }
|
342 464 | }
|
343 465 | }
|
344 466 |
|
345 467 | impl From<Vec<u8>> for SdkBody {
|
346 468 | fn from(data: Vec<u8>) -> Self {
|
347 469 | Self::from(Bytes::from(data))
|
348 470 | }
|
349 471 | }
|
350 472 |
|