aws_runtime/
content_encoding.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6use bytes::{Bytes, BytesMut};
7use pin_project_lite::pin_project;
8
9use std::pin::Pin;
10use std::task::{Context, Poll};
11
// Line terminator used throughout the aws-chunked wire format, as a string.
const CRLF: &str = "\r\n";
// Byte-slice form of `CRLF`, for code that appends directly to byte buffers.
const CRLF_RAW: &[u8] = b"\r\n";

// The zero-sized "last chunk" marker (`0` followed by a CRLF), as a string.
const CHUNK_TERMINATOR: &str = "0\r\n";
// Byte-slice form of `CHUNK_TERMINATOR`.
const CHUNK_TERMINATOR_RAW: &[u8] = b"0\r\n";

// Separates a trailer name from its value: a bare colon with no trailing space.
const TRAILER_SEPARATOR: &[u8] = b":";
19
/// Constants for values of the content encoding header.
pub mod header_value {
    /// Header value denoting "aws-chunked" encoding.
    pub const AWS_CHUNKED: &str = "aws-chunked";
}
25
26/// Options used when constructing an [`AwsChunkedBody`].
27#[derive(Debug, Default)]
28#[non_exhaustive]
29pub struct AwsChunkedBodyOptions {
30    /// The total size of the stream. Because we only support unsigned encoding
31    /// this implies that there will only be a single chunk containing the
32    /// underlying payload.
33    stream_length: u64,
34    /// The length of each trailer sent within an `AwsChunkedBody`. Necessary in
35    /// order to correctly calculate the total size of the body accurately.
36    trailer_lengths: Vec<u64>,
37}
38
39impl AwsChunkedBodyOptions {
40    /// Create a new [`AwsChunkedBodyOptions`].
41    pub fn new(stream_length: u64, trailer_lengths: Vec<u64>) -> Self {
42        Self {
43            stream_length,
44            trailer_lengths,
45        }
46    }
47
48    fn total_trailer_length(&self) -> u64 {
49        self.trailer_lengths.iter().sum::<u64>()
50            // We need to account for a CRLF after each trailer name/value pair
51            + (self.trailer_lengths.len() * CRLF.len()) as u64
52    }
53
54    /// Set a trailer len
55    pub fn with_trailer_len(mut self, trailer_len: u64) -> Self {
56        self.trailer_lengths.push(trailer_len);
57        self
58    }
59}
60
/// The phases an `AwsChunkedBody` moves through as it is polled.
#[derive(Debug, PartialEq, Eq)]
enum AwsChunkedBodyState {
    /// Write out the size of the chunk that will follow. Then, transition into the
    /// `WritingChunk` state. If the stream is empty, the chunk terminator is written instead
    /// and the body goes straight to the `WritingTrailers` state.
    WritingChunkSize,
    /// Write out the next chunk of data. Multiple polls of the inner body may need to occur before
    /// all data is written out. Once there is no more data to write, transition into the
    /// `WritingTrailers` state.
    WritingChunk,
    /// Write out all trailers associated with this `AwsChunkedBody` and then transition into the
    /// `Closed` state.
    WritingTrailers,
    /// This is the final state. Everything, including the final CRLF, has already been written;
    /// polling in this state yields no more data.
    Closed,
}
76
pin_project! {
    /// A request body compatible with `Content-Encoding: aws-chunked`. This implementation is only
    /// capable of writing a single chunk and does not support signed chunks.
    ///
    /// Chunked-Body grammar is defined in [ABNF] as:
    ///
    /// ```txt
    /// Chunked-Body    = *chunk
    ///                   last-chunk
    ///                   chunked-trailer
    ///                   CRLF
    ///
    /// chunk           = chunk-size CRLF chunk-data CRLF
    /// chunk-size      = 1*HEXDIG
    /// last-chunk      = 1*("0") CRLF
    /// chunked-trailer = *( entity-header CRLF )
    /// entity-header   = field-name ":" OWS field-value OWS
    /// ```
    /// For more info on what the abbreviations mean, see
    /// <https://datatracker.ietf.org/doc/html/rfc7230#section-1.2>
    ///
    /// [ABNF]:https://en.wikipedia.org/wiki/Augmented_Backus%E2%80%93Naur_form
    #[derive(Debug)]
    pub struct AwsChunkedBody<InnerBody> {
        // The wrapped body whose data and trailers are re-encoded.
        #[pin]
        inner: InnerBody,
        // Which part of the encoded output the next poll will produce.
        #[pin]
        state: AwsChunkedBodyState,
        // Declared stream and trailer lengths; checked against what `inner` actually yields.
        options: AwsChunkedBodyOptions,
        // Running total of data bytes received from `inner` so far.
        inner_body_bytes_read_so_far: usize,
    }
}
108
109impl<Inner> AwsChunkedBody<Inner> {
110    /// Wrap the given body in an outer body compatible with `Content-Encoding: aws-chunked`
111    pub fn new(body: Inner, options: AwsChunkedBodyOptions) -> Self {
112        Self {
113            inner: body,
114            state: AwsChunkedBodyState::WritingChunkSize,
115            options,
116            inner_body_bytes_read_so_far: 0,
117        }
118    }
119
120    fn encoded_length(&self) -> u64 {
121        let mut length = 0;
122        if self.options.stream_length != 0 {
123            length += get_unsigned_chunk_bytes_length(self.options.stream_length);
124        }
125
126        // End chunk
127        length += CHUNK_TERMINATOR.len() as u64;
128
129        // Trailers
130        for len in self.options.trailer_lengths.iter() {
131            length += len + CRLF.len() as u64;
132        }
133
134        // Encoding terminator
135        length += CRLF.len() as u64;
136
137        length
138    }
139}
140
141impl<Inner> http_body_04x::Body for AwsChunkedBody<Inner>
142where
143    Inner: http_body_04x::Body<Data = Bytes, Error = aws_smithy_types::body::Error>,
144{
145    type Data = Bytes;
146    type Error = aws_smithy_types::body::Error;
147
148    fn poll_data(
149        self: Pin<&mut Self>,
150        cx: &mut Context<'_>,
151    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
152        tracing::trace!(state = ?self.state, "polling AwsChunkedBody");
153        let mut this = self.project();
154
155        match *this.state {
156            AwsChunkedBodyState::WritingChunkSize => {
157                if this.options.stream_length == 0 {
158                    // If the stream is empty, we skip to writing trailers after writing the CHUNK_TERMINATOR.
159                    *this.state = AwsChunkedBodyState::WritingTrailers;
160                    tracing::trace!("stream is empty, writing chunk terminator");
161                    Poll::Ready(Some(Ok(Bytes::from([CHUNK_TERMINATOR].concat()))))
162                } else {
163                    *this.state = AwsChunkedBodyState::WritingChunk;
164                    // A chunk must be prefixed by chunk size in hexadecimal
165                    let chunk_size = format!("{:X?}{CRLF}", this.options.stream_length);
166                    tracing::trace!(%chunk_size, "writing chunk size");
167                    let chunk_size = Bytes::from(chunk_size);
168                    Poll::Ready(Some(Ok(chunk_size)))
169                }
170            }
171            AwsChunkedBodyState::WritingChunk => match this.inner.poll_data(cx) {
172                Poll::Ready(Some(Ok(data))) => {
173                    tracing::trace!(len = data.len(), "writing chunk data");
174                    *this.inner_body_bytes_read_so_far += data.len();
175                    Poll::Ready(Some(Ok(data)))
176                }
177                Poll::Ready(None) => {
178                    let actual_stream_length = *this.inner_body_bytes_read_so_far as u64;
179                    let expected_stream_length = this.options.stream_length;
180                    if actual_stream_length != expected_stream_length {
181                        let err = Box::new(AwsChunkedBodyError::StreamLengthMismatch {
182                            actual: actual_stream_length,
183                            expected: expected_stream_length,
184                        });
185                        return Poll::Ready(Some(Err(err)));
186                    };
187
188                    tracing::trace!("no more chunk data, writing CRLF and chunk terminator");
189                    *this.state = AwsChunkedBodyState::WritingTrailers;
190                    // Since we wrote chunk data, we end it with a CRLF and since we only write
191                    // a single chunk, we write the CHUNK_TERMINATOR immediately after
192                    Poll::Ready(Some(Ok(Bytes::from([CRLF, CHUNK_TERMINATOR].concat()))))
193                }
194                Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
195                Poll::Pending => Poll::Pending,
196            },
197            AwsChunkedBodyState::WritingTrailers => {
198                return match this.inner.poll_trailers(cx) {
199                    Poll::Ready(Ok(trailers)) => {
200                        *this.state = AwsChunkedBodyState::Closed;
201                        let expected_length =
202                            http_02x_utils::total_rendered_length_of_trailers(trailers.as_ref());
203                        let actual_length = this.options.total_trailer_length();
204
205                        if expected_length != actual_length {
206                            let err =
207                                Box::new(AwsChunkedBodyError::ReportedTrailerLengthMismatch {
208                                    actual: actual_length,
209                                    expected: expected_length,
210                                });
211                            return Poll::Ready(Some(Err(err)));
212                        }
213
214                        let mut trailers = http_02x_utils::trailers_as_aws_chunked_bytes(
215                            trailers,
216                            actual_length + 1,
217                        );
218                        // Insert the final CRLF to close the body
219                        trailers.extend_from_slice(CRLF.as_bytes());
220
221                        Poll::Ready(Some(Ok(trailers.into())))
222                    }
223                    Poll::Pending => Poll::Pending,
224                    Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
225                };
226            }
227            AwsChunkedBodyState::Closed => Poll::Ready(None),
228        }
229    }
230
231    fn poll_trailers(
232        self: Pin<&mut Self>,
233        _cx: &mut Context<'_>,
234    ) -> Poll<Result<Option<http_02x::HeaderMap<http_02x::HeaderValue>>, Self::Error>> {
235        // Trailers were already appended to the body because of the content encoding scheme
236        Poll::Ready(Ok(None))
237    }
238
239    fn is_end_stream(&self) -> bool {
240        self.state == AwsChunkedBodyState::Closed
241    }
242
243    fn size_hint(&self) -> http_body_04x::SizeHint {
244        http_body_04x::SizeHint::with_exact(self.encoded_length())
245    }
246}
247
248/// Utility functions to help with the [http_body_04x::Body] trait implementation
249mod http_02x_utils {
250    use super::{CRLF, TRAILER_SEPARATOR};
251    use bytes::BytesMut;
252    use http_02x::HeaderMap;
253
254    /// Writes trailers out into a `string` and then converts that `String` to a `Bytes` before
255    /// returning.
256    ///
257    /// - Trailer names are separated by a single colon only, no space.
258    /// - Trailer names with multiple values will be written out one line per value, with the name
259    ///   appearing on each line.
260    pub(super) fn trailers_as_aws_chunked_bytes(
261        trailer_map: Option<HeaderMap>,
262        estimated_length: u64,
263    ) -> BytesMut {
264        if let Some(trailer_map) = trailer_map {
265            let mut current_header_name = None;
266            let mut trailers =
267                BytesMut::with_capacity(estimated_length.try_into().unwrap_or_default());
268
269            for (header_name, header_value) in trailer_map.into_iter() {
270                // When a header has multiple values, the name only comes up in iteration the first time
271                // we see it. Therefore, we need to keep track of the last name we saw and fall back to
272                // it when `header_name == None`.
273                current_header_name = header_name.or(current_header_name);
274
275                // In practice, this will always exist, but `if let` is nicer than unwrap
276                if let Some(header_name) = current_header_name.as_ref() {
277                    trailers.extend_from_slice(header_name.as_ref());
278                    trailers.extend_from_slice(TRAILER_SEPARATOR);
279                    trailers.extend_from_slice(header_value.as_bytes());
280                    trailers.extend_from_slice(CRLF.as_bytes());
281                }
282            }
283
284            trailers
285        } else {
286            BytesMut::new()
287        }
288    }
289
290    /// Given an optional `HeaderMap`, calculate the total number of bytes required to represent the
291    /// `HeaderMap`. If no `HeaderMap` is given as input, return 0.
292    ///
293    /// - Trailer names are separated by a single colon only, no space.
294    /// - Trailer names with multiple values will be written out one line per value, with the name
295    ///   appearing on each line.
296    pub(super) fn total_rendered_length_of_trailers(trailer_map: Option<&HeaderMap>) -> u64 {
297        match trailer_map {
298            Some(trailer_map) => trailer_map
299                .iter()
300                .map(|(trailer_name, trailer_value)| {
301                    trailer_name.as_str().len()
302                        + TRAILER_SEPARATOR.len()
303                        + trailer_value.len()
304                        + CRLF.len()
305                })
306                .sum::<usize>() as u64,
307            None => 0,
308        }
309    }
310}
311
// Panic message for match arms that cannot be reached because `poll_frame` returns early for
// the `WritingChunkSize` and `Closed` states before the inner body is polled.
const UNREACHABLE_STATES: &str = "These states already short circuited";

/// Implementing the [http_body_1x::Body] trait
impl<Inner> http_body_1x::Body for AwsChunkedBody<Inner>
where
    Inner: http_body_1x::Body<Data = Bytes, Error = aws_smithy_types::body::Error>,
{
    type Data = Bytes;
    type Error = aws_smithy_types::body::Error;

    /// The stream has ended once the `Closed` state is reached.
    fn is_end_stream(&self) -> bool {
        self.state == AwsChunkedBodyState::Closed
    }

    /// Exact size of the full encoded body, derived from the declared lengths.
    fn size_hint(&self) -> http_body_1x::SizeHint {
        http_body_1x::SizeHint::with_exact(self.encoded_length())
    }

    /// Produce the next frame of the aws-chunked encoded body.
    ///
    /// Every frame returned is a data frame: the chunk size line, the inner body's data, the
    /// chunk terminator, the rendered trailers and the closing CRLF are all written as data.
    ///
    /// # Errors
    ///
    /// Besides errors from the inner body, an error is returned when the number of data bytes
    /// differs from the declared stream length, or when the first trailer frame's rendered
    /// length differs from the declared trailer length.
    fn poll_frame(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<http_body_1x::Frame<Self::Data>, Self::Error>>> {
        tracing::trace!(state = ?self.state, "polling AwsChunkedBody");
        let mut this = self.project();

        // Both `WritingChunkSize` and `Closed` states short circuit without polling the inner body

        // Initial setup, we do not poll the inner body here
        if *this.state == AwsChunkedBodyState::WritingChunkSize {
            if this.options.stream_length == 0 {
                // If the stream is empty, we skip to writing trailers after writing the CHUNK_TERMINATOR.
                tracing::trace!("stream is empty, writing chunk terminator");
                let frame = http_body_1x::Frame::data(Bytes::from(CHUNK_TERMINATOR));
                *this.state = AwsChunkedBodyState::WritingTrailers;
                return Poll::Ready(Some(Ok(frame)));
            } else {
                // A chunk must be prefixed by chunk size in hexadecimal
                let chunk_size = format!(
                    "{:X?}{}",
                    this.options.stream_length,
                    std::str::from_utf8(CRLF_RAW).unwrap()
                );
                tracing::trace!(%chunk_size, "writing chunk size");
                let chunk_size = http_body_1x::Frame::data(Bytes::from(chunk_size));
                *this.state = AwsChunkedBodyState::WritingChunk;
                return Poll::Ready(Some(Ok(chunk_size)));
            }
        }

        // Polled after completion
        if *this.state == AwsChunkedBodyState::Closed {
            return Poll::Ready(None);
        }

        // For all other states we must poll the inner body
        let maybe_frame = this.inner.poll_frame(cx);
        tracing::trace!(poll_state = ?maybe_frame, "Polling InnerBody");

        match maybe_frame {
            Poll::Ready(Some(Ok(frame))) => match *this.state {
                // Both data chunks and trailers are written as Frame::data so we treat these states similarly
                // Importantly we cannot know that the body data of the InnerBody is exhausted until we see a
                // trailer frame or a Poll::Ready(None)
                AwsChunkedBodyState::WritingChunk => {
                    if frame.is_data() {
                        // Data frames pass through unchanged; we only count their bytes
                        let data = frame.data_ref().expect("Data frame has data");
                        tracing::trace!(len = data.len(), "Writing chunk data");
                        *this.inner_body_bytes_read_so_far += data.len();
                        Poll::Ready(Some(Ok(frame)))
                    } else {
                        tracing::trace!(
                            "No more chunk data, writing CRLF + CHUNK_TERMINATOR to end the data, and the first trailer frame"
                        );

                        // We exhausted the body data, now check if the length is correct
                        if let Err(poll_stream_len_err) =
                            http_1x_utils::check_for_stream_length_mismatch(
                                *this.inner_body_bytes_read_so_far as u64,
                                this.options.stream_length,
                            )
                        {
                            return poll_stream_len_err;
                        }

                        *this.state = AwsChunkedBodyState::WritingTrailers;
                        let trailers = frame.trailers_ref();

                        // NOTE: there is a subtle logic bug here (which is present in the http-02x implementation as well)
                        // The check for this error assumes that all trailers will come in a single trailer frame. Currently
                        // I believe this will always be the case since the only thing we send trailers for in AwsChunked is
                        // streaming checksums and that is a single trailer value. But it might not always be true. We should
                        // fix this bug when we update the behavior here to match the actual spec.
                        // The fix probably looks like returning Poll::Pending while we buffer all of the trailers and then
                        // comparing the actual length to the expected length before returning a final frame containing all
                        // of the trailers.
                        let actual_length: u64 =
                            http_1x_utils::total_rendered_length_of_trailers(trailers);
                        let expected_length = this.options.total_trailer_length();
                        if expected_length != actual_length {
                            let err =
                                Box::new(AwsChunkedBodyError::ReportedTrailerLengthMismatch {
                                    actual: actual_length,
                                    expected: expected_length,
                                });
                            return Poll::Ready(Some(Err(err)));
                        }

                        // Capacity = actual_length (in case all of the trailers specified in
                        // AwsChunkedBodyOptions come in the first trailer frame, which is going to be the case
                        // most of the time in practice) + 7: 5 (2 + 3) for the initial CRLF + CHUNK_TERMINATOR
                        // that end the chunked data, plus 2 for the final CRLF ending the trailers section.
                        let mut buf = BytesMut::with_capacity(actual_length as usize + 7);
                        // End the final data chunk
                        buf.extend_from_slice(&[CRLF_RAW, CHUNK_TERMINATOR_RAW].concat());

                        // We transform the trailers into raw bytes. We can't write them with Frame::trailers
                        // since we must include the CRLF + CHUNK_TERMINATOR that end the body and the CRLFs
                        // after each trailer, so we write them as Frame::data
                        let trailers = http_1x_utils::trailers_as_aws_chunked_bytes(trailers, buf);
                        Poll::Ready(Some(Ok(http_body_1x::Frame::data(trailers.into()))))
                    }
                }
                AwsChunkedBodyState::WritingTrailers => {
                    // A frame received after the chunk terminator was written: render whatever
                    // trailers it carries. No length check is performed on this path.
                    let trailers = frame.trailers_ref();
                    let actual_length: u64 =
                        http_1x_utils::total_rendered_length_of_trailers(trailers);
                    // NOTE(review): the extra 7 bytes mirror the capacity used for the first trailer
                    // frame above; only the trailers themselves are written into this buffer.
                    let buf = BytesMut::with_capacity(actual_length as usize + 7);
                    let trailers = http_1x_utils::trailers_as_aws_chunked_bytes(trailers, buf);
                    Poll::Ready(Some(Ok(http_body_1x::Frame::data(trailers.into()))))
                }
                AwsChunkedBodyState::Closed | AwsChunkedBodyState::WritingChunkSize => {
                    unreachable!("{}", UNREACHABLE_STATES)
                }
            },
            // InnerBody data exhausted, add finalizing bytes depending on current state
            Poll::Ready(None) => {
                let trailers = match *this.state {
                    AwsChunkedBodyState::WritingChunk => {
                        // We exhausted the body data, now check if the length is correct
                        if let Err(poll_stream_len_err) =
                            http_1x_utils::check_for_stream_length_mismatch(
                                *this.inner_body_bytes_read_so_far as u64,
                                this.options.stream_length,
                            )
                        {
                            return poll_stream_len_err;
                        }

                        // Since we exhausted the body data, but are still in the WritingChunk state we did
                        // not poll any trailer frames and we write the CRLF + Chunk terminator to begin the
                        // trailer section plus a single final CRLF to end the (empty) trailer section
                        let mut trailers = BytesMut::with_capacity(7);
                        trailers.extend_from_slice(
                            &[CRLF_RAW, CHUNK_TERMINATOR_RAW, CRLF_RAW].concat(),
                        );
                        trailers
                    }
                    AwsChunkedBodyState::WritingTrailers => {
                        // The chunk terminator (and any trailers) were already written, so only
                        // the final CRLF that closes the body remains
                        let mut trailers = BytesMut::with_capacity(2);
                        trailers.extend_from_slice(CRLF_RAW);
                        trailers
                    }
                    AwsChunkedBodyState::Closed | AwsChunkedBodyState::WritingChunkSize => {
                        unreachable!("{}", UNREACHABLE_STATES)
                    }
                };

                let frame = http_body_1x::Frame::data(trailers.into());
                *this.state = AwsChunkedBodyState::Closed;
                Poll::Ready(Some(Ok(frame)))
            }
            // Passthrough states
            Poll::Pending => Poll::Pending,
            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
        }
    }
}
489/// Utility functions to help with the [http_body_1x::Body] trait implementation
490mod http_1x_utils {
491    use std::task::Poll;
492
493    use super::{CRLF_RAW, TRAILER_SEPARATOR};
494    use bytes::{Bytes, BytesMut};
495    use http_1x::{HeaderMap, HeaderName};
496
497    /// Writes trailers out into a `string` and then converts that `String` to a `Bytes` before
498    /// returning.
499    ///
500    /// - Trailer names are separated by a single colon only, no space.
501    /// - Trailer names with multiple values will be written out one line per value, with the name
502    ///   appearing on each line.
503    pub(super) fn trailers_as_aws_chunked_bytes(
504        trailer_map: Option<&HeaderMap>,
505        mut buffer: BytesMut,
506    ) -> BytesMut {
507        if let Some(trailer_map) = trailer_map {
508            let mut current_header_name: Option<HeaderName> = None;
509
510            for (header_name, header_value) in trailer_map.clone().into_iter() {
511                // When a header has multiple values, the name only comes up in iteration the first time
512                // we see it. Therefore, we need to keep track of the last name we saw and fall back to
513                // it when `header_name == None`.
514                current_header_name = header_name.or(current_header_name);
515
516                // In practice, this will always exist, but `if let` is nicer than unwrap
517                if let Some(header_name) = current_header_name.as_ref() {
518                    buffer.extend_from_slice(header_name.as_ref());
519                    buffer.extend_from_slice(TRAILER_SEPARATOR);
520                    buffer.extend_from_slice(header_value.as_bytes());
521                    buffer.extend_from_slice(CRLF_RAW);
522                }
523            }
524
525            buffer
526        } else {
527            buffer
528        }
529    }
530
531    /// Given an optional `HeaderMap`, calculate the total number of bytes required to represent the
532    /// `HeaderMap`. If no `HeaderMap` is given as input, return 0.
533    ///
534    /// - Trailer names are separated by a single colon only, no space.
535    /// - Trailer names with multiple values will be written out one line per value, with the name
536    ///   appearing on each line.
537    pub(super) fn total_rendered_length_of_trailers(trailer_map: Option<&HeaderMap>) -> u64 {
538        match trailer_map {
539            Some(trailer_map) => trailer_map
540                .iter()
541                .map(|(trailer_name, trailer_value)| {
542                    trailer_name.as_str().len()
543                        + TRAILER_SEPARATOR.len()
544                        + trailer_value.len()
545                        + CRLF_RAW.len()
546                })
547                .sum::<usize>() as u64,
548            None => 0,
549        }
550    }
551
552    /// This is an ugly return type, but in practice it just returns `Ok(())` if the values match
553    /// and `Err(Poll::Ready(Some(Err(AwsChunkedBodyError::StreamLengthMismatch))))` if they don't
554    #[allow(clippy::type_complexity)]
555    pub(super) fn check_for_stream_length_mismatch(
556        actual_stream_length: u64,
557        expected_stream_length: u64,
558    ) -> Result<(), Poll<Option<Result<http_body_1x::Frame<Bytes>, aws_smithy_types::body::Error>>>>
559    {
560        if actual_stream_length != expected_stream_length {
561            let err = Box::new(super::AwsChunkedBodyError::StreamLengthMismatch {
562                actual: actual_stream_length,
563                expected: expected_stream_length,
564            });
565            return Err(Poll::Ready(Some(Err(err))));
566        };
567
568        Ok(())
569    }
570}
571
/// Failures detected by `AwsChunkedBody` while validating the lengths it was configured with.
#[derive(Debug)]
enum AwsChunkedBodyError {
    /// The sum of the `trailer_lengths` given when the `AwsChunkedBody` was created differs from
    /// the actual length of the trailers returned by the inner `http_body::Body` implementor.
    /// The trailer lengths are needed to calculate the total body size for the content length
    /// header, so they have to be right.
    ReportedTrailerLengthMismatch { actual: u64, expected: u64 },
    /// The `stream_length` given when the `AwsChunkedBody` was created differs from the actual
    /// length of the body returned by the inner `http_body::Body` implementor. An accurate
    /// `stream_length` is required to set a correct content length header.
    StreamLengthMismatch { actual: u64, expected: u64 },
}

impl std::fmt::Display for AwsChunkedBodyError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Both fields are plain integers, so copy them out of the variant before formatting
        match *self {
            Self::StreamLengthMismatch { actual, expected } => f.write_fmt(format_args!(
                "When creating this AwsChunkedBody, stream length was reported as {expected}. However, when double checking during body encoding, length was found to be {actual} instead."
            )),
            Self::ReportedTrailerLengthMismatch { actual, expected } => f.write_fmt(format_args!(
                "When creating this AwsChunkedBody, length of trailers was reported as {expected}. However, when double checking during trailer encoding, length was found to be {actual} instead."
            )),
        }
    }
}

impl std::error::Error for AwsChunkedBodyError {}
600
// Counts how many times `i` can be divided by 16 before it is no longer greater than zero.
// For a positive integer that is the number of hexadecimal digits needed to write it;
// for zero the result is 0.
fn int_log16<T>(mut i: T) -> u64
where
    T: std::ops::DivAssign + PartialOrd + From<u8> + Copy,
{
    let floor = T::from(0);
    let radix = T::from(16);

    // Every item produced corresponds to one hex digit being stripped off
    let digits = std::iter::from_fn(|| {
        if i > floor {
            i /= radix;
            Some(())
        } else {
            None
        }
    })
    .count();

    digits as u64
}
617
618fn get_unsigned_chunk_bytes_length(payload_length: u64) -> u64 {
619    let hex_repr_len = int_log16(payload_length);
620    hex_repr_len + CRLF.len() as u64 + payload_length + CRLF.len() as u64
621}
622
623#[cfg(test)]
624mod tests {
625
626    #[cfg(test)]
627    mod http_02x_tests {
628        use super::super::{
629            http_02x_utils::{total_rendered_length_of_trailers, trailers_as_aws_chunked_bytes},
630            AwsChunkedBody, AwsChunkedBodyOptions, CHUNK_TERMINATOR, CRLF,
631        };
632
633        use aws_smithy_types::body::SdkBody;
634        use bytes::{Buf, Bytes};
635        use bytes_utils::SegmentedBuf;
636        use http_02x::{HeaderMap, HeaderValue};
637        use http_body_04x::{Body, SizeHint};
638        use pin_project_lite::pin_project;
639
640        use std::io::Read;
641        use std::pin::Pin;
642        use std::task::{Context, Poll};
643        use std::time::Duration;
644
        pin_project! {
            // Test body that hands out `parts` one entry per poll and reports `Poll::Pending`
            // for the entries that hold no data.
            struct SputteringBody {
                // `Some` entries are yielded as data; a `None` entry makes one poll return
                // `Poll::Pending`.
                parts: Vec<Option<Bytes>>,
                // Index into `parts` of the next entry to hand out.
                cursor: usize,
                // How long to wait before waking the task again after a `Poll::Pending`.
                delay_in_millis: u64,
            }
        }
652
653        impl SputteringBody {
654            fn len(&self) -> usize {
655                self.parts.iter().flatten().map(|b| b.len()).sum()
656            }
657        }
658
        impl Body for SputteringBody {
            type Data = Bytes;
            type Error = aws_smithy_types::body::Error;

            /// Hands out the entry at `cursor` and advances the cursor. A `Some` entry is
            /// returned as data; a `None` entry returns `Poll::Pending` and schedules a wake-up
            /// after `delay_in_millis`. Once every entry has been visited the body ends.
            fn poll_data(
                self: Pin<&mut Self>,
                cx: &mut Context<'_>,
            ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
                // All parts have been handed out
                if self.cursor == self.parts.len() {
                    return Poll::Ready(None);
                }

                let this = self.project();
                let delay_in_millis = *this.delay_in_millis;
                // Move the data out of the vector, leaving `None` behind in its slot
                let next_part = this.parts.get_mut(*this.cursor).unwrap().take();

                match next_part {
                    None => {
                        *this.cursor += 1;
                        // Wake the task from a spawned timer so that the body gets polled again
                        let waker = cx.waker().clone();
                        tokio::spawn(async move {
                            tokio::time::sleep(Duration::from_millis(delay_in_millis)).await;
                            waker.wake();
                        });
                        Poll::Pending
                    }
                    Some(data) => {
                        *this.cursor += 1;
                        Poll::Ready(Some(Ok(data)))
                    }
                }
            }

            /// This test body never produces trailers.
            fn poll_trailers(
                self: Pin<&mut Self>,
                _cx: &mut Context<'_>,
            ) -> Poll<Result<Option<HeaderMap<HeaderValue>>, Self::Error>> {
                Poll::Ready(Ok(None))
            }

            /// Always `false`; the end of the stream is only signalled through `poll_data`.
            fn is_end_stream(&self) -> bool {
                false
            }

            /// Returns the default size hint, i.e. no size information is reported.
            fn size_hint(&self) -> SizeHint {
                SizeHint::new()
            }
        }
707
708        #[tokio::test]
709        async fn test_aws_chunked_encoding() {
710            let test_fut = async {
711                let input_str = "Hello world";
712                let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, Vec::new());
713                let mut body = AwsChunkedBody::new(SdkBody::from(input_str), opts);
714
715                let mut output = SegmentedBuf::new();
716                while let Some(buf) = body.data().await {
717                    output.push(buf.unwrap());
718                }
719
720                let mut actual_output = String::new();
721                output
722                    .reader()
723                    .read_to_string(&mut actual_output)
724                    .expect("Doesn't cause IO errors");
725
726                let expected_output = "B\r\nHello world\r\n0\r\n\r\n";
727
728                assert_eq!(expected_output, actual_output);
729                assert!(
730                    body.trailers()
731                        .await
732                        .expect("no errors occurred during trailer polling")
733                        .is_none(),
734                    "aws-chunked encoded bodies don't have normal HTTP trailers"
735                );
736
737                // You can insert a `tokio::time::sleep` here to verify the timeout works as intended
738            };
739
740            let timeout_duration = Duration::from_secs(3);
741            if tokio::time::timeout(timeout_duration, test_fut)
742                .await
743                .is_err()
744            {
745                panic!("test_aws_chunked_encoding timed out after {timeout_duration:?}");
746            }
747        }
748
749        #[tokio::test]
750        async fn test_aws_chunked_encoding_sputtering_body() {
751            let test_fut = async {
752                let input = SputteringBody {
753                    parts: vec![
754                        Some(Bytes::from_static(b"chunk 1, ")),
755                        None,
756                        Some(Bytes::from_static(b"chunk 2, ")),
757                        Some(Bytes::from_static(b"chunk 3, ")),
758                        None,
759                        None,
760                        Some(Bytes::from_static(b"chunk 4, ")),
761                        Some(Bytes::from_static(b"chunk 5, ")),
762                        Some(Bytes::from_static(b"chunk 6")),
763                    ],
764                    cursor: 0,
765                    delay_in_millis: 500,
766                };
767                let opts = AwsChunkedBodyOptions::new(input.len() as u64, Vec::new());
768                let mut body = AwsChunkedBody::new(input, opts);
769
770                let mut output = SegmentedBuf::new();
771                while let Some(buf) = body.data().await {
772                    output.push(buf.unwrap());
773                }
774
775                let mut actual_output = String::new();
776                output
777                    .reader()
778                    .read_to_string(&mut actual_output)
779                    .expect("Doesn't cause IO errors");
780
781                let expected_output =
782                    "34\r\nchunk 1, chunk 2, chunk 3, chunk 4, chunk 5, chunk 6\r\n0\r\n\r\n";
783
784                assert_eq!(expected_output, actual_output);
785                assert!(
786                    body.trailers()
787                        .await
788                        .expect("no errors occurred during trailer polling")
789                        .is_none(),
790                    "aws-chunked encoded bodies don't have normal HTTP trailers"
791                );
792            };
793
794            let timeout_duration = Duration::from_secs(3);
795            if tokio::time::timeout(timeout_duration, test_fut)
796                .await
797                .is_err()
798            {
799                panic!(
800                "test_aws_chunked_encoding_sputtering_body timed out after {timeout_duration:?}"
801            );
802            }
803        }
804
805        #[tokio::test]
806        #[should_panic = "called `Result::unwrap()` on an `Err` value: ReportedTrailerLengthMismatch { actual: 44, expected: 0 }"]
807        async fn test_aws_chunked_encoding_incorrect_trailer_length_panic() {
808            let input_str = "Hello world";
809            // Test body has no trailers, so this length is incorrect and will trigger an assert panic
810            // When the panic occurs, it will actually expect a length of 44. This is because, when using
811            // aws-chunked encoding, each trailer will end with a CRLF which is 2 bytes long.
812            let wrong_trailer_len = 42;
813            let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, vec![wrong_trailer_len]);
814            let mut body = AwsChunkedBody::new(SdkBody::from(input_str), opts);
815
816            // We don't care about the body contents but we have to read it all before checking for trailers
817            while let Some(buf) = body.data().await {
818                drop(buf.unwrap());
819            }
820
821            assert!(
822                body.trailers()
823                    .await
824                    .expect("no errors occurred during trailer polling")
825                    .is_none(),
826                "aws-chunked encoded bodies don't have normal HTTP trailers"
827            );
828        }
829
830        #[tokio::test]
831        async fn test_aws_chunked_encoding_empty_body() {
832            let input_str = "";
833            let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, Vec::new());
834            let mut body = AwsChunkedBody::new(SdkBody::from(input_str), opts);
835
836            let mut output = SegmentedBuf::new();
837            while let Some(buf) = body.data().await {
838                output.push(buf.unwrap());
839            }
840
841            let mut actual_output = String::new();
842            output
843                .reader()
844                .read_to_string(&mut actual_output)
845                .expect("Doesn't cause IO errors");
846
847            let expected_output = [CHUNK_TERMINATOR, CRLF].concat();
848
849            assert_eq!(expected_output, actual_output);
850            assert!(
851                body.trailers()
852                    .await
853                    .expect("no errors occurred during trailer polling")
854                    .is_none(),
855                "aws-chunked encoded bodies don't have normal HTTP trailers"
856            );
857        }
858
859        #[tokio::test]
860        async fn test_total_rendered_length_of_trailers() {
861            let mut trailers = HeaderMap::new();
862
863            trailers.insert("empty_value", HeaderValue::from_static(""));
864
865            trailers.insert("single_value", HeaderValue::from_static("value 1"));
866
867            trailers.insert("two_values", HeaderValue::from_static("value 1"));
868            trailers.append("two_values", HeaderValue::from_static("value 2"));
869
870            trailers.insert("three_values", HeaderValue::from_static("value 1"));
871            trailers.append("three_values", HeaderValue::from_static("value 2"));
872            trailers.append("three_values", HeaderValue::from_static("value 3"));
873
874            let trailers = Some(trailers);
875            let actual_length = total_rendered_length_of_trailers(trailers.as_ref());
876            let expected_length =
877                (trailers_as_aws_chunked_bytes(trailers, actual_length).len()) as u64;
878
879            assert_eq!(expected_length, actual_length);
880        }
881
882        #[tokio::test]
883        async fn test_total_rendered_length_of_empty_trailers() {
884            let trailers = Some(HeaderMap::new());
885            let actual_length = total_rendered_length_of_trailers(trailers.as_ref());
886            let expected_length =
887                (trailers_as_aws_chunked_bytes(trailers, actual_length).len()) as u64;
888
889            assert_eq!(expected_length, actual_length);
890        }
891    }
892
893    #[cfg(test)]
894    mod http_1x_tests {
895        use super::super::{
896            http_1x_utils::{total_rendered_length_of_trailers, trailers_as_aws_chunked_bytes},
897            AwsChunkedBody, AwsChunkedBodyOptions, CHUNK_TERMINATOR_RAW, CRLF_RAW,
898        };
899
900        use aws_smithy_types::body::SdkBody;
901        use bytes::{Buf, Bytes, BytesMut};
902        use bytes_utils::SegmentedBuf;
903        use http_1x::{HeaderMap, HeaderValue};
904        use http_body_1x::{Body, Frame, SizeHint};
905        use http_body_util::BodyExt;
906        use pin_project_lite::pin_project;
907
908        use std::io::Read;
909        use std::pin::Pin;
910        use std::task::{Context, Poll};
911        use std::time::Duration;
912
913        pin_project! {
914            struct SputteringBody {
915                parts: Vec<Option<Bytes>>,
916                cursor: usize,
917                delay_in_millis: u64,
918            }
919        }
920
921        impl SputteringBody {
922            fn len(&self) -> usize {
923                self.parts.iter().flatten().map(|b| b.len()).sum()
924            }
925        }
926
927        impl Body for SputteringBody {
928            type Data = Bytes;
929            type Error = aws_smithy_types::body::Error;
930
931            fn poll_frame(
932                self: Pin<&mut Self>,
933                cx: &mut Context<'_>,
934            ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
935                if self.cursor == self.parts.len() {
936                    return Poll::Ready(None);
937                }
938
939                let this = self.project();
940                let delay_in_millis = *this.delay_in_millis;
941                let next_part = this.parts.get_mut(*this.cursor).unwrap().take();
942
943                match next_part {
944                    None => {
945                        *this.cursor += 1;
946                        let waker = cx.waker().clone();
947                        tokio::spawn(async move {
948                            tokio::time::sleep(Duration::from_millis(delay_in_millis)).await;
949                            waker.wake();
950                        });
951                        Poll::Pending
952                    }
953                    Some(data) => {
954                        *this.cursor += 1;
955                        let frame = Frame::data(data);
956                        Poll::Ready(Some(Ok(frame)))
957                    }
958                }
959            }
960
961            fn is_end_stream(&self) -> bool {
962                false
963            }
964
965            fn size_hint(&self) -> SizeHint {
966                SizeHint::new()
967            }
968        }
969
970        #[tokio::test]
971        async fn test_aws_chunked_encoding() {
972            let test_fut = async {
973                let input_str = "Hello world";
974                let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, vec![]);
975                let mut body = AwsChunkedBody::new(SdkBody::from(input_str), opts);
976
977                let mut output = SegmentedBuf::new();
978                while let Some(Ok(buf)) = body.frame().await {
979                    output.push(buf.into_data().unwrap());
980                }
981
982                let mut actual_output = String::new();
983                output
984                    .reader()
985                    .read_to_string(&mut actual_output)
986                    .expect("Doesn't cause IO errors");
987
988                let expected_output = "B\r\nHello world\r\n0\r\n\r\n";
989
990                assert_eq!(expected_output, actual_output);
991
992                // You can insert a `tokio::time::sleep` here to verify the timeout works as intended
993            };
994
995            let timeout_duration = Duration::from_secs(3);
996            if tokio::time::timeout(timeout_duration, test_fut)
997                .await
998                .is_err()
999            {
1000                panic!("test_aws_chunked_encoding timed out after {timeout_duration:?}");
1001            }
1002        }
1003
1004        #[tokio::test]
1005        async fn test_aws_chunked_encoding_sputtering_body() {
1006            let test_fut = async {
1007                let input = SputteringBody {
1008                    parts: vec![
1009                        Some(Bytes::from_static(b"chunk 1, ")),
1010                        None,
1011                        Some(Bytes::from_static(b"chunk 2, ")),
1012                        Some(Bytes::from_static(b"chunk 3, ")),
1013                        None,
1014                        None,
1015                        Some(Bytes::from_static(b"chunk 4, ")),
1016                        Some(Bytes::from_static(b"chunk 5, ")),
1017                        Some(Bytes::from_static(b"chunk 6")),
1018                    ],
1019                    cursor: 0,
1020                    delay_in_millis: 500,
1021                };
1022                let opts = AwsChunkedBodyOptions::new(input.len() as u64, vec![]);
1023                let mut body = AwsChunkedBody::new(input, opts);
1024
1025                let mut output = SegmentedBuf::new();
1026                while let Some(Ok(buf)) = body.frame().await {
1027                    output.push(buf.into_data().unwrap());
1028                }
1029
1030                let mut actual_output = String::new();
1031                output
1032                    .reader()
1033                    .read_to_string(&mut actual_output)
1034                    .expect("Doesn't cause IO errors");
1035
1036                let expected_output =
1037                    "34\r\nchunk 1, chunk 2, chunk 3, chunk 4, chunk 5, chunk 6\r\n0\r\n\r\n";
1038
1039                assert_eq!(expected_output, actual_output);
1040            };
1041
1042            let timeout_duration = Duration::from_secs(3);
1043            if tokio::time::timeout(timeout_duration, test_fut)
1044                .await
1045                .is_err()
1046            {
1047                panic!(
1048                "test_aws_chunked_encoding_sputtering_body timed out after {timeout_duration:?}"
1049            );
1050            }
1051        }
1052
1053        #[tokio::test]
1054        async fn test_aws_chunked_encoding_incorrect_trailer_length_panic() {
1055            let input_str = "Hello world";
1056            // Test body has no trailers, so this length is incorrect and will trigger an assert panic
1057            // When the panic occurs, it will actually expect a length of 44. This is because, when using
1058            // aws-chunked encoding, each trailer will end with a CRLF which is 2 bytes long.
1059            let wrong_trailer_len = 42;
1060            let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, vec![wrong_trailer_len]);
1061            let mut body = AwsChunkedBody::new(SdkBody::from(input_str), opts);
1062
1063            // We don't care about the body contents but we have to read it all before checking for trailers
1064            while let Some(Ok(frame)) = body.frame().await {
1065                assert!(!frame.is_trailers());
1066            }
1067        }
1068
1069        #[tokio::test]
1070        async fn test_aws_chunked_encoding_empty_body() {
1071            let input_str = "";
1072            let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, vec![]);
1073            let mut body = AwsChunkedBody::new(SdkBody::from(input_str), opts);
1074
1075            let mut output = SegmentedBuf::new();
1076            while let Some(Ok(frame)) = body.frame().await {
1077                output.push(frame.into_data().unwrap());
1078            }
1079
1080            let mut actual_output = String::new();
1081            output
1082                .reader()
1083                .read_to_string(&mut actual_output)
1084                .expect("Doesn't cause IO errors");
1085
1086            let actual_output = std::str::from_utf8(actual_output.as_bytes()).unwrap();
1087            let expected_output = [CHUNK_TERMINATOR_RAW, CRLF_RAW].concat();
1088            let expected_output = std::str::from_utf8(&expected_output).unwrap();
1089
1090            assert_eq!(expected_output, actual_output);
1091        }
1092
1093        #[tokio::test]
1094        async fn test_total_rendered_length_of_trailers() {
1095            let mut trailers = HeaderMap::new();
1096
1097            trailers.insert("empty_value", HeaderValue::from_static(""));
1098
1099            trailers.insert("single_value", HeaderValue::from_static("value 1"));
1100
1101            trailers.insert("two_values", HeaderValue::from_static("value 1"));
1102            trailers.append("two_values", HeaderValue::from_static("value 2"));
1103
1104            trailers.insert("three_values", HeaderValue::from_static("value 1"));
1105            trailers.append("three_values", HeaderValue::from_static("value 2"));
1106            trailers.append("three_values", HeaderValue::from_static("value 3"));
1107
1108            let trailers = Some(&trailers);
1109            let actual_length = total_rendered_length_of_trailers(trailers);
1110            let buf = BytesMut::with_capacity(actual_length as usize);
1111            let expected_length = (trailers_as_aws_chunked_bytes(trailers, buf).len()) as u64;
1112
1113            assert_eq!(expected_length, actual_length);
1114        }
1115
1116        #[tokio::test]
1117        async fn test_total_rendered_length_of_empty_trailers() {
1118            let header_map = HeaderMap::new();
1119            let trailers = Some(&header_map);
1120            let actual_length = total_rendered_length_of_trailers(trailers);
1121            let buf = BytesMut::with_capacity(actual_length as usize);
1122            let expected_length = (trailers_as_aws_chunked_bytes(trailers, buf).len()) as u64;
1123
1124            assert_eq!(expected_length, actual_length);
1125        }
1126    }
1127}