// aws_smithy_compression/body.rs
pub mod compress {
12    use aws_smithy_types::body::SdkBody;
13    use pin_project_lite::pin_project;
14
15    pin_project! {
16        pub struct CompressedBody<InnerBody, CompressionImpl> {
21            #[pin]
22            body: InnerBody,
23            compress_request: CompressionImpl,
24            is_end_stream: bool,
25        }
26    }
27
28    impl<CR> CompressedBody<SdkBody, CR> {
29        pub fn new(body: SdkBody, compress_request: CR) -> Self {
31            Self {
32                body,
33                compress_request,
34                is_end_stream: false,
35            }
36        }
37    }
38
39    #[cfg(feature = "http-body-0-4-x")]
41    pub mod http_body_0_4_x {
42        use super::CompressedBody;
43        use crate::http::http_body_0_4_x::CompressRequest;
44        use aws_smithy_runtime_api::box_error::BoxError;
45        use aws_smithy_types::body::SdkBody;
46        use http_0_2::HeaderMap;
47        use http_body_0_4::{Body, SizeHint};
48        use std::pin::Pin;
49        use std::task::{Context, Poll};
50
51        impl Body for CompressedBody<SdkBody, Box<dyn CompressRequest>> {
52            type Data = bytes::Bytes;
53            type Error = aws_smithy_types::body::Error;
54
55            fn poll_data(
56                self: Pin<&mut Self>,
57                cx: &mut Context<'_>,
58            ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
59                let this = self.project();
60                match this.body.poll_data(cx)? {
61                    Poll::Ready(Some(data)) => {
62                        let mut out = Vec::new();
63                        this.compress_request.compress_bytes(&data[..], &mut out)?;
64                        Poll::Ready(Some(Ok(out.into())))
65                    }
66                    Poll::Ready(None) => {
67                        *this.is_end_stream = true;
68                        Poll::Ready(None)
69                    }
70                    Poll::Pending => Poll::Pending,
71                }
72            }
73
74            fn poll_trailers(
75                self: Pin<&mut Self>,
76                cx: &mut Context<'_>,
77            ) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
78                let this = self.project();
79                this.body.poll_trailers(cx)
80            }
81
82            fn is_end_stream(&self) -> bool {
83                self.is_end_stream
84            }
85
86            fn size_hint(&self) -> SizeHint {
87                SizeHint::default()
90            }
91        }
92
93        impl CompressedBody<SdkBody, Box<dyn CompressRequest>> {
94            pub fn into_compressed_sdk_body(mut self) -> Result<SdkBody, BoxError> {
99                let mut compressed_body = Vec::new();
100                let bytes = self.body.bytes().ok_or_else(|| "`into_compressed_sdk_body` requires that the inner body is 'in-memory', but it was streaming".to_string())?;
101
102                self.compress_request
103                    .compress_bytes(bytes, &mut compressed_body)?;
104                Ok(SdkBody::from(compressed_body))
105            }
106        }
107    }
108
109    #[cfg(feature = "http-body-1-x")]
111    pub mod http_body_1_x {
112        use crate::body::compress::CompressedBody;
113        use crate::http::http_body_1_x::CompressRequest;
114        use aws_smithy_types::body::SdkBody;
115        use http_body_1_0::{Body, Frame, SizeHint};
116        use std::pin::Pin;
117        use std::task::{ready, Context, Poll};
118
119        impl Body for CompressedBody<SdkBody, Box<dyn CompressRequest>> {
120            type Data = bytes::Bytes;
121            type Error = aws_smithy_types::body::Error;
122
123            fn poll_frame(
124                mut self: Pin<&mut Self>,
125                cx: &mut Context<'_>,
126            ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
127                let this = self.as_mut().project();
128                Poll::Ready(match ready!(this.body.poll_frame(cx)) {
129                    Some(Ok(f)) => {
130                        if f.is_data() {
131                            let d = f.into_data().expect("we checked for data first");
132                            let mut out = Vec::new();
133                            this.compress_request.compress_bytes(&d, &mut out)?;
134                            Some(Ok(Frame::data(out.into())))
135                        } else if f.is_trailers() {
136                            Some(Ok(f))
138                        } else {
139                            unreachable!("Frame is either data or trailers")
140                        }
141                    }
142                    None => {
143                        *this.is_end_stream = true;
144                        None
145                    }
146                    other => other,
147                })
148            }
149
150            fn is_end_stream(&self) -> bool {
151                self.is_end_stream
152            }
153
154            fn size_hint(&self) -> SizeHint {
155                SizeHint::default()
158            }
159        }
160    }
161}
162
#[cfg(any(feature = "http-body-0-4-x", feature = "http-body-1-x"))]
#[cfg(test)]
mod test {
    use crate::body::compress::CompressedBody;
    use crate::{CompressionAlgorithm, CompressionOptions};
    use aws_smithy_types::body::SdkBody;
    use bytes::Buf;
    use bytes_utils::SegmentedBuf;
    use std::io::Read;

    // Plain-text fixture fed to the compressor in every test.
    const UNCOMPRESSED_INPUT: &[u8] = b"hello world";
    // Bytes the tests expect back when `UNCOMPRESSED_INPUT` is compressed with
    // `CompressionAlgorithm::Gzip`.
    const COMPRESSED_OUTPUT: &[u8] = &[
        31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 203, 72, 205, 201, 201, 87, 40, 207, 47, 202, 73, 1, 0,
        133, 17, 74, 13, 11, 0, 0, 0,
    ];

    // Default options with the minimum compression size set to zero bytes.
    fn zero_threshold_options() -> CompressionOptions {
        CompressionOptions::default()
            .with_min_compression_size_bytes(0)
            .unwrap()
    }

    // Reads every segment pushed into `segments` out into one contiguous vector.
    fn collect_segments(segments: SegmentedBuf<bytes::Bytes>) -> Vec<u8> {
        let mut collected = Vec::new();
        segments
            .reader()
            .read_to_end(&mut collected)
            .expect("Doesn't cause IO errors");
        collected
    }

    #[cfg(feature = "http-body-0-4-x")]
    mod http_body_0_4_x {
        use super::*;
        use http_body_0_4::Body;

        // Polling the body chunk by chunk yields the expected gzip bytes.
        #[tokio::test]
        async fn test_body_is_compressed() {
            let options = zero_threshold_options();
            let gzip = CompressionAlgorithm::Gzip.into_impl_http_body_0_4_x(&options);
            let mut compressed_body =
                CompressedBody::new(SdkBody::from(UNCOMPRESSED_INPUT), gzip);

            let mut segments = SegmentedBuf::new();
            while let Some(chunk) = compressed_body.data().await {
                segments.push(chunk.unwrap());
            }

            let actual_output = collect_segments(segments);
            assert_eq!(COMPRESSED_OUTPUT, actual_output);
        }

        // Converting an in-memory body in one shot yields the expected gzip bytes.
        #[tokio::test]
        async fn test_into_compressed_sdk_body() {
            let options = zero_threshold_options();
            let gzip = CompressionAlgorithm::Gzip.into_impl_http_body_0_4_x(&options);
            let wrapped = CompressedBody::new(SdkBody::from(UNCOMPRESSED_INPUT), gzip);

            let compressed_sdk_body = wrapped.into_compressed_sdk_body().unwrap();

            assert_eq!(
                COMPRESSED_OUTPUT,
                compressed_sdk_body.bytes().expect("body is in-memory")
            );
        }
    }

    #[cfg(feature = "http-body-1-x")]
    mod http_body_1_x {
        use super::*;
        use http_body_util::BodyExt;

        // Polling the body frame by frame yields the expected gzip bytes.
        #[tokio::test]
        async fn test_body_is_compressed() {
            let options = zero_threshold_options();
            let gzip = CompressionAlgorithm::Gzip.into_impl_http_body_1_x(&options);
            let mut compressed_body =
                CompressedBody::new(SdkBody::from(UNCOMPRESSED_INPUT), gzip);

            let mut segments = SegmentedBuf::new();
            while let Some(polled) = compressed_body.frame().await {
                let frame = match polled {
                    Ok(frame) => frame,
                    Err(e) => panic!("Error: {}", e),
                };
                segments.push(frame.into_data().expect("frame is OK"));
            }

            let actual_output = collect_segments(segments);
            assert_eq!(COMPRESSED_OUTPUT, actual_output);
        }
    }
}