29 29 | /// Given an [`SdkBody`] and a `Box<dyn CompressRequest>`, create a new `CompressedBody<SdkBody, CR>`.
|
30 30 | pub fn new(body: SdkBody, compress_request: CR) -> Self {
|
31 31 | Self {
|
32 32 | body,
|
33 33 | compress_request,
|
34 34 | is_end_stream: false,
|
35 35 | }
|
36 36 | }
|
37 37 | }
|
38 38 |
|
39 - | /// Support for the `http-body-0-4` and `http-0-2` crates.
|
40 - | #[cfg(feature = "http-body-0-4-x")]
|
41 - | pub mod http_body_0_4_x {
|
42 - | use super::CompressedBody;
|
43 - | use crate::http::http_body_0_4_x::CompressRequest;
|
44 - | use aws_smithy_runtime_api::box_error::BoxError;
|
45 - | use aws_smithy_types::body::SdkBody;
|
46 - | use http_0_2::HeaderMap;
|
47 - | use http_body_0_4::{Body, SizeHint};
|
48 - | use std::pin::Pin;
|
49 - | use std::task::{Context, Poll};
|
50 - |
|
51 - | impl Body for CompressedBody<SdkBody, Box<dyn CompressRequest>> {
|
52 - | type Data = bytes::Bytes;
|
53 - | type Error = aws_smithy_types::body::Error;
|
54 - |
|
55 - | fn poll_data(
|
56 - | self: Pin<&mut Self>,
|
57 - | cx: &mut Context<'_>,
|
58 - | ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
|
59 - | let this = self.project();
|
60 - | match this.body.poll_data(cx)? {
|
61 - | Poll::Ready(Some(data)) => {
|
62 - | let mut out = Vec::new();
|
63 - | this.compress_request.compress_bytes(&data[..], &mut out)?;
|
64 - | Poll::Ready(Some(Ok(out.into())))
|
65 - | }
|
66 - | Poll::Ready(None) => {
|
67 - | *this.is_end_stream = true;
|
68 - | Poll::Ready(None)
|
69 - | }
|
70 - | Poll::Pending => Poll::Pending,
|
71 - | }
|
72 - | }
|
73 - |
|
74 - | fn poll_trailers(
|
75 - | self: Pin<&mut Self>,
|
76 - | cx: &mut Context<'_>,
|
77 - | ) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
|
78 - | let this = self.project();
|
79 - | this.body.poll_trailers(cx)
|
80 - | }
|
81 - |
|
82 - | fn is_end_stream(&self) -> bool {
|
83 - | self.is_end_stream
|
84 - | }
|
85 - |
|
86 - | fn size_hint(&self) -> SizeHint {
|
87 - | // We can't return a hint because we don't know exactly how
|
88 - | // compression will affect the content length
|
89 - | SizeHint::default()
|
90 - | }
|
91 - | }
|
92 - |
|
93 - | impl CompressedBody<SdkBody, Box<dyn CompressRequest>> {
|
94 - | /// Consumes this `CompressedBody` and returns an [`SdkBody`] containing the compressed data.
|
95 - | ///
|
96 - | /// This *requires* that the inner `SdkBody` is in-memory (i.e. not streaming). Otherwise, an error is returned.
|
97 - | /// If compression fails, an error is returned.
|
98 - | pub fn into_compressed_sdk_body(mut self) -> Result<SdkBody, BoxError> {
|
99 - | let mut compressed_body = Vec::new();
|
100 - | let bytes = self.body.bytes().ok_or_else(|| "`into_compressed_sdk_body` requires that the inner body is 'in-memory', but it was streaming".to_string())?;
|
101 - |
|
102 - | self.compress_request
|
103 - | .compress_bytes(bytes, &mut compressed_body)?;
|
104 - | Ok(SdkBody::from(compressed_body))
|
105 - | }
|
106 - | }
|
107 - | }
|
108 - |
|
109 39 | /// Support for the `http-body-1-0` and `http-1-0` crates.
|
110 - | #[cfg(feature = "http-body-1-x")]
|
111 40 | pub mod http_body_1_x {
|
112 41 | use crate::body::compress::CompressedBody;
|
113 - | use crate::http::http_body_1_x::CompressRequest;
|
42 + | use crate::http::CompressRequest;
|
43 + | use aws_smithy_runtime_api::box_error::BoxError;
|
114 44 | use aws_smithy_types::body::SdkBody;
|
115 - | use http_body_1_0::{Body, Frame, SizeHint};
|
45 + | use http_body_1x::{Body, Frame, SizeHint};
|
116 46 | use std::pin::Pin;
|
117 47 | use std::task::{ready, Context, Poll};
|
118 48 |
|
119 49 | impl Body for CompressedBody<SdkBody, Box<dyn CompressRequest>> {
|
120 50 | type Data = bytes::Bytes;
|
121 51 | type Error = aws_smithy_types::body::Error;
|
122 52 |
|
123 53 | fn poll_frame(
|
124 54 | mut self: Pin<&mut Self>,
|
125 55 | cx: &mut Context<'_>,
|
126 56 | ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
|
127 57 | let this = self.as_mut().project();
|
128 58 | Poll::Ready(match ready!(this.body.poll_frame(cx)) {
|
129 59 | Some(Ok(f)) => {
|
130 60 | if f.is_data() {
|
131 61 | let d = f.into_data().expect("we checked for data first");
|
132 62 | let mut out = Vec::new();
|
133 63 | this.compress_request.compress_bytes(&d, &mut out)?;
|
134 64 | Some(Ok(Frame::data(out.into())))
|
135 65 | } else if f.is_trailers() {
|
136 66 | // Trailers don't get compressed.
|
137 67 | Some(Ok(f))
|
138 68 | } else {
|
139 69 | unreachable!("Frame is either data or trailers")
|
140 70 | }
|
141 71 | }
|
142 72 | None => {
|
143 73 | *this.is_end_stream = true;
|
144 74 | None
|
145 75 | }
|
146 76 | other => other,
|
147 77 | })
|
148 78 | }
|
149 79 |
|
150 80 | fn is_end_stream(&self) -> bool {
|
151 81 | self.is_end_stream
|
152 82 | }
|
153 83 |
|
154 84 | fn size_hint(&self) -> SizeHint {
|
155 85 | // We can't return a hint because we don't know exactly how
|
156 86 | // compression will affect the content length
|
157 87 | SizeHint::default()
|
158 88 | }
|
159 89 | }
|
90 + | impl CompressedBody<SdkBody, Box<dyn CompressRequest>> {
|
91 + | /// Consumes this `CompressedBody` and returns an [`SdkBody`] containing the compressed data.
|
92 + | ///
|
93 + | /// This *requires* that the inner `SdkBody` is in-memory (i.e. not streaming). Otherwise, an error is returned.
|
94 + | /// If compression fails, an error is returned.
|
95 + | pub fn into_compressed_sdk_body(mut self) -> Result<SdkBody, BoxError> {
|
96 + | let mut compressed_body = Vec::new();
|
97 + | let bytes = self.body.bytes().ok_or_else(|| "`into_compressed_sdk_body` requires that the inner body is 'in-memory', but it was streaming".to_string())?;
|
98 + |
|
99 + | self.compress_request
|
100 + | .compress_bytes(bytes, &mut compressed_body)?;
|
101 + | Ok(SdkBody::from(compressed_body))
|
102 + | }
|
103 + | }
|
160 104 | }
|
161 105 | }
|
162 106 |
|
163 - | #[cfg(any(feature = "http-body-0-4-x", feature = "http-body-1-x"))]
|
164 107 | #[cfg(test)]
|
165 108 | mod test {
|
166 109 | use crate::body::compress::CompressedBody;
|
167 110 | use crate::{CompressionAlgorithm, CompressionOptions};
|
168 111 | use aws_smithy_types::body::SdkBody;
|
169 112 | use bytes::Buf;
|
170 113 | use bytes_utils::SegmentedBuf;
|
171 114 | use std::io::Read;
|
172 115 | const UNCOMPRESSED_INPUT: &[u8] = b"hello world";
|
173 116 | const COMPRESSED_OUTPUT: &[u8] = &[
|
174 117 | 31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 203, 72, 205, 201, 201, 87, 40, 207, 47, 202, 73, 1, 0,
|
175 118 | 133, 17, 74, 13, 11, 0, 0, 0,
|
176 119 | ];
|
177 120 |
|
178 - | #[cfg(feature = "http-body-0-4-x")]
|
179 - | mod http_body_0_4_x {
|
180 - | use super::*;
|
181 - | use http_body_0_4::Body;
|
182 - |
|
183 - | #[tokio::test]
|
184 - | async fn test_body_is_compressed() {
|
185 - | let compression_options = CompressionOptions::default()
|
186 - | .with_min_compression_size_bytes(0)
|
187 - | .unwrap();
|
188 - | let compress_request =
|
189 - | CompressionAlgorithm::Gzip.into_impl_http_body_0_4_x(&compression_options);
|
190 - | let body = SdkBody::from(UNCOMPRESSED_INPUT);
|
191 - | let mut compressed_body = CompressedBody::new(body, compress_request);
|
192 - |
|
193 - | let mut output = SegmentedBuf::new();
|
194 - | while let Some(buf) = compressed_body.data().await {
|
195 - | output.push(buf.unwrap());
|
196 - | }
|
197 - |
|
198 - | let mut actual_output = Vec::new();
|
199 - | output
|
200 - | .reader()
|
201 - | .read_to_end(&mut actual_output)
|
202 - | .expect("Doesn't cause IO errors");
|
203 - | // Verify data is compressed as expected
|
204 - | assert_eq!(COMPRESSED_OUTPUT, actual_output);
|
205 - | }
|
206 - |
|
207 - | #[tokio::test]
|
208 - | async fn test_into_compressed_sdk_body() {
|
209 - | let compression_options = CompressionOptions::default()
|
210 - | .with_min_compression_size_bytes(0)
|
211 - | .unwrap();
|
212 - | let compress_request =
|
213 - | CompressionAlgorithm::Gzip.into_impl_http_body_0_4_x(&compression_options);
|
214 - | let body = SdkBody::from(UNCOMPRESSED_INPUT);
|
215 - | let compressed_sdk_body = CompressedBody::new(body, compress_request)
|
216 - | .into_compressed_sdk_body()
|
217 - | .unwrap();
|
218 - |
|
219 - | // Verify data is compressed as expected
|
220 - | assert_eq!(
|
221 - | COMPRESSED_OUTPUT,
|
222 - | compressed_sdk_body.bytes().expect("body is in-memory")
|
223 - | );
|
224 - | }
|
225 - | }
|
226 - |
|
227 - | #[cfg(feature = "http-body-1-x")]
|
228 - | mod http_body_1_x {
|
229 - | use super::*;
|
230 121 | use http_body_util::BodyExt;
|
231 122 |
|
232 123 | #[tokio::test]
|
233 124 | async fn test_body_is_compressed() {
|
234 125 | let compression_options = CompressionOptions::default()
|
235 126 | .with_min_compression_size_bytes(0)
|
236 127 | .unwrap();
|
237 128 | let compress_request =
|
238 129 | CompressionAlgorithm::Gzip.into_impl_http_body_1_x(&compression_options);
|
239 130 | let body = SdkBody::from(UNCOMPRESSED_INPUT);
|
240 131 | let mut compressed_body = CompressedBody::new(body, compress_request);
|
241 132 |
|
242 133 | let mut output = SegmentedBuf::new();
|
243 134 |
|
244 135 | loop {
|
245 136 | let data = match compressed_body.frame().await {
|
246 137 | Some(Ok(frame)) => frame.into_data(),
|
247 138 | Some(Err(e)) => panic!("Error: {}", e),
|
248 139 | // No more frames, break out of loop
|
249 140 | None => break,
|
250 141 | }
|
251 142 | .expect("frame is OK");
|
252 143 | output.push(data);
|
253 144 | }
|
254 145 |
|
255 146 | let mut actual_output = Vec::new();
|
256 147 | output
|
257 148 | .reader()
|
258 149 | .read_to_end(&mut actual_output)
|
259 150 | .expect("Doesn't cause IO errors");
|
260 151 | // Verify data is compressed as expected
|
261 152 | assert_eq!(COMPRESSED_OUTPUT, actual_output);
|
262 153 | }
|
263 - | }
|
264 154 | }
|