1use bytes::{Bytes, BytesMut};
7use pin_project_lite::pin_project;
8
9use std::pin::Pin;
10use std::task::{Context, Poll};
11
/// Line terminator used by every part of the aws-chunked encoding.
const CRLF: &str = "\r\n";
/// [`CRLF`] as raw bytes, for code that writes directly into byte buffers.
const CRLF_RAW: &[u8] = CRLF.as_bytes();

/// The terminating chunk: a size line of `0` followed by CRLF.
const CHUNK_TERMINATOR: &str = "0\r\n";
/// [`CHUNK_TERMINATOR`] as raw bytes.
const CHUNK_TERMINATOR_RAW: &[u8] = CHUNK_TERMINATOR.as_bytes();

/// Separator written between a trailer's name and its value.
const TRAILER_SEPARATOR: &[u8] = b":";
19
/// Header value constants related to aws-chunked bodies.
pub mod header_value {
    /// The literal `aws-chunked` header value.
    pub const AWS_CHUNKED: &str = "aws-chunked";
}
25
/// Lengths declared up front for an [`AwsChunkedBody`].
///
/// The encoder uses these values to compute its exact encoded size and checks
/// them against what the inner body actually produces.
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct AwsChunkedBodyOptions {
    /// Number of bytes the inner body is expected to yield (before encoding).
    stream_length: u64,
    /// Length of each trailer as `name + separator + value`; the CRLF that ends
    /// each trailer is added by the encoder and is NOT included here.
    trailer_lengths: Vec<u64>,
}
38
39impl AwsChunkedBodyOptions {
40 pub fn new(stream_length: u64, trailer_lengths: Vec<u64>) -> Self {
42 Self {
43 stream_length,
44 trailer_lengths,
45 }
46 }
47
48 fn total_trailer_length(&self) -> u64 {
49 self.trailer_lengths.iter().sum::<u64>()
50 + (self.trailer_lengths.len() * CRLF.len()) as u64
52 }
53
54 pub fn with_trailer_len(mut self, trailer_len: u64) -> Self {
56 self.trailer_lengths.push(trailer_len);
57 self
58 }
59}
60
/// Encoding phase of an [`AwsChunkedBody`].
#[derive(Debug, PartialEq, Eq)]
enum AwsChunkedBodyState {
    /// Initial state: the next poll writes the hexadecimal chunk size line, or
    /// the chunk terminator straight away when the declared stream is empty.
    WritingChunkSize,
    /// Data from the inner body is being forwarded as the chunk payload.
    WritingChunk,
    /// The chunk terminator has been written; trailers and the final CRLF follow.
    WritingTrailers,
    /// Everything has been written; polling yields `None`.
    Closed,
}
76
77pin_project! {
78 #[derive(Debug)]
99 pub struct AwsChunkedBody<InnerBody> {
100 #[pin]
101 inner: InnerBody,
102 #[pin]
103 state: AwsChunkedBodyState,
104 options: AwsChunkedBodyOptions,
105 inner_body_bytes_read_so_far: usize,
106 }
107}
108
109impl<Inner> AwsChunkedBody<Inner> {
110 pub fn new(body: Inner, options: AwsChunkedBodyOptions) -> Self {
112 Self {
113 inner: body,
114 state: AwsChunkedBodyState::WritingChunkSize,
115 options,
116 inner_body_bytes_read_so_far: 0,
117 }
118 }
119
120 fn encoded_length(&self) -> u64 {
121 let mut length = 0;
122 if self.options.stream_length != 0 {
123 length += get_unsigned_chunk_bytes_length(self.options.stream_length);
124 }
125
126 length += CHUNK_TERMINATOR.len() as u64;
128
129 for len in self.options.trailer_lengths.iter() {
131 length += len + CRLF.len() as u64;
132 }
133
134 length += CRLF.len() as u64;
136
137 length
138 }
139}
140
141impl<Inner> http_body_04x::Body for AwsChunkedBody<Inner>
142where
143 Inner: http_body_04x::Body<Data = Bytes, Error = aws_smithy_types::body::Error>,
144{
145 type Data = Bytes;
146 type Error = aws_smithy_types::body::Error;
147
148 fn poll_data(
149 self: Pin<&mut Self>,
150 cx: &mut Context<'_>,
151 ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
152 tracing::trace!(state = ?self.state, "polling AwsChunkedBody");
153 let mut this = self.project();
154
155 match *this.state {
156 AwsChunkedBodyState::WritingChunkSize => {
157 if this.options.stream_length == 0 {
158 *this.state = AwsChunkedBodyState::WritingTrailers;
160 tracing::trace!("stream is empty, writing chunk terminator");
161 Poll::Ready(Some(Ok(Bytes::from([CHUNK_TERMINATOR].concat()))))
162 } else {
163 *this.state = AwsChunkedBodyState::WritingChunk;
164 let chunk_size = format!("{:X?}{CRLF}", this.options.stream_length);
166 tracing::trace!(%chunk_size, "writing chunk size");
167 let chunk_size = Bytes::from(chunk_size);
168 Poll::Ready(Some(Ok(chunk_size)))
169 }
170 }
171 AwsChunkedBodyState::WritingChunk => match this.inner.poll_data(cx) {
172 Poll::Ready(Some(Ok(data))) => {
173 tracing::trace!(len = data.len(), "writing chunk data");
174 *this.inner_body_bytes_read_so_far += data.len();
175 Poll::Ready(Some(Ok(data)))
176 }
177 Poll::Ready(None) => {
178 let actual_stream_length = *this.inner_body_bytes_read_so_far as u64;
179 let expected_stream_length = this.options.stream_length;
180 if actual_stream_length != expected_stream_length {
181 let err = Box::new(AwsChunkedBodyError::StreamLengthMismatch {
182 actual: actual_stream_length,
183 expected: expected_stream_length,
184 });
185 return Poll::Ready(Some(Err(err)));
186 };
187
188 tracing::trace!("no more chunk data, writing CRLF and chunk terminator");
189 *this.state = AwsChunkedBodyState::WritingTrailers;
190 Poll::Ready(Some(Ok(Bytes::from([CRLF, CHUNK_TERMINATOR].concat()))))
193 }
194 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
195 Poll::Pending => Poll::Pending,
196 },
197 AwsChunkedBodyState::WritingTrailers => {
198 return match this.inner.poll_trailers(cx) {
199 Poll::Ready(Ok(trailers)) => {
200 *this.state = AwsChunkedBodyState::Closed;
201 let expected_length =
202 http_02x_utils::total_rendered_length_of_trailers(trailers.as_ref());
203 let actual_length = this.options.total_trailer_length();
204
205 if expected_length != actual_length {
206 let err =
207 Box::new(AwsChunkedBodyError::ReportedTrailerLengthMismatch {
208 actual: actual_length,
209 expected: expected_length,
210 });
211 return Poll::Ready(Some(Err(err)));
212 }
213
214 let mut trailers = http_02x_utils::trailers_as_aws_chunked_bytes(
215 trailers,
216 actual_length + 1,
217 );
218 trailers.extend_from_slice(CRLF.as_bytes());
220
221 Poll::Ready(Some(Ok(trailers.into())))
222 }
223 Poll::Pending => Poll::Pending,
224 Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
225 };
226 }
227 AwsChunkedBodyState::Closed => Poll::Ready(None),
228 }
229 }
230
231 fn poll_trailers(
232 self: Pin<&mut Self>,
233 _cx: &mut Context<'_>,
234 ) -> Poll<Result<Option<http_02x::HeaderMap<http_02x::HeaderValue>>, Self::Error>> {
235 Poll::Ready(Ok(None))
237 }
238
239 fn is_end_stream(&self) -> bool {
240 self.state == AwsChunkedBodyState::Closed
241 }
242
243 fn size_hint(&self) -> http_body_04x::SizeHint {
244 http_body_04x::SizeHint::with_exact(self.encoded_length())
245 }
246}
247
248mod http_02x_utils {
250 use super::{CRLF, TRAILER_SEPARATOR};
251 use bytes::BytesMut;
252 use http_02x::HeaderMap;
253
254 pub(super) fn trailers_as_aws_chunked_bytes(
261 trailer_map: Option<HeaderMap>,
262 estimated_length: u64,
263 ) -> BytesMut {
264 if let Some(trailer_map) = trailer_map {
265 let mut current_header_name = None;
266 let mut trailers =
267 BytesMut::with_capacity(estimated_length.try_into().unwrap_or_default());
268
269 for (header_name, header_value) in trailer_map.into_iter() {
270 current_header_name = header_name.or(current_header_name);
274
275 if let Some(header_name) = current_header_name.as_ref() {
277 trailers.extend_from_slice(header_name.as_ref());
278 trailers.extend_from_slice(TRAILER_SEPARATOR);
279 trailers.extend_from_slice(header_value.as_bytes());
280 trailers.extend_from_slice(CRLF.as_bytes());
281 }
282 }
283
284 trailers
285 } else {
286 BytesMut::new()
287 }
288 }
289
290 pub(super) fn total_rendered_length_of_trailers(trailer_map: Option<&HeaderMap>) -> u64 {
297 match trailer_map {
298 Some(trailer_map) => trailer_map
299 .iter()
300 .map(|(trailer_name, trailer_value)| {
301 trailer_name.as_str().len()
302 + TRAILER_SEPARATOR.len()
303 + trailer_value.len()
304 + CRLF.len()
305 })
306 .sum::<usize>() as u64,
307 None => 0,
308 }
309 }
310}
311
/// Panic message for `poll_frame` match arms covering states that were already
/// handled by an early return.
const UNREACHABLE_STATES: &str = "These states already short circuited";
313
314impl<Inner> http_body_1x::Body for AwsChunkedBody<Inner>
316where
317 Inner: http_body_1x::Body<Data = Bytes, Error = aws_smithy_types::body::Error>,
318{
319 type Data = Bytes;
320 type Error = aws_smithy_types::body::Error;
321
322 fn is_end_stream(&self) -> bool {
323 self.state == AwsChunkedBodyState::Closed
324 }
325
326 fn size_hint(&self) -> http_body_1x::SizeHint {
327 http_body_1x::SizeHint::with_exact(self.encoded_length())
328 }
329
330 fn poll_frame(
331 self: Pin<&mut Self>,
332 cx: &mut Context<'_>,
333 ) -> Poll<Option<Result<http_body_1x::Frame<Self::Data>, Self::Error>>> {
334 tracing::trace!(state = ?self.state, "polling AwsChunkedBody");
335 let mut this = self.project();
336
337 if *this.state == AwsChunkedBodyState::WritingChunkSize {
341 if this.options.stream_length == 0 {
342 tracing::trace!("stream is empty, writing chunk terminator");
344 let frame = http_body_1x::Frame::data(Bytes::from(CHUNK_TERMINATOR));
345 *this.state = AwsChunkedBodyState::WritingTrailers;
346 return Poll::Ready(Some(Ok(frame)));
347 } else {
348 let chunk_size = format!(
350 "{:X?}{}",
351 this.options.stream_length,
352 std::str::from_utf8(CRLF_RAW).unwrap()
353 );
354 tracing::trace!(%chunk_size, "writing chunk size");
355 let chunk_size = http_body_1x::Frame::data(Bytes::from(chunk_size));
356 *this.state = AwsChunkedBodyState::WritingChunk;
357 return Poll::Ready(Some(Ok(chunk_size)));
358 }
359 }
360
361 if *this.state == AwsChunkedBodyState::Closed {
363 return Poll::Ready(None);
364 }
365
366 let maybe_frame = this.inner.poll_frame(cx);
368 tracing::trace!(poll_state = ?maybe_frame, "Polling InnerBody");
369
370 match maybe_frame {
371 Poll::Ready(Some(Ok(frame))) => match *this.state {
372 AwsChunkedBodyState::WritingChunk => {
376 if frame.is_data() {
377 let data = frame.data_ref().expect("Data frame has data");
378 tracing::trace!(len = data.len(), "Writing chunk data");
379 *this.inner_body_bytes_read_so_far += data.len();
380 Poll::Ready(Some(Ok(frame)))
381 } else {
382 tracing::trace!(
383 "No more chunk data, writing CRLF + CHUNK_TERMINATOR to end the data, and the first trailer frame"
384 );
385
386 if let Err(poll_stream_len_err) =
388 http_1x_utils::check_for_stream_length_mismatch(
389 *this.inner_body_bytes_read_so_far as u64,
390 this.options.stream_length,
391 )
392 {
393 return poll_stream_len_err;
394 }
395
396 *this.state = AwsChunkedBodyState::WritingTrailers;
397 let trailers = frame.trailers_ref();
398
399 let actual_length: u64 =
408 http_1x_utils::total_rendered_length_of_trailers(trailers);
409 let expected_length = this.options.total_trailer_length();
410 if expected_length != actual_length {
411 let err =
412 Box::new(AwsChunkedBodyError::ReportedTrailerLengthMismatch {
413 actual: actual_length,
414 expected: expected_length,
415 });
416 return Poll::Ready(Some(Err(err)));
417 }
418
419 let mut buf = BytesMut::with_capacity(actual_length as usize + 7);
424 buf.extend_from_slice(&[CRLF_RAW, CHUNK_TERMINATOR_RAW].concat());
426
427 let trailers = http_1x_utils::trailers_as_aws_chunked_bytes(trailers, buf);
431 Poll::Ready(Some(Ok(http_body_1x::Frame::data(trailers.into()))))
432 }
433 }
434 AwsChunkedBodyState::WritingTrailers => {
435 let trailers = frame.trailers_ref();
436 let actual_length: u64 =
437 http_1x_utils::total_rendered_length_of_trailers(trailers);
438 let buf = BytesMut::with_capacity(actual_length as usize + 7);
439 let trailers = http_1x_utils::trailers_as_aws_chunked_bytes(trailers, buf);
440 Poll::Ready(Some(Ok(http_body_1x::Frame::data(trailers.into()))))
441 }
442 AwsChunkedBodyState::Closed | AwsChunkedBodyState::WritingChunkSize => {
443 unreachable!("{}", UNREACHABLE_STATES)
444 }
445 },
446 Poll::Ready(None) => {
448 let trailers = match *this.state {
449 AwsChunkedBodyState::WritingChunk => {
450 if let Err(poll_stream_len_err) =
452 http_1x_utils::check_for_stream_length_mismatch(
453 *this.inner_body_bytes_read_so_far as u64,
454 this.options.stream_length,
455 )
456 {
457 return poll_stream_len_err;
458 }
459
460 let mut trailers = BytesMut::with_capacity(7);
464 trailers.extend_from_slice(
465 &[CRLF_RAW, CHUNK_TERMINATOR_RAW, CRLF_RAW].concat(),
466 );
467 trailers
468 }
469 AwsChunkedBodyState::WritingTrailers => {
470 let mut trailers = BytesMut::with_capacity(2);
471 trailers.extend_from_slice(CRLF_RAW);
472 trailers
473 }
474 AwsChunkedBodyState::Closed | AwsChunkedBodyState::WritingChunkSize => {
475 unreachable!("{}", UNREACHABLE_STATES)
476 }
477 };
478
479 let frame = http_body_1x::Frame::data(trailers.into());
480 *this.state = AwsChunkedBodyState::Closed;
481 Poll::Ready(Some(Ok(frame)))
482 }
483 Poll::Pending => Poll::Pending,
485 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
486 }
487 }
488}
489mod http_1x_utils {
491 use std::task::Poll;
492
493 use super::{CRLF_RAW, TRAILER_SEPARATOR};
494 use bytes::{Bytes, BytesMut};
495 use http_1x::{HeaderMap, HeaderName};
496
497 pub(super) fn trailers_as_aws_chunked_bytes(
504 trailer_map: Option<&HeaderMap>,
505 mut buffer: BytesMut,
506 ) -> BytesMut {
507 if let Some(trailer_map) = trailer_map {
508 let mut current_header_name: Option<HeaderName> = None;
509
510 for (header_name, header_value) in trailer_map.clone().into_iter() {
511 current_header_name = header_name.or(current_header_name);
515
516 if let Some(header_name) = current_header_name.as_ref() {
518 buffer.extend_from_slice(header_name.as_ref());
519 buffer.extend_from_slice(TRAILER_SEPARATOR);
520 buffer.extend_from_slice(header_value.as_bytes());
521 buffer.extend_from_slice(CRLF_RAW);
522 }
523 }
524
525 buffer
526 } else {
527 buffer
528 }
529 }
530
531 pub(super) fn total_rendered_length_of_trailers(trailer_map: Option<&HeaderMap>) -> u64 {
538 match trailer_map {
539 Some(trailer_map) => trailer_map
540 .iter()
541 .map(|(trailer_name, trailer_value)| {
542 trailer_name.as_str().len()
543 + TRAILER_SEPARATOR.len()
544 + trailer_value.len()
545 + CRLF_RAW.len()
546 })
547 .sum::<usize>() as u64,
548 None => 0,
549 }
550 }
551
552 #[allow(clippy::type_complexity)]
555 pub(super) fn check_for_stream_length_mismatch(
556 actual_stream_length: u64,
557 expected_stream_length: u64,
558 ) -> Result<(), Poll<Option<Result<http_body_1x::Frame<Bytes>, aws_smithy_types::body::Error>>>>
559 {
560 if actual_stream_length != expected_stream_length {
561 let err = Box::new(super::AwsChunkedBodyError::StreamLengthMismatch {
562 actual: actual_stream_length,
563 expected: expected_stream_length,
564 });
565 return Err(Poll::Ready(Some(Err(err))));
566 };
567
568 Ok(())
569 }
570}
571
/// Errors raised when the lengths declared for an `AwsChunkedBody` disagree
/// with what is observed while encoding.
#[derive(Debug)]
enum AwsChunkedBodyError {
    /// The trailer length declared in the options and the length of the rendered
    /// trailers differ.
    ReportedTrailerLengthMismatch { actual: u64, expected: u64 },
    /// The stream length declared in the options and the number of bytes read
    /// from the inner body differ.
    StreamLengthMismatch { actual: u64, expected: u64 },
}

impl std::fmt::Display for AwsChunkedBodyError {
    /// Writes a sentence naming the reported (`expected`) and the observed
    /// (`actual`) length.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            Self::ReportedTrailerLengthMismatch { actual, expected } => write!(
                f,
                "When creating this AwsChunkedBody, length of trailers was reported as {expected}. However, when double checking during trailer encoding, length was found to be {actual} instead."
            ),
            Self::StreamLengthMismatch { actual, expected } => write!(
                f,
                "When creating this AwsChunkedBody, stream length was reported as {expected}. However, when double checking during body encoding, length was found to be {actual} instead."
            ),
        }
    }
}

impl std::error::Error for AwsChunkedBodyError {}
600
/// Count how many times `i` can be divided by 16 before it is no longer greater
/// than zero.
///
/// For a positive integer this is the number of hexadecimal digits needed to
/// write it. For zero the result is `0`.
fn int_log16<T>(mut i: T) -> u64
where
    T: std::ops::DivAssign + PartialOrd + From<u8> + Copy,
{
    let radix = T::from(16);
    let floor = T::from(0);

    let mut digits = 0;
    loop {
        if i > floor {
            i /= radix;
            digits += 1;
        } else {
            break digits;
        }
    }
}
617
618fn get_unsigned_chunk_bytes_length(payload_length: u64) -> u64 {
619 let hex_repr_len = int_log16(payload_length);
620 hex_repr_len + CRLF.len() as u64 + payload_length + CRLF.len() as u64
621}
622
#[cfg(test)]
mod tests {
    use std::future::Future;
    use std::time::Duration;

    /// Drive `test_fut` to completion, panicking with
    /// `"<test_name> timed out after <duration>"` if it takes longer than three seconds.
    async fn fail_on_timeout<F>(test_name: &str, test_fut: F)
    where
        F: Future<Output = ()>,
    {
        let timeout_duration = Duration::from_secs(3);
        if tokio::time::timeout(timeout_duration, test_fut)
            .await
            .is_err()
        {
            panic!("{test_name} timed out after {timeout_duration:?}");
        }
    }

    /// Tests for the `http-body` 0.4 implementation.
    mod http_02x_tests {
        use super::super::{
            http_02x_utils::{total_rendered_length_of_trailers, trailers_as_aws_chunked_bytes},
            AwsChunkedBody, AwsChunkedBodyOptions, CHUNK_TERMINATOR, CRLF,
        };
        use super::fail_on_timeout;

        use aws_smithy_types::body::SdkBody;
        use bytes::{Buf, Bytes};
        use bytes_utils::SegmentedBuf;
        use http_02x::{HeaderMap, HeaderValue};
        use http_body_04x::{Body, SizeHint};
        use pin_project_lite::pin_project;

        use std::io::Read;
        use std::pin::Pin;
        use std::task::{Context, Poll};
        use std::time::Duration;

        pin_project! {
            // Test body that yields its parts in order; a `None` part makes the
            // poll return `Pending` and schedules a delayed wake-up.
            struct SputteringBody {
                parts: Vec<Option<Bytes>>,
                cursor: usize,
                delay_in_millis: u64,
            }
        }

        impl SputteringBody {
            /// Total number of data bytes across all parts.
            fn len(&self) -> usize {
                self.parts.iter().flatten().map(Bytes::len).sum()
            }
        }

        impl Body for SputteringBody {
            type Data = Bytes;
            type Error = aws_smithy_types::body::Error;

            fn poll_data(
                self: Pin<&mut Self>,
                cx: &mut Context<'_>,
            ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
                let this = self.project();
                let index = *this.cursor;
                if index == this.parts.len() {
                    return Poll::Ready(None);
                }

                // Every poll consumes exactly one part, data or pause.
                *this.cursor = index + 1;
                match this.parts[index].take() {
                    Some(data) => Poll::Ready(Some(Ok(data))),
                    None => {
                        let delay_in_millis = *this.delay_in_millis;
                        let waker = cx.waker().clone();
                        tokio::spawn(async move {
                            tokio::time::sleep(Duration::from_millis(delay_in_millis)).await;
                            waker.wake();
                        });
                        Poll::Pending
                    }
                }
            }

            fn poll_trailers(
                self: Pin<&mut Self>,
                _cx: &mut Context<'_>,
            ) -> Poll<Result<Option<HeaderMap<HeaderValue>>, Self::Error>> {
                Poll::Ready(Ok(None))
            }

            fn is_end_stream(&self) -> bool {
                false
            }

            fn size_hint(&self) -> SizeHint {
                SizeHint::new()
            }
        }

        /// Read all data from `body` and return it as a string, unwrapping every
        /// chunk along the way.
        async fn read_all_data<B>(body: &mut B) -> String
        where
            B: Body<Data = Bytes, Error = aws_smithy_types::body::Error> + Unpin,
        {
            let mut output = SegmentedBuf::new();
            while let Some(buf) = body.data().await {
                output.push(buf.unwrap());
            }

            let mut text = String::new();
            output
                .reader()
                .read_to_string(&mut text)
                .expect("Doesn't cause IO errors");
            text
        }

        /// Assert that `body` exposes no HTTP trailers of its own.
        async fn assert_no_trailers<B>(body: &mut B)
        where
            B: Body<Data = Bytes, Error = aws_smithy_types::body::Error> + Unpin,
        {
            assert!(
                body.trailers()
                    .await
                    .expect("no errors occurred during trailer polling")
                    .is_none(),
                "aws-chunked encoded bodies don't have normal HTTP trailers"
            );
        }

        #[tokio::test]
        async fn test_aws_chunked_encoding() {
            fail_on_timeout("test_aws_chunked_encoding", async {
                let input_str = "Hello world";
                let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, Vec::new());
                let mut body = AwsChunkedBody::new(SdkBody::from(input_str), opts);

                let actual_output = read_all_data(&mut body).await;
                let expected_output = "B\r\nHello world\r\n0\r\n\r\n";

                assert_eq!(expected_output, actual_output);
                assert_no_trailers(&mut body).await;
            })
            .await;
        }

        #[tokio::test]
        async fn test_aws_chunked_encoding_sputtering_body() {
            fail_on_timeout("test_aws_chunked_encoding_sputtering_body", async {
                let input = SputteringBody {
                    parts: vec![
                        Some(Bytes::from_static(b"chunk 1, ")),
                        None,
                        Some(Bytes::from_static(b"chunk 2, ")),
                        Some(Bytes::from_static(b"chunk 3, ")),
                        None,
                        None,
                        Some(Bytes::from_static(b"chunk 4, ")),
                        Some(Bytes::from_static(b"chunk 5, ")),
                        Some(Bytes::from_static(b"chunk 6")),
                    ],
                    cursor: 0,
                    delay_in_millis: 500,
                };
                let opts = AwsChunkedBodyOptions::new(input.len() as u64, Vec::new());
                let mut body = AwsChunkedBody::new(input, opts);

                let actual_output = read_all_data(&mut body).await;
                let expected_output =
                    "34\r\nchunk 1, chunk 2, chunk 3, chunk 4, chunk 5, chunk 6\r\n0\r\n\r\n";

                assert_eq!(expected_output, actual_output);
                assert_no_trailers(&mut body).await;
            })
            .await;
        }

        #[tokio::test]
        #[should_panic = "called `Result::unwrap()` on an `Err` value: ReportedTrailerLengthMismatch { actual: 44, expected: 0 }"]
        async fn test_aws_chunked_encoding_incorrect_trailer_length_panic() {
            let input_str = "Hello world";
            // The inner body has no trailers, so declaring one is wrong. The
            // reported mismatch shows 44 rather than 42 because each declared
            // trailer is accounted for together with its 2-byte CRLF.
            let wrong_trailer_len = 42;
            let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, vec![wrong_trailer_len]);
            let mut body = AwsChunkedBody::new(SdkBody::from(input_str), opts);

            // The content is irrelevant, but the body has to be read to the end
            // for the trailer check to run.
            while let Some(buf) = body.data().await {
                drop(buf.unwrap());
            }

            assert_no_trailers(&mut body).await;
        }

        #[tokio::test]
        async fn test_aws_chunked_encoding_empty_body() {
            let input_str = "";
            let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, Vec::new());
            let mut body = AwsChunkedBody::new(SdkBody::from(input_str), opts);

            let actual_output = read_all_data(&mut body).await;
            let expected_output = [CHUNK_TERMINATOR, CRLF].concat();

            assert_eq!(expected_output, actual_output);
            assert_no_trailers(&mut body).await;
        }

        #[tokio::test]
        async fn test_total_rendered_length_of_trailers() {
            let mut trailers = HeaderMap::new();

            trailers.insert("empty_value", HeaderValue::from_static(""));

            trailers.insert("single_value", HeaderValue::from_static("value 1"));

            trailers.insert("two_values", HeaderValue::from_static("value 1"));
            trailers.append("two_values", HeaderValue::from_static("value 2"));

            trailers.insert("three_values", HeaderValue::from_static("value 1"));
            trailers.append("three_values", HeaderValue::from_static("value 2"));
            trailers.append("three_values", HeaderValue::from_static("value 3"));

            let trailers = Some(trailers);
            let actual_length = total_rendered_length_of_trailers(trailers.as_ref());
            let rendered = trailers_as_aws_chunked_bytes(trailers, actual_length);
            let expected_length = rendered.len() as u64;

            assert_eq!(expected_length, actual_length);
        }

        #[tokio::test]
        async fn test_total_rendered_length_of_empty_trailers() {
            let trailers = Some(HeaderMap::new());
            let actual_length = total_rendered_length_of_trailers(trailers.as_ref());
            let rendered = trailers_as_aws_chunked_bytes(trailers, actual_length);
            let expected_length = rendered.len() as u64;

            assert_eq!(expected_length, actual_length);
        }
    }

    /// Tests for the `http-body` 1.x implementation.
    mod http_1x_tests {
        use super::super::{
            http_1x_utils::{total_rendered_length_of_trailers, trailers_as_aws_chunked_bytes},
            AwsChunkedBody, AwsChunkedBodyOptions, CHUNK_TERMINATOR_RAW, CRLF_RAW,
        };
        use super::fail_on_timeout;

        use aws_smithy_types::body::SdkBody;
        use bytes::{Buf, Bytes, BytesMut};
        use bytes_utils::SegmentedBuf;
        use http_1x::{HeaderMap, HeaderValue};
        use http_body_1x::{Body, Frame, SizeHint};
        use http_body_util::BodyExt;
        use pin_project_lite::pin_project;

        use std::io::Read;
        use std::pin::Pin;
        use std::task::{Context, Poll};
        use std::time::Duration;

        pin_project! {
            // Test body that yields its parts in order; a `None` part makes the
            // poll return `Pending` and schedules a delayed wake-up.
            struct SputteringBody {
                parts: Vec<Option<Bytes>>,
                cursor: usize,
                delay_in_millis: u64,
            }
        }

        impl SputteringBody {
            /// Total number of data bytes across all parts.
            fn len(&self) -> usize {
                self.parts.iter().flatten().map(Bytes::len).sum()
            }
        }

        impl Body for SputteringBody {
            type Data = Bytes;
            type Error = aws_smithy_types::body::Error;

            fn poll_frame(
                self: Pin<&mut Self>,
                cx: &mut Context<'_>,
            ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
                let this = self.project();
                let index = *this.cursor;
                if index == this.parts.len() {
                    return Poll::Ready(None);
                }

                // Every poll consumes exactly one part, data or pause.
                *this.cursor = index + 1;
                match this.parts[index].take() {
                    Some(data) => Poll::Ready(Some(Ok(Frame::data(data)))),
                    None => {
                        let delay_in_millis = *this.delay_in_millis;
                        let waker = cx.waker().clone();
                        tokio::spawn(async move {
                            tokio::time::sleep(Duration::from_millis(delay_in_millis)).await;
                            waker.wake();
                        });
                        Poll::Pending
                    }
                }
            }

            fn is_end_stream(&self) -> bool {
                false
            }

            fn size_hint(&self) -> SizeHint {
                SizeHint::new()
            }
        }

        /// Read frames from `body` until it ends or yields an error, and return
        /// the collected data as a string. Every frame must be a data frame.
        async fn read_all_frames<B>(body: &mut B) -> String
        where
            B: Body<Data = Bytes> + Unpin,
        {
            let mut output = SegmentedBuf::new();
            while let Some(Ok(frame)) = body.frame().await {
                output.push(frame.into_data().unwrap());
            }

            let mut text = String::new();
            output
                .reader()
                .read_to_string(&mut text)
                .expect("Doesn't cause IO errors");
            text
        }

        #[tokio::test]
        async fn test_aws_chunked_encoding() {
            fail_on_timeout("test_aws_chunked_encoding", async {
                let input_str = "Hello world";
                let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, vec![]);
                let mut body = AwsChunkedBody::new(SdkBody::from(input_str), opts);

                let actual_output = read_all_frames(&mut body).await;
                let expected_output = "B\r\nHello world\r\n0\r\n\r\n";

                assert_eq!(expected_output, actual_output);
            })
            .await;
        }

        #[tokio::test]
        async fn test_aws_chunked_encoding_sputtering_body() {
            fail_on_timeout("test_aws_chunked_encoding_sputtering_body", async {
                let input = SputteringBody {
                    parts: vec![
                        Some(Bytes::from_static(b"chunk 1, ")),
                        None,
                        Some(Bytes::from_static(b"chunk 2, ")),
                        Some(Bytes::from_static(b"chunk 3, ")),
                        None,
                        None,
                        Some(Bytes::from_static(b"chunk 4, ")),
                        Some(Bytes::from_static(b"chunk 5, ")),
                        Some(Bytes::from_static(b"chunk 6")),
                    ],
                    cursor: 0,
                    delay_in_millis: 500,
                };
                let opts = AwsChunkedBodyOptions::new(input.len() as u64, vec![]);
                let mut body = AwsChunkedBody::new(input, opts);

                let actual_output = read_all_frames(&mut body).await;
                let expected_output =
                    "34\r\nchunk 1, chunk 2, chunk 3, chunk 4, chunk 5, chunk 6\r\n0\r\n\r\n";

                assert_eq!(expected_output, actual_output);
            })
            .await;
        }

        // Despite its name this test has no `should_panic` attribute: it only
        // checks that no trailer frame is surfaced, and the loop below simply
        // stops if the body yields an error.
        #[tokio::test]
        async fn test_aws_chunked_encoding_incorrect_trailer_length_panic() {
            let input_str = "Hello world";
            // The inner body has no trailers, so declaring one is wrong.
            let wrong_trailer_len = 42;
            let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, vec![wrong_trailer_len]);
            let mut body = AwsChunkedBody::new(SdkBody::from(input_str), opts);

            while let Some(Ok(frame)) = body.frame().await {
                assert!(!frame.is_trailers());
            }
        }

        #[tokio::test]
        async fn test_aws_chunked_encoding_empty_body() {
            let input_str = "";
            let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, vec![]);
            let mut body = AwsChunkedBody::new(SdkBody::from(input_str), opts);

            let actual_output = read_all_frames(&mut body).await;

            let expected_output = [CHUNK_TERMINATOR_RAW, CRLF_RAW].concat();
            let expected_output = std::str::from_utf8(&expected_output).unwrap();

            assert_eq!(expected_output, actual_output);
        }

        #[tokio::test]
        async fn test_total_rendered_length_of_trailers() {
            let mut trailers = HeaderMap::new();

            trailers.insert("empty_value", HeaderValue::from_static(""));

            trailers.insert("single_value", HeaderValue::from_static("value 1"));

            trailers.insert("two_values", HeaderValue::from_static("value 1"));
            trailers.append("two_values", HeaderValue::from_static("value 2"));

            trailers.insert("three_values", HeaderValue::from_static("value 1"));
            trailers.append("three_values", HeaderValue::from_static("value 2"));
            trailers.append("three_values", HeaderValue::from_static("value 3"));

            let trailers = Some(&trailers);
            let actual_length = total_rendered_length_of_trailers(trailers);
            let buf = BytesMut::with_capacity(actual_length as usize);
            let expected_length = trailers_as_aws_chunked_bytes(trailers, buf).len() as u64;

            assert_eq!(expected_length, actual_length);
        }

        #[tokio::test]
        async fn test_total_rendered_length_of_empty_trailers() {
            let header_map = HeaderMap::new();
            let trailers = Some(&header_map);
            let actual_length = total_rendered_length_of_trailers(trailers);
            let buf = BytesMut::with_capacity(actual_length as usize);
            let expected_length = trailers_as_aws_chunked_bytes(trailers, buf).len() as u64;

            assert_eq!(expected_length, actual_length);
        }
    }
}