aws_smithy_compression/
body.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6//! HTTP body-wrappers that perform request compression
7
8// Putting this in a `mod` since I expect we'll have to handle response
9// decompression some day.
10/// Functionality for compressing an HTTP request body.
11pub mod compress {
12    use aws_smithy_types::body::SdkBody;
13    use pin_project_lite::pin_project;
14
    pin_project! {
        /// A `Body` that may compress its data with a `CompressRequest` implementor.
        ///
        /// Compression options may disable request compression for small data payloads, or entirely.
        /// Additionally, some services may not support compression.
        pub struct CompressedBody<InnerBody, CompressionImpl> {
            // The wrapped body. It is structurally pinned so it can be polled through
            // `Pin<&mut Self>`.
            #[pin]
            body: InnerBody,
            // The compressor that the body's data is passed through.
            compress_request: CompressionImpl,
            // Starts out `false`; set to `true` once polling the wrapped body yields `None`.
            is_end_stream: bool,
        }
    }
27
28    impl<CR> CompressedBody<SdkBody, CR> {
29        /// Given an [`SdkBody`] and a `Box<dyn CompressRequest>`, create a new `CompressedBody<SdkBody, CR>`.
30        pub fn new(body: SdkBody, compress_request: CR) -> Self {
31            Self {
32                body,
33                compress_request,
34                is_end_stream: false,
35            }
36        }
37    }
38
39    /// Support for the `http-body-0-4` and `http-0-2` crates.
40    #[cfg(feature = "http-body-0-4-x")]
41    pub mod http_body_0_4_x {
42        use super::CompressedBody;
43        use crate::http::http_body_0_4_x::CompressRequest;
44        use aws_smithy_runtime_api::box_error::BoxError;
45        use aws_smithy_types::body::SdkBody;
46        use http_0_2::HeaderMap;
47        use http_body_0_4::{Body, SizeHint};
48        use std::pin::Pin;
49        use std::task::{Context, Poll};
50
51        impl Body for CompressedBody<SdkBody, Box<dyn CompressRequest>> {
52            type Data = bytes::Bytes;
53            type Error = aws_smithy_types::body::Error;
54
55            fn poll_data(
56                self: Pin<&mut Self>,
57                cx: &mut Context<'_>,
58            ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
59                let this = self.project();
60                match this.body.poll_data(cx)? {
61                    Poll::Ready(Some(data)) => {
62                        let mut out = Vec::new();
63                        this.compress_request.compress_bytes(&data[..], &mut out)?;
64                        Poll::Ready(Some(Ok(out.into())))
65                    }
66                    Poll::Ready(None) => {
67                        *this.is_end_stream = true;
68                        Poll::Ready(None)
69                    }
70                    Poll::Pending => Poll::Pending,
71                }
72            }
73
74            fn poll_trailers(
75                self: Pin<&mut Self>,
76                cx: &mut Context<'_>,
77            ) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
78                let this = self.project();
79                this.body.poll_trailers(cx)
80            }
81
82            fn is_end_stream(&self) -> bool {
83                self.is_end_stream
84            }
85
86            fn size_hint(&self) -> SizeHint {
87                // We can't return a hint because we don't know exactly how
88                // compression will affect the content length
89                SizeHint::default()
90            }
91        }
92
93        impl CompressedBody<SdkBody, Box<dyn CompressRequest>> {
94            /// Consumes this `CompressedBody` and returns an [`SdkBody`] containing the compressed data.
95            ///
96            /// This *requires* that the inner `SdkBody` is in-memory (i.e. not streaming). Otherwise, an error is returned.
97            /// If compression fails, an error is returned.
98            pub fn into_compressed_sdk_body(mut self) -> Result<SdkBody, BoxError> {
99                let mut compressed_body = Vec::new();
100                let bytes = self.body.bytes().ok_or_else(|| "`into_compressed_sdk_body` requires that the inner body is 'in-memory', but it was streaming".to_string())?;
101
102                self.compress_request
103                    .compress_bytes(bytes, &mut compressed_body)?;
104                Ok(SdkBody::from(compressed_body))
105            }
106        }
107    }
108
109    /// Support for the `http-body-1-0` and `http-1-0` crates.
110    #[cfg(feature = "http-body-1-x")]
111    pub mod http_body_1_x {
112        use crate::body::compress::CompressedBody;
113        use crate::http::http_body_1_x::CompressRequest;
114        use aws_smithy_types::body::SdkBody;
115        use http_body_1_0::{Body, Frame, SizeHint};
116        use std::pin::Pin;
117        use std::task::{ready, Context, Poll};
118
119        impl Body for CompressedBody<SdkBody, Box<dyn CompressRequest>> {
120            type Data = bytes::Bytes;
121            type Error = aws_smithy_types::body::Error;
122
123            fn poll_frame(
124                mut self: Pin<&mut Self>,
125                cx: &mut Context<'_>,
126            ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
127                let this = self.as_mut().project();
128                Poll::Ready(match ready!(this.body.poll_frame(cx)) {
129                    Some(Ok(f)) => {
130                        if f.is_data() {
131                            let d = f.into_data().expect("we checked for data first");
132                            let mut out = Vec::new();
133                            this.compress_request.compress_bytes(&d, &mut out)?;
134                            Some(Ok(Frame::data(out.into())))
135                        } else if f.is_trailers() {
136                            // Trailers don't get compressed.
137                            Some(Ok(f))
138                        } else {
139                            unreachable!("Frame is either data or trailers")
140                        }
141                    }
142                    None => {
143                        *this.is_end_stream = true;
144                        None
145                    }
146                    other => other,
147                })
148            }
149
150            fn is_end_stream(&self) -> bool {
151                self.is_end_stream
152            }
153
154            fn size_hint(&self) -> SizeHint {
155                // We can't return a hint because we don't know exactly how
156                // compression will affect the content length
157                SizeHint::default()
158            }
159        }
160    }
161}
162
163#[cfg(any(feature = "http-body-0-4-x", feature = "http-body-1-x"))]
164#[cfg(test)]
165mod test {
166    use crate::body::compress::CompressedBody;
167    use crate::{CompressionAlgorithm, CompressionOptions};
168    use aws_smithy_types::body::SdkBody;
169    use bytes::Buf;
170    use bytes_utils::SegmentedBuf;
171    use std::io::Read;
172    const UNCOMPRESSED_INPUT: &[u8] = b"hello world";
173    const COMPRESSED_OUTPUT: &[u8] = &[
174        31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 203, 72, 205, 201, 201, 87, 40, 207, 47, 202, 73, 1, 0,
175        133, 17, 74, 13, 11, 0, 0, 0,
176    ];
177
178    #[cfg(feature = "http-body-0-4-x")]
179    mod http_body_0_4_x {
180        use super::*;
181        use http_body_0_4::Body;
182
183        #[tokio::test]
184        async fn test_body_is_compressed() {
185            let compression_options = CompressionOptions::default()
186                .with_min_compression_size_bytes(0)
187                .unwrap();
188            let compress_request =
189                CompressionAlgorithm::Gzip.into_impl_http_body_0_4_x(&compression_options);
190            let body = SdkBody::from(UNCOMPRESSED_INPUT);
191            let mut compressed_body = CompressedBody::new(body, compress_request);
192
193            let mut output = SegmentedBuf::new();
194            while let Some(buf) = compressed_body.data().await {
195                output.push(buf.unwrap());
196            }
197
198            let mut actual_output = Vec::new();
199            output
200                .reader()
201                .read_to_end(&mut actual_output)
202                .expect("Doesn't cause IO errors");
203            // Verify data is compressed as expected
204            assert_eq!(COMPRESSED_OUTPUT, actual_output);
205        }
206
207        #[tokio::test]
208        async fn test_into_compressed_sdk_body() {
209            let compression_options = CompressionOptions::default()
210                .with_min_compression_size_bytes(0)
211                .unwrap();
212            let compress_request =
213                CompressionAlgorithm::Gzip.into_impl_http_body_0_4_x(&compression_options);
214            let body = SdkBody::from(UNCOMPRESSED_INPUT);
215            let compressed_sdk_body = CompressedBody::new(body, compress_request)
216                .into_compressed_sdk_body()
217                .unwrap();
218
219            // Verify data is compressed as expected
220            assert_eq!(
221                COMPRESSED_OUTPUT,
222                compressed_sdk_body.bytes().expect("body is in-memory")
223            );
224        }
225    }
226
227    #[cfg(feature = "http-body-1-x")]
228    mod http_body_1_x {
229        use super::*;
230        use http_body_util::BodyExt;
231
232        #[tokio::test]
233        async fn test_body_is_compressed() {
234            let compression_options = CompressionOptions::default()
235                .with_min_compression_size_bytes(0)
236                .unwrap();
237            let compress_request =
238                CompressionAlgorithm::Gzip.into_impl_http_body_1_x(&compression_options);
239            let body = SdkBody::from(UNCOMPRESSED_INPUT);
240            let mut compressed_body = CompressedBody::new(body, compress_request);
241
242            let mut output = SegmentedBuf::new();
243
244            loop {
245                let data = match compressed_body.frame().await {
246                    Some(Ok(frame)) => frame.into_data(),
247                    Some(Err(e)) => panic!("Error: {}", e),
248                    // No more frames, break out of loop
249                    None => break,
250                }
251                .expect("frame is OK");
252                output.push(data);
253            }
254
255            let mut actual_output = Vec::new();
256            output
257                .reader()
258                .read_to_end(&mut actual_output)
259                .expect("Doesn't cause IO errors");
260            // Verify data is compressed as expected
261            assert_eq!(COMPRESSED_OUTPUT, actual_output);
262        }
263    }
264}