1 - | // Code generated by software.amazon.smithy.rust.codegen.smithy-rs. DO NOT EDIT.
|
2 - | /*
|
3 - | * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
4 - | * SPDX-License-Identifier: Apache-2.0
|
5 - | */
|
6 - |
|
7 - | #![allow(dead_code)]
|
8 - |
|
9 - | use std::fmt;
|
10 - |
|
11 - | use aws_runtime::{
|
12 - | auth::PayloadSigningOverride,
|
13 - | content_encoding::{header_value::AWS_CHUNKED, AwsChunkedBody, AwsChunkedBodyOptions},
|
14 - | };
|
15 - | use aws_smithy_runtime_api::{
|
16 - | box_error::BoxError,
|
17 - | client::{
|
18 - | interceptors::{context::BeforeTransmitInterceptorContextMut, Intercept},
|
19 - | runtime_components::RuntimeComponents,
|
20 - | },
|
21 - | http::Request,
|
22 - | };
|
23 - | use aws_smithy_types::{body::SdkBody, config_bag::ConfigBag, error::operation::BuildError};
|
24 - | use http::{header, HeaderValue};
|
25 - | use http_body::Body;
|
26 - |
|
27 - | const X_AMZ_DECODED_CONTENT_LENGTH: &str = "x-amz-decoded-content-length";
|
28 - |
|
29 - | /// Errors related to constructing aws-chunked encoded HTTP requests.
|
30 - | #[derive(Debug)]
|
31 - | enum Error {
|
32 - | UnsizedRequestBody,
|
33 - | }
|
34 - |
|
35 - | impl fmt::Display for Error {
|
36 - | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
37 - | match self {
|
38 - | Self::UnsizedRequestBody => write!(f, "Only request bodies with a known size can be aws-chunked encoded."),
|
39 - | }
|
40 - | }
|
41 - | }
|
42 - |
|
43 - | impl std::error::Error for Error {}
|
44 - |
|
45 - | #[derive(Debug)]
|
46 - | pub(crate) struct AwsChunkedContentEncodingInterceptor;
|
47 - |
|
48 - | impl Intercept for AwsChunkedContentEncodingInterceptor {
|
49 - | fn name(&self) -> &'static str {
|
50 - | "AwsChunkedContentEncodingInterceptor"
|
51 - | }
|
52 - |
|
53 - | fn modify_before_signing(
|
54 - | &self,
|
55 - | context: &mut BeforeTransmitInterceptorContextMut<'_>,
|
56 - | _runtime_components: &RuntimeComponents,
|
57 - | cfg: &mut ConfigBag,
|
58 - | ) -> Result<(), BoxError> {
|
59 - | if must_not_use_chunked_encoding(context.request(), cfg) {
|
60 - | tracing::debug!("short-circuiting modify_before_signing because chunked encoding must not be used");
|
61 - | return Ok(());
|
62 - | }
|
63 - |
|
64 - | let original_body_size = if let Some(size) = context
|
65 - | .request()
|
66 - | .headers()
|
67 - | .get(header::CONTENT_LENGTH)
|
68 - | .and_then(|s| s.parse::<u64>().ok())
|
69 - | .or_else(|| context.request().body().size_hint().exact())
|
70 - | {
|
71 - | size
|
72 - | } else {
|
73 - | return Err(BuildError::other(Error::UnsizedRequestBody))?;
|
74 - | };
|
75 - |
|
76 - | let chunked_body_options = if let Some(chunked_body_options) = cfg.get_mut_from_interceptor_state::<AwsChunkedBodyOptions>() {
|
77 - | let chunked_body_options = std::mem::take(chunked_body_options);
|
78 - | chunked_body_options.with_stream_length(original_body_size)
|
79 - | } else {
|
80 - | AwsChunkedBodyOptions::default().with_stream_length(original_body_size)
|
81 - | };
|
82 - |
|
83 - | let request = context.request_mut();
|
84 - | // For for aws-chunked encoding, `x-amz-decoded-content-length` must be set to the original body size.
|
85 - | request.headers_mut().insert(
|
86 - | header::HeaderName::from_static(X_AMZ_DECODED_CONTENT_LENGTH),
|
87 - | HeaderValue::from(original_body_size),
|
88 - | );
|
89 - | // Other than `x-amz-decoded-content-length`, either `content-length` or `transfer-encoding`
|
90 - | // must be set, but not both. For uses cases we support, we know the original body size and
|
91 - | // can calculate the encoded size, so we set `content-length`.
|
92 - | request
|
93 - | .headers_mut()
|
94 - | .insert(header::CONTENT_LENGTH, HeaderValue::from(chunked_body_options.encoded_length()));
|
95 - | // Setting `content-length` above means we must unset `transfer-encoding`.
|
96 - | request.headers_mut().remove(header::TRANSFER_ENCODING);
|
97 - | request.headers_mut().append(
|
98 - | header::CONTENT_ENCODING,
|
99 - | HeaderValue::from_str(AWS_CHUNKED)
|
100 - | .map_err(BuildError::other)
|
101 - | .expect("\"aws-chunked\" will always be a valid HeaderValue"),
|
102 - | );
|
103 - |
|
104 - | cfg.interceptor_state().store_put(chunked_body_options);
|
105 - | cfg.interceptor_state().store_put(PayloadSigningOverride::StreamingUnsignedPayloadTrailer);
|
106 - |
|
107 - | Ok(())
|
108 - | }
|
109 - |
|
110 - | fn modify_before_transmit(
|
111 - | &self,
|
112 - | ctx: &mut BeforeTransmitInterceptorContextMut<'_>,
|
113 - | _runtime_components: &RuntimeComponents,
|
114 - | cfg: &mut ConfigBag,
|
115 - | ) -> Result<(), BoxError> {
|
116 - | if must_not_use_chunked_encoding(ctx.request(), cfg) {
|
117 - | tracing::debug!("short-circuiting modify_before_transmit because chunked encoding must not be used");
|
118 - | return Ok(());
|
119 - | }
|
120 - |
|
121 - | let request = ctx.request_mut();
|
122 - |
|
123 - | let mut body = {
|
124 - | let body = std::mem::replace(request.body_mut(), SdkBody::taken());
|
125 - | let opt = cfg
|
126 - | .get_mut_from_interceptor_state::<AwsChunkedBodyOptions>()
|
127 - | .ok_or_else(|| BuildError::other("AwsChunkedBodyOptions missing from config bag"))?;
|
128 - | let aws_chunked_body_options = std::mem::take(opt);
|
129 - | body.map(move |body| {
|
130 - | let body = AwsChunkedBody::new(body, aws_chunked_body_options.clone());
|
131 - | SdkBody::from_body_0_4(body)
|
132 - | })
|
133 - | };
|
134 - |
|
135 - | std::mem::swap(request.body_mut(), &mut body);
|
136 - |
|
137 - | Ok(())
|
138 - | }
|
139 - | }
|
140 - |
|
141 - | // Determine if chunked encoding must not be used; returns true when any of the following is true:
|
142 - | // - If the body is in-memory
|
143 - | // - If chunked encoding is disabled via `AwsChunkedBodyOptions`
|
144 - | fn must_not_use_chunked_encoding(request: &Request, cfg: &ConfigBag) -> bool {
|
145 - | match (request.body().bytes(), cfg.load::<AwsChunkedBodyOptions>()) {
|
146 - | (Some(_), _) => true,
|
147 - | (_, Some(options)) if options.disabled() => true,
|
148 - | _ => false,
|
149 - | }
|
150 - | }
|
151 - |
|
152 - | #[cfg(test)]
|
153 - | mod tests {
|
154 - | use super::*;
|
155 - | use aws_smithy_runtime_api::client::interceptors::context::{BeforeTransmitInterceptorContextMut, Input, InterceptorContext};
|
156 - | use aws_smithy_runtime_api::client::orchestrator::HttpRequest;
|
157 - | use aws_smithy_runtime_api::client::runtime_components::RuntimeComponentsBuilder;
|
158 - | use aws_smithy_types::byte_stream::ByteStream;
|
159 - | use bytes::BytesMut;
|
160 - | use http_body::Body;
|
161 - | use tempfile::NamedTempFile;
|
162 - |
|
163 - | #[tokio::test]
|
164 - | async fn test_aws_chunked_body_is_retryable() {
|
165 - | use std::io::Write;
|
166 - | let mut file = NamedTempFile::new().unwrap();
|
167 - |
|
168 - | for i in 0..10000 {
|
169 - | let line = format!("This is a large file created for testing purposes {}", i);
|
170 - | file.as_file_mut().write_all(line.as_bytes()).unwrap();
|
171 - | }
|
172 - |
|
173 - | let stream_length = file.as_file().metadata().unwrap().len();
|
174 - | let request = HttpRequest::new(ByteStream::read_from().path(&file).buffer_size(1024).build().await.unwrap().into_inner());
|
175 - |
|
176 - | // ensure original SdkBody is retryable
|
177 - | assert!(request.body().try_clone().is_some());
|
178 - |
|
179 - | let interceptor = AwsChunkedContentEncodingInterceptor;
|
180 - | let mut cfg = ConfigBag::base();
|
181 - | cfg.interceptor_state()
|
182 - | .store_put(AwsChunkedBodyOptions::default().with_stream_length(stream_length));
|
183 - | let runtime_components = RuntimeComponentsBuilder::for_tests().build().unwrap();
|
184 - | let mut ctx = InterceptorContext::new(Input::doesnt_matter());
|
185 - | ctx.enter_serialization_phase();
|
186 - | let _ = ctx.take_input();
|
187 - | ctx.set_request(request);
|
188 - | ctx.enter_before_transmit_phase();
|
189 - | let mut ctx: BeforeTransmitInterceptorContextMut<'_> = (&mut ctx).into();
|
190 - | interceptor.modify_before_transmit(&mut ctx, &runtime_components, &mut cfg).unwrap();
|
191 - |
|
192 - | // ensure wrapped SdkBody is retryable
|
193 - | let mut body = ctx.request().body().try_clone().expect("body is retryable");
|
194 - |
|
195 - | let mut body_data = BytesMut::new();
|
196 - | while let Some(data) = body.data().await {
|
197 - | body_data.extend_from_slice(&data.unwrap())
|
198 - | }
|
199 - | let body_str = std::str::from_utf8(&body_data).unwrap();
|
200 - |
|
201 - | let expected = "This is a large file created for testing purposes 9999\r\n0\r\n\r\n";
|
202 - | assert!(body_str.ends_with(expected), "expected '{body_str}' to end with '{expected}'");
|
203 - | }
|
204 - |
|
205 - | #[tokio::test]
|
206 - | async fn test_short_circuit_modify_before_signing() {
|
207 - | let mut ctx = InterceptorContext::new(Input::doesnt_matter());
|
208 - | ctx.enter_serialization_phase();
|
209 - | let _ = ctx.take_input();
|
210 - | let request = HttpRequest::new(SdkBody::from("in-memory body, must not use chunked encoding"));
|
211 - | ctx.set_request(request);
|
212 - | ctx.enter_before_transmit_phase();
|
213 - | let mut ctx: BeforeTransmitInterceptorContextMut<'_> = (&mut ctx).into();
|
214 - |
|
215 - | let runtime_components = RuntimeComponentsBuilder::for_tests().build().unwrap();
|
216 - |
|
217 - | let mut cfg = ConfigBag::base();
|
218 - | cfg.interceptor_state().store_put(AwsChunkedBodyOptions::default());
|
219 - |
|
220 - | let interceptor = AwsChunkedContentEncodingInterceptor;
|
221 - | interceptor.modify_before_signing(&mut ctx, &runtime_components, &mut cfg).unwrap();
|
222 - |
|
223 - | let request = ctx.request();
|
224 - | assert!(request.headers().get(header::CONTENT_ENCODING).is_none());
|
225 - | assert!(request
|
226 - | .headers()
|
227 - | .get(header::HeaderName::from_static(X_AMZ_DECODED_CONTENT_LENGTH))
|
228 - | .is_none());
|
229 - | }
|
230 - |
|
231 - | #[tokio::test]
|
232 - | async fn test_short_circuit_modify_before_transmit() {
|
233 - | let mut ctx = InterceptorContext::new(Input::doesnt_matter());
|
234 - | ctx.enter_serialization_phase();
|
235 - | let _ = ctx.take_input();
|
236 - | let request = HttpRequest::new(SdkBody::from("in-memory body, must not use chunked encoding"));
|
237 - | ctx.set_request(request);
|
238 - | ctx.enter_before_transmit_phase();
|
239 - | let mut ctx: BeforeTransmitInterceptorContextMut<'_> = (&mut ctx).into();
|
240 - |
|
241 - | let runtime_components = RuntimeComponentsBuilder::for_tests().build().unwrap();
|
242 - |
|
243 - | let mut cfg = ConfigBag::base();
|
244 - | // Don't need to set the stream length properly because we expect the body won't be wrapped by `AwsChunkedBody`.
|
245 - | cfg.interceptor_state().store_put(AwsChunkedBodyOptions::default());
|
246 - |
|
247 - | let interceptor = AwsChunkedContentEncodingInterceptor;
|
248 - | interceptor.modify_before_transmit(&mut ctx, &runtime_components, &mut cfg).unwrap();
|
249 - |
|
250 - | let mut body = ctx.request().body().try_clone().expect("body is retryable");
|
251 - |
|
252 - | let mut body_data = BytesMut::new();
|
253 - | while let Some(data) = body.data().await {
|
254 - | body_data.extend_from_slice(&data.unwrap())
|
255 - | }
|
256 - | let body_str = std::str::from_utf8(&body_data).unwrap();
|
257 - | // Also implies that `assert!(!body_str.ends_with("0\r\n\r\n"));`, i.e., shouldn't see chunked encoding epilogue.
|
258 - | assert_eq!("in-memory body, must not use chunked encoding", body_str);
|
259 - | }
|
260 - |
|
261 - | #[test]
|
262 - | fn test_must_not_use_chunked_encoding_with_in_memory_body() {
|
263 - | let request = HttpRequest::new(SdkBody::from("test body"));
|
264 - | let cfg = ConfigBag::base();
|
265 - |
|
266 - | assert!(must_not_use_chunked_encoding(&request, &cfg));
|
267 - | }
|
268 - |
|
269 - | async fn streaming_body(path: impl AsRef<std::path::Path>) -> SdkBody {
|
270 - | let file = path.as_ref();
|
271 - | ByteStream::read_from().path(&file).build().await.unwrap().into_inner()
|
272 - | }
|
273 - |
|
274 - | #[tokio::test]
|
275 - | async fn test_must_not_use_chunked_encoding_with_disabled_option() {
|
276 - | let file = NamedTempFile::new().unwrap();
|
277 - | let request = HttpRequest::new(streaming_body(&file).await);
|
278 - | let mut cfg = ConfigBag::base();
|
279 - | cfg.interceptor_state().store_put(AwsChunkedBodyOptions::disable_chunked_encoding());
|
280 - |
|
281 - | assert!(must_not_use_chunked_encoding(&request, &cfg));
|
282 - | }
|
283 - |
|
284 - | #[tokio::test]
|
285 - | async fn test_chunked_encoding_is_used() {
|
286 - | let file = NamedTempFile::new().unwrap();
|
287 - | let request = HttpRequest::new(streaming_body(&file).await);
|
288 - | let cfg = ConfigBag::base();
|
289 - |
|
290 - | assert!(!must_not_use_chunked_encoding(&request, &cfg));
|
291 - | }
|
292 - | }
|