121 121 | let request = ctx.request_mut();
|
122 122 |
|
123 123 | let mut body = {
|
124 124 | let body = std::mem::replace(request.body_mut(), SdkBody::taken());
|
125 125 | let opt = cfg
|
126 126 | .get_mut_from_interceptor_state::<AwsChunkedBodyOptions>()
|
127 127 | .ok_or_else(|| BuildError::other("AwsChunkedBodyOptions missing from config bag"))?;
|
128 128 | let aws_chunked_body_options = std::mem::take(opt);
|
129 129 | body.map(move |body| {
|
130 130 | let body = AwsChunkedBody::new(body, aws_chunked_body_options.clone());
|
131 - | SdkBody::from_body_0_4(body)
|
131 + | SdkBody::from_body_1_x(body)
|
132 132 | })
|
133 133 | };
|
134 134 |
|
135 135 | std::mem::swap(request.body_mut(), &mut body);
|
136 136 |
|
137 137 | Ok(())
|
138 138 | }
|
139 139 | }
|
140 140 |
|
141 141 | // Determine if chunked encoding must not be used; returns true when any of the following is true:
|
142 142 | // - If the body is in-memory
|
143 143 | // - If chunked encoding is disabled via `AwsChunkedBodyOptions`
|
144 144 | fn must_not_use_chunked_encoding(request: &Request, cfg: &ConfigBag) -> bool {
|
145 145 | match (request.body().bytes(), cfg.load::<AwsChunkedBodyOptions>()) {
|
146 146 | (Some(_), _) => true,
|
147 147 | (_, Some(options)) if options.disabled() => true,
|
148 148 | _ => false,
|
149 149 | }
|
150 150 | }
|
151 151 |
|
152 152 | #[cfg(test)]
|
153 153 | mod tests {
|
154 154 | use super::*;
|
155 155 | use aws_smithy_runtime_api::client::interceptors::context::{BeforeTransmitInterceptorContextMut, Input, InterceptorContext};
|
156 156 | use aws_smithy_runtime_api::client::orchestrator::HttpRequest;
|
157 157 | use aws_smithy_runtime_api::client::runtime_components::RuntimeComponentsBuilder;
|
158 158 | use aws_smithy_types::byte_stream::ByteStream;
|
159 159 | use bytes::BytesMut;
|
160 - | use http_body::Body;
|
160 + | use http_body_util::BodyExt;
|
161 161 | use tempfile::NamedTempFile;
|
162 162 |
|
163 163 | #[tokio::test]
|
164 164 | async fn test_aws_chunked_body_is_retryable() {
|
165 165 | use std::io::Write;
|
166 166 | let mut file = NamedTempFile::new().unwrap();
|
167 167 |
|
168 168 | for i in 0..10000 {
|
169 169 | let line = format!("This is a large file created for testing purposes {}", i);
|
170 170 | file.as_file_mut().write_all(line.as_bytes()).unwrap();
|
171 171 | }
|
172 172 |
|
173 173 | let stream_length = file.as_file().metadata().unwrap().len();
|
174 174 | let request = HttpRequest::new(ByteStream::read_from().path(&file).buffer_size(1024).build().await.unwrap().into_inner());
|
175 175 |
|
176 176 | // ensure original SdkBody is retryable
|
177 177 | assert!(request.body().try_clone().is_some());
|
178 178 |
|
179 179 | let interceptor = AwsChunkedContentEncodingInterceptor;
|
180 180 | let mut cfg = ConfigBag::base();
|
181 181 | cfg.interceptor_state()
|
182 182 | .store_put(AwsChunkedBodyOptions::default().with_stream_length(stream_length));
|
183 183 | let runtime_components = RuntimeComponentsBuilder::for_tests().build().unwrap();
|
184 184 | let mut ctx = InterceptorContext::new(Input::doesnt_matter());
|
185 185 | ctx.enter_serialization_phase();
|
186 186 | let _ = ctx.take_input();
|
187 187 | ctx.set_request(request);
|
188 188 | ctx.enter_before_transmit_phase();
|
189 189 | let mut ctx: BeforeTransmitInterceptorContextMut<'_> = (&mut ctx).into();
|
190 190 | interceptor.modify_before_transmit(&mut ctx, &runtime_components, &mut cfg).unwrap();
|
191 191 |
|
192 192 | // ensure wrapped SdkBody is retryable
|
193 193 | let mut body = ctx.request().body().try_clone().expect("body is retryable");
|
194 194 |
|
195 195 | let mut body_data = BytesMut::new();
|
196 - | while let Some(data) = body.data().await {
|
197 - | body_data.extend_from_slice(&data.unwrap())
|
196 + | while let Some(frame) = body.frame().await {
|
197 + | let frame = frame.expect("reading a body frame should not fail"); // panic on stream errors, as the removed `data.unwrap()` did
|
198 + | if let Ok(data) = frame.into_data() {
|
199 + | body_data.extend_from_slice(&data);
|
200 + | }
|
198 201 | }
|
199 202 | let body_str = std::str::from_utf8(&body_data).unwrap();
|
200 - |
|
201 203 | let expected = "This is a large file created for testing purposes 9999\r\n0\r\n\r\n";
|
202 204 | assert!(body_str.ends_with(expected), "expected '{body_str}' to end with '{expected}'");
|
203 205 | }
|
204 206 |
|
205 207 | #[tokio::test]
|
206 208 | async fn test_short_circuit_modify_before_signing() {
|
207 209 | let mut ctx = InterceptorContext::new(Input::doesnt_matter());
|
208 210 | ctx.enter_serialization_phase();
|
209 211 | let _ = ctx.take_input();
|
210 212 | let request = HttpRequest::new(SdkBody::from("in-memory body, must not use chunked encoding"));
|
211 213 | ctx.set_request(request);
|
212 214 | ctx.enter_before_transmit_phase();
|
213 215 | let mut ctx: BeforeTransmitInterceptorContextMut<'_> = (&mut ctx).into();
|
214 216 |
|
215 217 | let runtime_components = RuntimeComponentsBuilder::for_tests().build().unwrap();
|
216 218 |
|
217 219 | let mut cfg = ConfigBag::base();
|
218 220 | cfg.interceptor_state().store_put(AwsChunkedBodyOptions::default());
|
219 221 |
|
220 222 | let interceptor = AwsChunkedContentEncodingInterceptor;
|
221 223 | interceptor.modify_before_signing(&mut ctx, &runtime_components, &mut cfg).unwrap();
|
222 224 |
|
223 225 | let request = ctx.request();
|
224 226 | assert!(request.headers().get(header::CONTENT_ENCODING).is_none());
|
225 227 | assert!(request
|
226 228 | .headers()
|
227 229 | .get(header::HeaderName::from_static(X_AMZ_DECODED_CONTENT_LENGTH))
|
228 230 | .is_none());
|
229 231 | }
|
230 232 |
|
231 233 | #[tokio::test]
|
232 234 | async fn test_short_circuit_modify_before_transmit() {
|
233 235 | let mut ctx = InterceptorContext::new(Input::doesnt_matter());
|
234 236 | ctx.enter_serialization_phase();
|
235 237 | let _ = ctx.take_input();
|
236 238 | let request = HttpRequest::new(SdkBody::from("in-memory body, must not use chunked encoding"));
|
237 239 | ctx.set_request(request);
|
238 240 | ctx.enter_before_transmit_phase();
|
239 241 | let mut ctx: BeforeTransmitInterceptorContextMut<'_> = (&mut ctx).into();
|
240 242 |
|
241 243 | let runtime_components = RuntimeComponentsBuilder::for_tests().build().unwrap();
|
242 244 |
|
243 245 | let mut cfg = ConfigBag::base();
|
244 246 | // Don't need to set the stream length properly because we expect the body won't be wrapped by `AwsChunkedBody`.
|
245 247 | cfg.interceptor_state().store_put(AwsChunkedBodyOptions::default());
|
246 248 |
|
247 249 | let interceptor = AwsChunkedContentEncodingInterceptor;
|
248 250 | interceptor.modify_before_transmit(&mut ctx, &runtime_components, &mut cfg).unwrap();
|
249 251 |
|
250 252 | let mut body = ctx.request().body().try_clone().expect("body is retryable");
|
251 253 |
|
252 254 | let mut body_data = BytesMut::new();
|
253 - | while let Some(data) = body.data().await {
|
254 - | body_data.extend_from_slice(&data.unwrap())
|
255 + | while let Some(frame) = body.frame().await {
|
256 + | let frame = frame.expect("reading a body frame should not fail"); // panic on stream errors, as the removed `data.unwrap()` did
|
257 + | if let Ok(data) = frame.into_data() {
|
258 + | body_data.extend_from_slice(&data);
|
259 + | }
|
255 260 | }
|
256 261 | let body_str = std::str::from_utf8(&body_data).unwrap();
|
257 262 | // Also implies that `assert!(!body_str.ends_with("0\r\n\r\n"));`, i.e., shouldn't see chunked encoding epilogue.
|
258 263 | assert_eq!("in-memory body, must not use chunked encoding", body_str);
|
259 264 | }
|
260 265 |
|
261 266 | #[test]
|
262 267 | fn test_must_not_use_chunked_encoding_with_in_memory_body() {
|
263 268 | let request = HttpRequest::new(SdkBody::from("test body"));
|
264 269 | let cfg = ConfigBag::base();
|