aws_smithy_compression/
body.rs

/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0
 */

//! HTTP body-wrappers that perform request compression

// Putting this in a `mod` since I expect we'll have to handle response
// decompression some day.
10/// Functionality for compressing an HTTP request body.
11pub mod compress {
12    use aws_smithy_types::body::SdkBody;
13    use pin_project_lite::pin_project;
14
15    pin_project! {
16        /// A `Body` that may compress its data with a `CompressRequest` implementor.
17        ///
18        /// Compression options may disable request compression for small data payload, or entirely.
19        /// Additionally, some services may not support compression.
20        pub struct CompressedBody<InnerBody, CompressionImpl> {
21            #[pin]
22            body: InnerBody,
23            compress_request: CompressionImpl,
24            is_end_stream: bool,
25        }
26    }
27
28    impl<CR> CompressedBody<SdkBody, CR> {
29        /// Given an [`SdkBody`] and a `Box<dyn CompressRequest>`, create a new `CompressedBody<SdkBody, CR>`.
30        pub fn new(body: SdkBody, compress_request: CR) -> Self {
31            Self {
32                body,
33                compress_request,
34                is_end_stream: false,
35            }
36        }
37    }
38
39    /// Support for the `http-body-1-0` and `http-1-0` crates.
40    pub mod http_body_1_x {
41        use crate::body::compress::CompressedBody;
42        use crate::http::CompressRequest;
43        use aws_smithy_runtime_api::box_error::BoxError;
44        use aws_smithy_types::body::SdkBody;
45        use http_body_1x::{Body, Frame, SizeHint};
46        use std::pin::Pin;
47        use std::task::{ready, Context, Poll};
48
49        impl Body for CompressedBody<SdkBody, Box<dyn CompressRequest>> {
50            type Data = bytes::Bytes;
51            type Error = aws_smithy_types::body::Error;
52
53            fn poll_frame(
54                mut self: Pin<&mut Self>,
55                cx: &mut Context<'_>,
56            ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
57                let this = self.as_mut().project();
58                Poll::Ready(match ready!(this.body.poll_frame(cx)) {
59                    Some(Ok(f)) => {
60                        if f.is_data() {
61                            let d = f.into_data().expect("we checked for data first");
62                            let mut out = Vec::new();
63                            this.compress_request.compress_bytes(&d, &mut out)?;
64                            Some(Ok(Frame::data(out.into())))
65                        } else if f.is_trailers() {
66                            // Trailers don't get compressed.
67                            Some(Ok(f))
68                        } else {
69                            unreachable!("Frame is either data or trailers")
70                        }
71                    }
72                    None => {
73                        *this.is_end_stream = true;
74                        None
75                    }
76                    other => other,
77                })
78            }
79
80            fn is_end_stream(&self) -> bool {
81                self.is_end_stream
82            }
83
84            fn size_hint(&self) -> SizeHint {
85                // We can't return a hint because we don't know exactly how
86                // compression will affect the content length
87                SizeHint::default()
88            }
89        }
90        impl CompressedBody<SdkBody, Box<dyn CompressRequest>> {
91            /// Consumes this `CompressedBody` and returns an [`SdkBody`] containing the compressed data.
92            ///
93            /// This *requires* that the inner `SdkBody` is in-memory (i.e. not streaming). Otherwise, an error is returned.
94            /// If compression fails, an error is returned.
95            pub fn into_compressed_sdk_body(mut self) -> Result<SdkBody, BoxError> {
96                let mut compressed_body = Vec::new();
97                let bytes = self.body.bytes().ok_or_else(|| "`into_compressed_sdk_body` requires that the inner body is 'in-memory', but it was streaming".to_string())?;
98
99                self.compress_request
100                    .compress_bytes(bytes, &mut compressed_body)?;
101                Ok(SdkBody::from(compressed_body))
102            }
103        }
104    }
105}
106
#[cfg(test)]
mod test {
    use crate::body::compress::CompressedBody;
    use crate::{CompressionAlgorithm, CompressionOptions};
    use aws_smithy_types::body::SdkBody;
    use bytes::Buf;
    use bytes_utils::SegmentedBuf;
    use http_body_util::BodyExt;
    use std::io::Read;

    // Payload fed into the body under test.
    const UNCOMPRESSED_INPUT: &[u8] = b"hello world";
    // Bytes the test expects after running `UNCOMPRESSED_INPUT` through
    // `CompressionAlgorithm::Gzip`.
    const COMPRESSED_OUTPUT: &[u8] = &[
        31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 203, 72, 205, 201, 201, 87, 40, 207, 47, 202, 73, 1, 0,
        133, 17, 74, 13, 11, 0, 0, 0,
    ];

    /// Drains a `CompressedBody` frame by frame and checks that the concatenated
    /// output matches the expected compressed bytes.
    #[tokio::test]
    async fn test_body_is_compressed() {
        // A minimum size of zero makes sure even this tiny payload is eligible for compression.
        let options = CompressionOptions::default()
            .with_min_compression_size_bytes(0)
            .unwrap();
        let compressor = CompressionAlgorithm::Gzip.into_impl_http_body_1_x(&options);
        let mut compressed_body =
            CompressedBody::new(SdkBody::from(UNCOMPRESSED_INPUT), compressor);

        // Collect every data frame; the loop ends once the body reports no more frames.
        let mut segments = SegmentedBuf::new();
        while let Some(next) = compressed_body.frame().await {
            let frame = match next {
                Ok(frame) => frame,
                Err(e) => panic!("Error: {}", e),
            };
            segments.push(frame.into_data().expect("frame is OK"));
        }

        let mut actual_output = Vec::new();
        segments
            .reader()
            .read_to_end(&mut actual_output)
            .expect("Doesn't cause IO errors");

        // Verify data is compressed as expected
        assert_eq!(COMPRESSED_OUTPUT, actual_output);
    }
}