100 100 | // Don't wrap a body if compression is disabled.
|
101 101 | if !options.is_enabled() {
|
102 102 | tracing::trace!("request compression is disabled and will not be applied");
|
103 103 | return Ok(());
|
104 104 | }
|
105 105 |
|
106 106 | // Don't wrap a body if it's below the minimum size
|
107 107 | //
|
108 108 | // Because compressing small amounts of data can actually increase its size,
|
109 109 | // we check to see if the data is big enough to make compression worthwhile.
|
110 - | let size_hint = http_body::Body::size_hint(request.body()).exact();
|
110 + | let size_hint = http_body_1x::Body::size_hint(request.body()).exact();
|
111 111 | if let Some(known_size) = size_hint {
|
112 112 | if known_size < options.min_compression_size_bytes() as u64 {
|
113 113 | tracing::trace!(
|
114 114 | min_compression_size_bytes = options.min_compression_size_bytes(),
|
115 115 | known_size,
|
116 116 | "request body is below minimum size and will not be compressed"
|
117 117 | );
|
118 118 | return Ok(());
|
119 119 | }
|
120 120 | tracing::trace!("compressing sized request body...");
|
121 121 | } else {
|
122 122 | tracing::trace!("compressing unsized request body...");
|
123 123 | }
|
124 124 |
|
125 - | wrap_request_body_in_compressed_body(request, CompressionAlgorithm::Gzip.into_impl_http_body_0_4_x(&options))?;
|
125 + | wrap_request_body_in_compressed_body(request, CompressionAlgorithm::Gzip.into_impl_http_body_1_x(&options))?;
|
126 126 | cfg.interceptor_state()
|
127 127 | .store_append::<SmithySdkFeature>(SmithySdkFeature::GzipRequestCompression);
|
128 128 |
|
129 129 | Ok(())
|
130 130 | }
|
131 131 | }
|
132 132 |
|
133 133 | fn wrap_request_body_in_compressed_body(request: &mut HttpRequest, request_compress_impl: Box<dyn CompressRequest>) -> Result<(), BuildError> {
|
134 134 | request
|
135 135 | .headers_mut()
|
136 136 | .append(request_compress_impl.header_name(), request_compress_impl.header_value());
|
137 137 | let mut body = {
|
138 138 | let body = mem::replace(request.body_mut(), SdkBody::taken());
|
139 139 |
|
140 140 | if body.is_streaming() {
|
141 - | request.headers_mut().remove(http::header::CONTENT_LENGTH);
|
141 + | request.headers_mut().remove(http_1x::header::CONTENT_LENGTH);
|
142 142 | body.map(move |body| {
|
143 143 | let body = CompressedBody::new(body, request_compress_impl.clone());
|
144 - | SdkBody::from_body_0_4(body)
|
144 + | SdkBody::from_body_1_x(body)
|
145 145 | })
|
146 146 | } else {
|
147 147 | let body = CompressedBody::new(body, request_compress_impl.clone());
|
148 148 | let body = body.into_compressed_sdk_body().map_err(BuildError::other)?;
|
149 149 |
|
150 150 | let content_length = body.content_length().expect("this payload is in-memory");
|
151 - | request.headers_mut().insert(http::header::CONTENT_LENGTH, content_length.to_string());
|
151 + | request.headers_mut().insert(http_1x::header::CONTENT_LENGTH, content_length.to_string());
|
152 152 |
|
153 153 | body
|
154 154 | }
|
155 155 | };
|
156 156 | mem::swap(request.body_mut(), &mut body);
|
157 157 |
|
158 158 | Ok(())
|
159 159 | }
|
160 160 |
|
161 161 | #[derive(Debug, Copy, Clone, Default)]
|
162 162 | pub(crate) struct DisableRequestCompression(pub(crate) bool);
|
163 163 |
|
164 164 | impl From<bool> for DisableRequestCompression {
|
165 165 | fn from(value: bool) -> Self {
|
166 166 | DisableRequestCompression(value)
|
167 167 | }
|
168 168 | }
|
169 169 |
|
170 170 | impl Storable for DisableRequestCompression {
|
171 171 | type Storer = StoreReplace<Self>;
|
172 172 | }
|
173 173 |
|
174 174 | #[derive(Debug, Copy, Clone)]
|
175 175 | pub(crate) struct RequestMinCompressionSizeBytes(pub(crate) u32);
|
176 176 |
|
177 177 | impl Default for RequestMinCompressionSizeBytes {
|
178 178 | fn default() -> Self {
|
179 179 | RequestMinCompressionSizeBytes(10240)
|
180 180 | }
|
181 181 | }
|
182 182 |
|
183 183 | impl From<u32> for RequestMinCompressionSizeBytes {
|
184 184 | fn from(value: u32) -> Self {
|
185 185 | RequestMinCompressionSizeBytes(value)
|
186 186 | }
|
187 187 | }
|
188 188 |
|
189 189 | impl Storable for RequestMinCompressionSizeBytes {
|
190 190 | type Storer = StoreReplace<Self>;
|
191 191 | }
|
192 192 |
|
193 193 | #[cfg(test)]
|
194 194 | mod tests {
|
195 195 | use super::wrap_request_body_in_compressed_body;
|
196 196 | use crate::client_request_compression::{RequestCompressionInterceptor, RequestMinCompressionSizeBytes};
|
197 197 | use aws_smithy_compression::{CompressionAlgorithm, CompressionOptions};
|
198 198 | use aws_smithy_runtime::client::sdk_feature::SmithySdkFeature;
|
199 199 | use aws_smithy_runtime_api::client::interceptors::context::{Input, InterceptorContext};
|
200 200 | use aws_smithy_runtime_api::client::interceptors::Intercept;
|
201 201 | use aws_smithy_runtime_api::client::orchestrator::HttpRequest;
|
202 202 | use aws_smithy_runtime_api::client::runtime_components::RuntimeComponentsBuilder;
|
203 203 | use aws_smithy_types::body::SdkBody;
|
204 204 | use aws_smithy_types::config_bag::{ConfigBag, Layer};
|
205 - | use http_body::Body;
|
205 + | use http_body_util::BodyExt;
|
206 206 |
|
207 207 | const UNCOMPRESSED_INPUT: &[u8] = b"hello world";
|
208 208 | const COMPRESSED_OUTPUT: &[u8] = &[
|
209 209 | 31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 203, 72, 205, 201, 201, 87, 40, 207, 47, 202, 73, 1, 0, 133, 17, 74, 13, 11, 0, 0, 0,
|
210 210 | ];
|
211 211 |
|
212 212 | #[tokio::test]
|
213 213 | async fn test_compressed_body_is_retryable() {
|
214 - | let mut request: HttpRequest = http::Request::builder()
|
214 + | let mut request: HttpRequest = http_1x::Request::builder()
|
215 215 | .body(SdkBody::retryable(move || SdkBody::from(UNCOMPRESSED_INPUT)))
|
216 216 | .unwrap()
|
217 217 | .try_into()
|
218 218 | .unwrap();
|
219 219 |
|
220 220 | // ensure original SdkBody is retryable
|
221 221 | let mut body = request.body().try_clone().unwrap();
|
222 222 | let mut body_data = Vec::new();
|
223 - | while let Some(data) = body.data().await {
|
224 - | body_data.extend_from_slice(&data.unwrap())
|
223 + | while let Some(Ok(frame)) = body.frame().await {
|
224 + | let data = frame.into_data().expect("Data frame");
|
225 + | body_data.extend_from_slice(&data)
|
225 226 | }
|
226 227 | // Not yet wrapped, should still be the same as UNCOMPRESSED_INPUT.
|
227 228 | assert_eq!(UNCOMPRESSED_INPUT, body_data);
|
228 229 |
|
229 230 | let compression_algorithm = CompressionAlgorithm::Gzip;
|
230 231 | let compression_options = CompressionOptions::default().with_min_compression_size_bytes(0).unwrap();
|
231 232 |
|
232 - | wrap_request_body_in_compressed_body(&mut request, compression_algorithm.into_impl_http_body_0_4_x(&compression_options)).unwrap();
|
233 + | wrap_request_body_in_compressed_body(&mut request, compression_algorithm.into_impl_http_body_1_x(&compression_options)).unwrap();
|
233 234 |
|
234 235 | // ensure again that wrapped SdkBody is retryable
|
235 236 | let mut body = request.body().try_clone().expect("body is retryable");
|
236 237 | let mut body_data = Vec::new();
|
237 - | while let Some(data) = body.data().await {
|
238 - | body_data.extend_from_slice(&data.unwrap())
|
238 + | while let Some(Ok(frame)) = body.frame().await {
|
239 + | let data = frame.into_data().expect("Data frame");
|
240 + | body_data.extend_from_slice(&data)
|
239 241 | }
|
240 242 |
|
241 243 | // Since this body was wrapped, the output should be compressed data
|
242 244 | assert_ne!(UNCOMPRESSED_INPUT, body_data.as_slice());
|
243 245 | assert_eq!(COMPRESSED_OUTPUT, body_data.as_slice());
|
244 246 | }
|
245 247 |
|
246 248 | fn context() -> InterceptorContext {
|
247 249 | let mut context = InterceptorContext::new(Input::doesnt_matter());
|
248 250 | context.enter_serialization_phase();
|
249 251 | context.set_request(
|
250 - | http::Request::builder()
|
252 + | http_1x::Request::builder()
|
251 253 | .body(SdkBody::from(UNCOMPRESSED_INPUT))
|
252 254 | .unwrap()
|
253 255 | .try_into()
|
254 256 | .unwrap(),
|
255 257 | );
|
256 258 | let _ = context.take_input();
|
257 259 | context.enter_before_transmit_phase();
|
258 260 | context
|
259 261 | }
|
260 262 |
|