// aws_smithy_compression/body.rs
pub mod compress {
12 use aws_smithy_types::body::SdkBody;
13 use pin_project_lite::pin_project;
14
15 pin_project! {
16 pub struct CompressedBody<InnerBody, CompressionImpl> {
21 #[pin]
22 body: InnerBody,
23 compress_request: CompressionImpl,
24 is_end_stream: bool,
25 }
26 }
27
28 impl<CR> CompressedBody<SdkBody, CR> {
29 pub fn new(body: SdkBody, compress_request: CR) -> Self {
31 Self {
32 body,
33 compress_request,
34 is_end_stream: false,
35 }
36 }
37 }
38
39 #[cfg(feature = "http-body-0-4-x")]
41 pub mod http_body_0_4_x {
42 use super::CompressedBody;
43 use crate::http::http_body_0_4_x::CompressRequest;
44 use aws_smithy_runtime_api::box_error::BoxError;
45 use aws_smithy_types::body::SdkBody;
46 use http_0_2::HeaderMap;
47 use http_body_0_4::{Body, SizeHint};
48 use std::pin::Pin;
49 use std::task::{Context, Poll};
50
51 impl Body for CompressedBody<SdkBody, Box<dyn CompressRequest>> {
52 type Data = bytes::Bytes;
53 type Error = aws_smithy_types::body::Error;
54
55 fn poll_data(
56 self: Pin<&mut Self>,
57 cx: &mut Context<'_>,
58 ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
59 let this = self.project();
60 match this.body.poll_data(cx)? {
61 Poll::Ready(Some(data)) => {
62 let mut out = Vec::new();
63 this.compress_request.compress_bytes(&data[..], &mut out)?;
64 Poll::Ready(Some(Ok(out.into())))
65 }
66 Poll::Ready(None) => {
67 *this.is_end_stream = true;
68 Poll::Ready(None)
69 }
70 Poll::Pending => Poll::Pending,
71 }
72 }
73
74 fn poll_trailers(
75 self: Pin<&mut Self>,
76 cx: &mut Context<'_>,
77 ) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
78 let this = self.project();
79 this.body.poll_trailers(cx)
80 }
81
82 fn is_end_stream(&self) -> bool {
83 self.is_end_stream
84 }
85
86 fn size_hint(&self) -> SizeHint {
87 SizeHint::default()
90 }
91 }
92
93 impl CompressedBody<SdkBody, Box<dyn CompressRequest>> {
94 pub fn into_compressed_sdk_body(mut self) -> Result<SdkBody, BoxError> {
99 let mut compressed_body = Vec::new();
100 let bytes = self.body.bytes().ok_or_else(|| "`into_compressed_sdk_body` requires that the inner body is 'in-memory', but it was streaming".to_string())?;
101
102 self.compress_request
103 .compress_bytes(bytes, &mut compressed_body)?;
104 Ok(SdkBody::from(compressed_body))
105 }
106 }
107 }
108
109 #[cfg(feature = "http-body-1-x")]
111 pub mod http_body_1_x {
112 use crate::body::compress::CompressedBody;
113 use crate::http::http_body_1_x::CompressRequest;
114 use aws_smithy_types::body::SdkBody;
115 use http_body_1_0::{Body, Frame, SizeHint};
116 use std::pin::Pin;
117 use std::task::{ready, Context, Poll};
118
119 impl Body for CompressedBody<SdkBody, Box<dyn CompressRequest>> {
120 type Data = bytes::Bytes;
121 type Error = aws_smithy_types::body::Error;
122
123 fn poll_frame(
124 mut self: Pin<&mut Self>,
125 cx: &mut Context<'_>,
126 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
127 let this = self.as_mut().project();
128 Poll::Ready(match ready!(this.body.poll_frame(cx)) {
129 Some(Ok(f)) => {
130 if f.is_data() {
131 let d = f.into_data().expect("we checked for data first");
132 let mut out = Vec::new();
133 this.compress_request.compress_bytes(&d, &mut out)?;
134 Some(Ok(Frame::data(out.into())))
135 } else if f.is_trailers() {
136 Some(Ok(f))
138 } else {
139 unreachable!("Frame is either data or trailers")
140 }
141 }
142 None => {
143 *this.is_end_stream = true;
144 None
145 }
146 other => other,
147 })
148 }
149
150 fn is_end_stream(&self) -> bool {
151 self.is_end_stream
152 }
153
154 fn size_hint(&self) -> SizeHint {
155 SizeHint::default()
158 }
159 }
160 }
161}
162
163#[cfg(any(feature = "http-body-0-4-x", feature = "http-body-1-x"))]
164#[cfg(test)]
165mod test {
166 use crate::body::compress::CompressedBody;
167 use crate::{CompressionAlgorithm, CompressionOptions};
168 use aws_smithy_types::body::SdkBody;
169 use bytes::Buf;
170 use bytes_utils::SegmentedBuf;
171 use std::io::Read;
/// Plain-text payload fed into the compressor by the tests.
const UNCOMPRESSED_INPUT: &[u8] = b"hello world";

/// The gzip stream the tests expect for [`UNCOMPRESSED_INPUT`].
#[rustfmt::skip]
const COMPRESSED_OUTPUT: &[u8] = &[
    // gzip header: magic (31, 139), method 8 (deflate), flags, 4-byte mtime,
    // extra flags, OS byte.
    31, 139, 8, 0, 0, 0, 0, 0, 0, 255,
    // deflate payload
    203, 72, 205, 201, 201, 87, 40, 207, 47, 202, 73, 1, 0,
    // gzip trailer: CRC-32 (little-endian) followed by the input length (11).
    133, 17, 74, 13,
    11, 0, 0, 0,
];
177
178 #[cfg(feature = "http-body-0-4-x")]
179 mod http_body_0_4_x {
180 use super::*;
181 use http_body_0_4::Body;
182
183 #[tokio::test]
184 async fn test_body_is_compressed() {
185 let compression_options = CompressionOptions::default()
186 .with_min_compression_size_bytes(0)
187 .unwrap();
188 let compress_request =
189 CompressionAlgorithm::Gzip.into_impl_http_body_0_4_x(&compression_options);
190 let body = SdkBody::from(UNCOMPRESSED_INPUT);
191 let mut compressed_body = CompressedBody::new(body, compress_request);
192
193 let mut output = SegmentedBuf::new();
194 while let Some(buf) = compressed_body.data().await {
195 output.push(buf.unwrap());
196 }
197
198 let mut actual_output = Vec::new();
199 output
200 .reader()
201 .read_to_end(&mut actual_output)
202 .expect("Doesn't cause IO errors");
203 assert_eq!(COMPRESSED_OUTPUT, actual_output);
205 }
206
207 #[tokio::test]
208 async fn test_into_compressed_sdk_body() {
209 let compression_options = CompressionOptions::default()
210 .with_min_compression_size_bytes(0)
211 .unwrap();
212 let compress_request =
213 CompressionAlgorithm::Gzip.into_impl_http_body_0_4_x(&compression_options);
214 let body = SdkBody::from(UNCOMPRESSED_INPUT);
215 let compressed_sdk_body = CompressedBody::new(body, compress_request)
216 .into_compressed_sdk_body()
217 .unwrap();
218
219 assert_eq!(
221 COMPRESSED_OUTPUT,
222 compressed_sdk_body.bytes().expect("body is in-memory")
223 );
224 }
225 }
226
227 #[cfg(feature = "http-body-1-x")]
228 mod http_body_1_x {
229 use super::*;
230 use http_body_util::BodyExt;
231
232 #[tokio::test]
233 async fn test_body_is_compressed() {
234 let compression_options = CompressionOptions::default()
235 .with_min_compression_size_bytes(0)
236 .unwrap();
237 let compress_request =
238 CompressionAlgorithm::Gzip.into_impl_http_body_1_x(&compression_options);
239 let body = SdkBody::from(UNCOMPRESSED_INPUT);
240 let mut compressed_body = CompressedBody::new(body, compress_request);
241
242 let mut output = SegmentedBuf::new();
243
244 loop {
245 let data = match compressed_body.frame().await {
246 Some(Ok(frame)) => frame.into_data(),
247 Some(Err(e)) => panic!("Error: {}", e),
248 None => break,
250 }
251 .expect("frame is OK");
252 output.push(data);
253 }
254
255 let mut actual_output = Vec::new();
256 output
257 .reader()
258 .read_to_end(&mut actual_output)
259 .expect("Doesn't cause IO errors");
260 assert_eq!(COMPRESSED_OUTPUT, actual_output);
262 }
263 }
264}