1use bytes::Bytes;
9use pin_project_lite::pin_project;
10use std::collections::VecDeque;
11use std::error::Error as StdError;
12use std::fmt::{self, Debug, Formatter};
13use std::future::poll_fn;
14use std::pin::Pin;
15use std::sync::Arc;
16use std::task::{Context, Poll};
17
18#[cfg(feature = "http-body-0-4-x")]
22pub mod http_body_0_4_x;
23#[cfg(feature = "http-body-1-x")]
24pub mod http_body_1_x;
25
26pub type Error = Box<dyn StdError + Send + Sync>;
28
29pin_project! {
30 pub struct SdkBody {
37 #[pin]
38 inner: Inner,
39 rebuild: Option<Arc<dyn (Fn() -> Inner) + Send + Sync>>,
44 bytes_contents: Option<Bytes>,
45 trailers: Option<VecDeque<http_1x::HeaderMap>>,
49 }
50}
51
52impl Debug for SdkBody {
53 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
54 f.debug_struct("SdkBody")
55 .field("inner", &self.inner)
56 .field("retryable", &self.rebuild.is_some())
57 .finish()
58 }
59}
60
61#[allow(dead_code)]
63enum BoxBody {
64 #[cfg(any(
67 feature = "http-body-0-4-x",
68 feature = "http-body-1-x",
69 feature = "rt-tokio"
70 ))]
71 HttpBody04(#[allow(dead_code)] http_body_0_4::combinators::BoxBody<Bytes, Error>),
73
74 #[cfg(feature = "http-body-1-x")]
75 HttpBody1(#[allow(dead_code)] http_body_util::combinators::BoxBody<Bytes, Error>),
76}
77
78pin_project! {
79 #[project = InnerProj]
80 enum Inner {
81 Once {
83 inner: Option<Bytes>
84 },
85 Dyn {
87 #[pin]
88 inner: BoxBody,
89 },
90
91 Taken,
95 }
96}
97
98impl Debug for Inner {
99 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
100 match &self {
101 Inner::Once { inner: once } => f.debug_tuple("Once").field(once).finish(),
102 Inner::Dyn { .. } => write!(f, "BoxBody"),
103 Inner::Taken => f.debug_tuple("Taken").finish(),
104 }
105 }
106}
107
108impl SdkBody {
109 pub fn retryable(f: impl Fn() -> SdkBody + Send + Sync + 'static) -> Self {
118 let initial = f();
119 SdkBody {
120 inner: initial.inner,
121 rebuild: Some(Arc::new(move || f().inner)),
122 bytes_contents: initial.bytes_contents,
123 trailers: None,
124 }
125 }
126
127 pub fn taken() -> Self {
130 Self {
131 inner: Inner::Taken,
132 rebuild: None,
133 bytes_contents: None,
134 trailers: None,
135 }
136 }
137
138 pub fn empty() -> Self {
140 Self {
141 inner: Inner::Once { inner: None },
142 rebuild: Some(Arc::new(|| Inner::Once { inner: None })),
143 bytes_contents: Some(Bytes::new()),
144 trailers: None,
145 }
146 }
147
148 pub(crate) async fn next(&mut self) -> Option<Result<Bytes, Error>> {
149 let mut me = Pin::new(self);
150 poll_fn(|cx| me.as_mut().poll_next(cx)).await
151 }
152
153 pub(crate) fn poll_next(
154 self: Pin<&mut Self>,
155 #[allow(unused)] cx: &mut Context<'_>,
156 ) -> Poll<Option<Result<Bytes, Error>>> {
157 let this = self.project();
158 match this.inner.project() {
159 InnerProj::Once { ref mut inner } => {
160 let data = inner.take();
161 match data {
162 Some(bytes) if bytes.is_empty() => Poll::Ready(None),
163 Some(bytes) => Poll::Ready(Some(Ok(bytes))),
164 None => Poll::Ready(None),
165 }
166 }
167 InnerProj::Dyn { inner: body } => match body.get_mut() {
168 #[cfg(feature = "http-body-0-4-x")]
169 BoxBody::HttpBody04(box_body) => {
170 use http_body_0_4::Body;
171 Pin::new(box_body).poll_data(cx)
172 }
173 #[cfg(feature = "http-body-1-x")]
174 BoxBody::HttpBody1(box_body) => {
175 if this.trailers.is_some() {
177 return Poll::Ready(None);
178 }
179 use http_body_1_0::Body;
180 let maybe_data = Pin::new(box_body).poll_frame(cx);
181 match maybe_data {
182 Poll::Ready(Some(Ok(frame))) => {
183 if frame.is_data() {
184 Poll::Ready(Some(Ok(frame
185 .into_data()
186 .expect("Confirmed data frame"))))
187 } else if frame.is_trailers() {
188 let trailers =
189 frame.into_trailers().expect("Confirmed trailer frame");
190 this.trailers.get_or_insert_with(VecDeque::new).push_back(trailers);
192
193 Poll::Ready(None)
194 } else {
195 unreachable!("Frame must be either data or trailers");
196 }
197 }
198 Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
199 Poll::Ready(None) => Poll::Ready(None),
200 Poll::Pending => Poll::Pending,
201 }
202 }
203 #[allow(unreachable_patterns)]
204 _ => unreachable!(
205 "enabling `http-body-0-4-x` or `http-body-1-x` is the only way to create the `Dyn` variant"
206 ),
207 },
208 InnerProj::Taken => {
209 Poll::Ready(Some(Err("A `Taken` body should never be polled".into())))
210 }
211 }
212 }
213
214 #[allow(dead_code)]
215 #[cfg(any(
216 feature = "http-body-0-4-x",
217 feature = "http-body-1-x",
218 feature = "rt-tokio"
219 ))]
220 pub(crate) fn from_body_0_4_internal<T, E>(body: T) -> Self
221 where
222 T: http_body_0_4::Body<Data = Bytes, Error = E> + Send + Sync + 'static,
223 E: Into<Error> + 'static,
224 {
225 Self {
226 inner: Inner::Dyn {
227 inner: BoxBody::HttpBody04(http_body_0_4::combinators::BoxBody::new(
228 body.map_err(Into::into),
229 )),
230 },
231 rebuild: None,
232 bytes_contents: None,
233 trailers: None,
234 }
235 }
236
237 #[cfg(feature = "http-body-1-x")]
238 pub(crate) fn from_body_1_x_internal<T, E>(body: T) -> Self
239 where
240 T: http_body_1_0::Body<Data = Bytes, Error = E> + Send + Sync + 'static,
241 E: Into<Error> + 'static,
242 {
243 use http_body_util::BodyExt;
244 Self {
245 inner: Inner::Dyn {
246 inner: BoxBody::HttpBody1(http_body_util::combinators::BoxBody::new(
247 body.map_err(Into::into),
248 )),
249 },
250 rebuild: None,
251 bytes_contents: None,
252 trailers: None,
253 }
254 }
255
256 #[cfg(any(feature = "http-body-0-4-x", feature = "http-body-1-x",))]
257 pub(crate) fn poll_next_trailers(
258 self: Pin<&mut Self>,
259 cx: &mut Context<'_>,
260 ) -> Poll<Result<Option<http_1x::HeaderMap<http_1x::HeaderValue>>, Error>> {
261 #[cfg(all(feature = "http-body-0-4-x", not(feature = "http-body-1-x")))]
267 use crate::body::http_body_0_4_x::convert_headers_0x_1x;
268 #[cfg(feature = "http-body-1-x")]
269 use crate::body::http_body_1_x::convert_headers_0x_1x;
270
271 let this = self.project();
272 match this.inner.project() {
273 InnerProj::Once { .. } => Poll::Ready(Ok(None)),
274 InnerProj::Dyn { inner } => match inner.get_mut() {
275 BoxBody::HttpBody04(box_body) => {
276 use http_body_0_4::Body;
277 let polled = Pin::new(box_body).poll_trailers(cx);
278
279 match polled {
280 Poll::Ready(Ok(maybe_trailers)) => {
281 let http_1x_trailers = maybe_trailers.map(convert_headers_0x_1x);
282 Poll::Ready(Ok(http_1x_trailers))
283 }
284 Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
285 Poll::Pending => Poll::Pending,
286 }
287 }
288 #[cfg(feature = "http-body-1-x")]
289 BoxBody::HttpBody1(box_body) => {
290 use http_body_1_0::Body;
291 if let Some(trailer_buf) = this.trailers {
293 if let Some(next_trailer) = trailer_buf.pop_front() {
294 return Poll::Ready(Ok(Some(next_trailer)));
295 }
296 }
297
298 let polled = Pin::new(box_body).poll_frame(cx);
299 match polled {
300 Poll::Ready(Some(Ok(maybe_trailers))) => {
301 if maybe_trailers.is_data() {
302 Poll::Ready(Err("Trailers polled while body still has data".into()))
303 } else {
304 let trailers = maybe_trailers
305 .into_trailers()
306 .expect("Frame must be trailers because it is not data");
307 Poll::Ready(Ok(Some(trailers)))
308 }
309 }
310 Poll::Ready(None) => Poll::Ready(Ok(None)),
311 Poll::Ready(Some(Err(err))) => Poll::Ready(Err(err)),
312 Poll::Pending => Poll::Pending,
313 }
314 }
315 },
316 InnerProj::Taken => Poll::Ready(Err(
317 "A `Taken` body should never be polled for trailers".into(),
318 )),
319 }
320 }
321
322 pub fn bytes(&self) -> Option<&[u8]> {
327 match &self.bytes_contents {
328 Some(b) => Some(b),
329 None => None,
330 }
331 }
332
333 pub fn try_clone(&self) -> Option<Self> {
336 self.rebuild.as_ref().map(|rebuild| {
337 let next = rebuild();
338 Self {
339 inner: next,
340 rebuild: self.rebuild.clone(),
341 bytes_contents: self.bytes_contents.clone(),
342 trailers: self.trailers.clone(),
343 }
344 })
345 }
346
347 pub fn is_streaming(&self) -> bool {
349 matches!(self.inner, Inner::Dyn { .. })
350 }
351
352 pub fn content_length(&self) -> Option<u64> {
355 match self.bounds_on_remaining_length() {
356 (lo, Some(hi)) if lo == hi => Some(lo),
357 _ => None,
358 }
359 }
360
361 #[allow(dead_code)] pub(crate) fn is_end_stream(&self) -> bool {
363 match &self.inner {
364 Inner::Once { inner: None } => true,
365 Inner::Once { inner: Some(bytes) } => bytes.is_empty(),
366 Inner::Dyn { inner: box_body } => match box_body {
367 #[cfg(feature = "http-body-0-4-x")]
368 BoxBody::HttpBody04(box_body) => {
369 use http_body_0_4::Body;
370 box_body.is_end_stream()
371 }
372 #[cfg(feature = "http-body-1-x")]
373 BoxBody::HttpBody1(box_body) => {
374 use http_body_1_0::Body;
375 box_body.is_end_stream()
376 }
377 #[allow(unreachable_patterns)]
378 _ => unreachable!(
379 "enabling `http-body-0-4-x` or `http-body-1-x` is the only way to create the `Dyn` variant"
380 ),
381 },
382 Inner::Taken => true,
383 }
384 }
385
386 pub(crate) fn bounds_on_remaining_length(&self) -> (u64, Option<u64>) {
387 match &self.inner {
388 Inner::Once { inner: None } => (0, Some(0)),
389 Inner::Once { inner: Some(bytes) } => {
390 let len = bytes.len() as u64;
391 (len, Some(len))
392 }
393 Inner::Dyn { inner: box_body } => match box_body {
394 #[cfg(feature = "http-body-0-4-x")]
395 BoxBody::HttpBody04(box_body) => {
396 use http_body_0_4::Body;
397 let hint = box_body.size_hint();
398 (hint.lower(), hint.upper())
399 }
400 #[cfg(feature = "http-body-1-x")]
401 BoxBody::HttpBody1(box_body) => {
402 use http_body_1_0::Body;
403 let hint = box_body.size_hint();
404 (hint.lower(), hint.upper())
405 }
406 #[allow(unreachable_patterns)]
407 _ => unreachable!(
408 "enabling `http-body-0-4-x` or `http-body-1-x` is the only way to create the `Dyn` variant"
409 ),
410 },
411 Inner::Taken => (0, Some(0)),
412 }
413 }
414
415 pub fn map(self, f: impl Fn(SdkBody) -> SdkBody + Sync + Send + 'static) -> SdkBody {
418 if self.rebuild.is_some() {
419 SdkBody::retryable(move || f(self.try_clone().unwrap()))
420 } else {
421 f(self)
422 }
423 }
424
425 pub fn map_preserve_contents(
431 self,
432 f: impl Fn(SdkBody) -> SdkBody + Sync + Send + 'static,
433 ) -> SdkBody {
434 let contents = self.bytes_contents.clone();
435 let mut out = if self.rebuild.is_some() {
436 SdkBody::retryable(move || f(self.try_clone().unwrap()))
437 } else {
438 f(self)
439 };
440 out.bytes_contents = contents;
441 out
442 }
443}
444
445impl From<&str> for SdkBody {
446 fn from(s: &str) -> Self {
447 Self::from(s.as_bytes())
448 }
449}
450
451impl From<Bytes> for SdkBody {
452 fn from(bytes: Bytes) -> Self {
453 let b = bytes.clone();
454 SdkBody {
455 inner: Inner::Once {
456 inner: Some(bytes.clone()),
457 },
458 rebuild: Some(Arc::new(move || Inner::Once {
459 inner: Some(bytes.clone()),
460 })),
461 bytes_contents: Some(b),
462 trailers: None,
463 }
464 }
465}
466
467impl From<Vec<u8>> for SdkBody {
468 fn from(data: Vec<u8>) -> Self {
469 Self::from(Bytes::from(data))
470 }
471}
472
473impl From<String> for SdkBody {
474 fn from(s: String) -> Self {
475 Self::from(s.into_bytes())
476 }
477}
478
479impl From<&[u8]> for SdkBody {
480 fn from(data: &[u8]) -> Self {
481 Self::from(Bytes::copy_from_slice(data))
482 }
483}
484
485#[cfg(test)]
486mod test {
487 use crate::body::SdkBody;
488 use std::pin::Pin;
489
490 #[test]
491 fn valid_size_hint() {
492 assert_eq!(SdkBody::from("hello").content_length(), Some(5));
493 assert_eq!(SdkBody::from("").content_length(), Some(0));
494 }
495
496 #[allow(clippy::bool_assert_comparison)]
497 #[test]
498 fn valid_eos() {
499 assert_eq!(SdkBody::from("hello").is_end_stream(), false);
500 assert_eq!(SdkBody::from("").is_end_stream(), true);
501 }
502
503 #[tokio::test]
504 async fn http_body_consumes_data() {
505 let mut body = SdkBody::from("hello!");
506 let mut body = Pin::new(&mut body);
507 assert!(!body.is_end_stream());
508 let data = body.next().await;
509 assert!(data.is_some());
510 let data = body.next().await;
511 assert!(data.is_none());
512 assert!(body.is_end_stream());
513 }
514
515 #[tokio::test]
516 async fn empty_body_returns_none() {
517 let mut body = SdkBody::from("");
519 let mut body = Pin::new(&mut body);
520 let data = body.next().await;
521 assert!(data.is_none());
522 }
523
524 #[test]
525 fn sdkbody_debug_once() {
526 let body = SdkBody::from("123");
527 assert!(format!("{:?}", body).contains("Once"));
528 }
529
530 #[test]
531 fn sdk_body_is_send() {
532 fn is_send<T: Send>() {}
533 is_send::<SdkBody>()
534 }
535}