160 162 | pub fn new(body: Inner, options: AwsChunkedBodyOptions) -> Self {
|
161 163 | Self {
|
162 164 | inner: body,
|
163 165 | state: AwsChunkedBodyState::WritingChunkSize,
|
164 166 | options,
|
165 167 | inner_body_bytes_read_so_far: 0,
|
166 168 | }
|
167 169 | }
|
168 170 | }
|
169 171 |
|
170 - | fn get_unsigned_chunk_bytes_length(payload_length: u64) -> u64 {
|
171 - | let hex_repr_len = int_log16(payload_length);
|
172 - | hex_repr_len + CRLF.len() as u64 + payload_length + CRLF.len() as u64
|
173 - | }
|
174 - |
|
175 - | /// Writes trailers out into a `string` and then converts that `String` to a `Bytes` before
|
176 - | /// returning.
|
177 - | ///
|
178 - | /// - Trailer names are separated by a single colon only, no space.
|
179 - | /// - Trailer names with multiple values will be written out one line per value, with the name
|
180 - | /// appearing on each line.
|
181 - | fn trailers_as_aws_chunked_bytes(
|
182 - | trailer_map: Option<HeaderMap>,
|
183 - | estimated_length: u64,
|
184 - | ) -> BytesMut {
|
185 - | if let Some(trailer_map) = trailer_map {
|
186 - | let mut current_header_name = None;
|
187 - | let mut trailers = BytesMut::with_capacity(estimated_length.try_into().unwrap_or_default());
|
188 - |
|
189 - | for (header_name, header_value) in trailer_map.into_iter() {
|
190 - | // When a header has multiple values, the name only comes up in iteration the first time
|
191 - | // we see it. Therefore, we need to keep track of the last name we saw and fall back to
|
192 - | // it when `header_name == None`.
|
193 - | current_header_name = header_name.or(current_header_name);
|
194 - |
|
195 - | // In practice, this will always exist, but `if let` is nicer than unwrap
|
196 - | if let Some(header_name) = current_header_name.as_ref() {
|
197 - | trailers.extend_from_slice(header_name.as_ref());
|
198 - | trailers.extend_from_slice(TRAILER_SEPARATOR);
|
199 - | trailers.extend_from_slice(header_value.as_bytes());
|
200 - | trailers.extend_from_slice(CRLF.as_bytes());
|
201 - | }
|
202 - | }
|
203 - |
|
204 - | trailers
|
205 - | } else {
|
206 - | BytesMut::new()
|
207 - | }
|
208 - | }
|
209 - |
|
210 - | /// Given an optional `HeaderMap`, calculate the total number of bytes required to represent the
|
211 - | /// `HeaderMap`. If no `HeaderMap` is given as input, return 0.
|
212 - | ///
|
213 - | /// - Trailer names are separated by a single colon only, no space.
|
214 - | /// - Trailer names with multiple values will be written out one line per value, with the name
|
215 - | /// appearing on each line.
|
216 - | fn total_rendered_length_of_trailers(trailer_map: Option<&HeaderMap>) -> u64 {
|
217 - | match trailer_map {
|
218 - | Some(trailer_map) => trailer_map
|
219 - | .iter()
|
220 - | .map(|(trailer_name, trailer_value)| {
|
221 - | trailer_name.as_str().len()
|
222 - | + TRAILER_SEPARATOR.len()
|
223 - | + trailer_value.len()
|
224 - | + CRLF.len()
|
225 - | })
|
226 - | .sum::<usize>() as u64,
|
227 - | None => 0,
|
228 - | }
|
229 - | }
|
230 - |
|
231 - | impl<Inner> Body for AwsChunkedBody<Inner>
|
172 + | impl<Inner> http_body_04x::Body for AwsChunkedBody<Inner>
|
232 173 | where
|
233 - | Inner: Body<Data = Bytes, Error = aws_smithy_types::body::Error>,
|
174 + | Inner: http_body_04x::Body<Data = Bytes, Error = aws_smithy_types::body::Error>,
|
234 175 | {
|
235 176 | type Data = Bytes;
|
236 177 | type Error = aws_smithy_types::body::Error;
|
237 178 |
|
238 179 | fn poll_data(
|
239 180 | self: Pin<&mut Self>,
|
240 181 | cx: &mut Context<'_>,
|
241 182 | ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
|
242 183 | tracing::trace!(state = ?self.state, "polling AwsChunkedBody");
|
243 184 | let mut this = self.project();
|
244 185 |
|
245 186 | match *this.state {
|
246 187 | AwsChunkedBodyState::WritingChunkSize => {
|
247 188 | if this.options.stream_length == 0 {
|
248 189 | // If the stream is empty, we skip to writing trailers after writing the CHUNK_TERMINATOR.
|
249 190 | *this.state = AwsChunkedBodyState::WritingTrailers;
|
250 191 | tracing::trace!("stream is empty, writing chunk terminator");
|
251 192 | Poll::Ready(Some(Ok(Bytes::from([CHUNK_TERMINATOR].concat()))))
|
252 193 | } else {
|
253 194 | *this.state = AwsChunkedBodyState::WritingChunk;
|
254 195 | // A chunk must be prefixed by chunk size in hexadecimal
|
255 196 | let chunk_size = format!("{:X?}{CRLF}", this.options.stream_length);
|
256 197 | tracing::trace!(%chunk_size, "writing chunk size");
|
257 198 | let chunk_size = Bytes::from(chunk_size);
|
258 199 | Poll::Ready(Some(Ok(chunk_size)))
|
259 200 | }
|
260 201 | }
|
261 202 | AwsChunkedBodyState::WritingChunk => match this.inner.poll_data(cx) {
|
262 203 | Poll::Ready(Some(Ok(data))) => {
|
263 204 | tracing::trace!(len = data.len(), "writing chunk data");
|
264 205 | *this.inner_body_bytes_read_so_far += data.len();
|
265 206 | Poll::Ready(Some(Ok(data)))
|
266 207 | }
|
267 208 | Poll::Ready(None) => {
|
268 209 | let actual_stream_length = *this.inner_body_bytes_read_so_far as u64;
|
269 210 | let expected_stream_length = this.options.stream_length;
|
270 211 | if actual_stream_length != expected_stream_length {
|
271 212 | let err = Box::new(AwsChunkedBodyError::StreamLengthMismatch {
|
272 213 | actual: actual_stream_length,
|
273 214 | expected: expected_stream_length,
|
274 215 | });
|
275 216 | return Poll::Ready(Some(Err(err)));
|
276 217 | };
|
277 218 |
|
278 219 | tracing::trace!("no more chunk data, writing CRLF and chunk terminator");
|
279 220 | *this.state = AwsChunkedBodyState::WritingTrailers;
|
280 221 | // Since we wrote chunk data, we end it with a CRLF and since we only write
|
281 222 | // a single chunk, we write the CHUNK_TERMINATOR immediately after
|
282 223 | Poll::Ready(Some(Ok(Bytes::from([CRLF, CHUNK_TERMINATOR].concat()))))
|
283 224 | }
|
284 225 | Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
|
285 226 | Poll::Pending => Poll::Pending,
|
286 227 | },
|
287 228 | AwsChunkedBodyState::WritingTrailers => {
|
288 229 | return match this.inner.poll_trailers(cx) {
|
289 230 | Poll::Ready(Ok(trailers)) => {
|
290 231 | *this.state = AwsChunkedBodyState::Closed;
|
291 - | let expected_length = total_rendered_length_of_trailers(trailers.as_ref());
|
232 + | let expected_length =
|
233 + | http_02x_utils::total_rendered_length_of_trailers(trailers.as_ref());
|
292 234 | let actual_length = this.options.total_trailer_length();
|
293 235 |
|
294 236 | if expected_length != actual_length {
|
295 237 | let err =
|
296 238 | Box::new(AwsChunkedBodyError::ReportedTrailerLengthMismatch {
|
297 239 | actual: actual_length,
|
298 240 | expected: expected_length,
|
299 241 | });
|
300 242 | return Poll::Ready(Some(Err(err)));
|
301 243 | }
|
302 244 |
|
303 - | let mut trailers =
|
304 - | trailers_as_aws_chunked_bytes(trailers, actual_length + 1);
|
245 + | let mut trailers = http_02x_utils::trailers_as_aws_chunked_bytes(
|
246 + | trailers,
|
247 + | actual_length + 1,
|
248 + | );
|
305 249 | // Insert the final CRLF to close the body
|
306 250 | trailers.extend_from_slice(CRLF.as_bytes());
|
307 251 |
|
308 252 | Poll::Ready(Some(Ok(trailers.into())))
|
309 253 | }
|
310 254 | Poll::Pending => Poll::Pending,
|
311 255 | Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
|
312 256 | };
|
313 257 | }
|
314 258 | AwsChunkedBodyState::Closed => Poll::Ready(None),
|
315 259 | }
|
316 260 | }
|
317 261 |
|
318 262 | fn poll_trailers(
|
319 263 | self: Pin<&mut Self>,
|
320 264 | _cx: &mut Context<'_>,
|
321 - | ) -> Poll<Result<Option<HeaderMap<HeaderValue>>, Self::Error>> {
|
265 + | ) -> Poll<Result<Option<http_02x::HeaderMap<http_02x::HeaderValue>>, Self::Error>> {
|
322 266 | // Trailers were already appended to the body because of the content encoding scheme
|
323 267 | Poll::Ready(Ok(None))
|
324 268 | }
|
325 269 |
|
326 270 | fn is_end_stream(&self) -> bool {
|
327 271 | self.state == AwsChunkedBodyState::Closed
|
328 272 | }
|
329 273 |
|
330 - | fn size_hint(&self) -> SizeHint {
|
331 - | SizeHint::with_exact(self.options.encoded_length())
|
274 + | fn size_hint(&self) -> http_body_04x::SizeHint {
|
275 + | http_body_04x::SizeHint::with_exact(self.options.encoded_length())
|
276 + | }
|
277 + | }
|
278 + |
|
279 + | /// Utility functions to help with the [http_body_04x::Body] trait implementation
|
280 + | mod http_02x_utils {
|
281 + | use super::{CRLF, TRAILER_SEPARATOR};
|
282 + | use bytes::BytesMut;
|
283 + | use http_02x::HeaderMap;
|
284 + |
|
285 + | /// Writes trailers out into a `string` and then converts that `String` to a `Bytes` before
|
286 + | /// returning.
|
287 + | ///
|
288 + | /// - Trailer names are separated by a single colon only, no space.
|
289 + | /// - Trailer names with multiple values will be written out one line per value, with the name
|
290 + | /// appearing on each line.
|
291 + | pub(super) fn trailers_as_aws_chunked_bytes(
|
292 + | trailer_map: Option<HeaderMap>,
|
293 + | estimated_length: u64,
|
294 + | ) -> BytesMut {
|
295 + | if let Some(trailer_map) = trailer_map {
|
296 + | let mut current_header_name = None;
|
297 + | let mut trailers =
|
298 + | BytesMut::with_capacity(estimated_length.try_into().unwrap_or_default());
|
299 + |
|
300 + | for (header_name, header_value) in trailer_map.into_iter() {
|
301 + | // When a header has multiple values, the name only comes up in iteration the first time
|
302 + | // we see it. Therefore, we need to keep track of the last name we saw and fall back to
|
303 + | // it when `header_name == None`.
|
304 + | current_header_name = header_name.or(current_header_name);
|
305 + |
|
306 + | // In practice, this will always exist, but `if let` is nicer than unwrap
|
307 + | if let Some(header_name) = current_header_name.as_ref() {
|
308 + | trailers.extend_from_slice(header_name.as_ref());
|
309 + | trailers.extend_from_slice(TRAILER_SEPARATOR);
|
310 + | trailers.extend_from_slice(header_value.as_bytes());
|
311 + | trailers.extend_from_slice(CRLF.as_bytes());
|
312 + | }
|
313 + | }
|
314 + |
|
315 + | trailers
|
316 + | } else {
|
317 + | BytesMut::new()
|
318 + | }
|
319 + | }
|
320 + |
|
321 + | /// Given an optional `HeaderMap`, calculate the total number of bytes required to represent the
|
322 + | /// `HeaderMap`. If no `HeaderMap` is given as input, return 0.
|
323 + | ///
|
324 + | /// - Trailer names are separated by a single colon only, no space.
|
325 + | /// - Trailer names with multiple values will be written out one line per value, with the name
|
326 + | /// appearing on each line.
|
327 + | pub(super) fn total_rendered_length_of_trailers(trailer_map: Option<&HeaderMap>) -> u64 {
|
328 + | match trailer_map {
|
329 + | Some(trailer_map) => trailer_map
|
330 + | .iter()
|
331 + | .map(|(trailer_name, trailer_value)| {
|
332 + | trailer_name.as_str().len()
|
333 + | + TRAILER_SEPARATOR.len()
|
334 + | + trailer_value.len()
|
335 + | + CRLF.len()
|
336 + | })
|
337 + | .sum::<usize>() as u64,
|
338 + | None => 0,
|
339 + | }
|
340 + | }
|
341 + | }
|
342 + |
|
343 + | const UNREACHABLE_STATES: &str = "These states already short circuited";
|
344 + |
|
345 + | /// Implementing the [http_body_1x::Body] trait
|
346 + | impl<Inner> http_body_1x::Body for AwsChunkedBody<Inner>
|
347 + | where
|
348 + | Inner: http_body_1x::Body<Data = Bytes, Error = aws_smithy_types::body::Error>,
|
349 + | {
|
350 + | type Data = Bytes;
|
351 + | type Error = aws_smithy_types::body::Error;
|
352 + |
|
353 + | fn is_end_stream(&self) -> bool {
|
354 + | self.state == AwsChunkedBodyState::Closed
|
355 + | }
|
356 + |
|
357 + | fn size_hint(&self) -> http_body_1x::SizeHint {
|
358 + | http_body_1x::SizeHint::with_exact(self.options.encoded_length())
|
359 + | }
|
360 + |
|
361 + | fn poll_frame(
|
362 + | self: Pin<&mut Self>,
|
363 + | cx: &mut Context<'_>,
|
364 + | ) -> Poll<Option<Result<http_body_1x::Frame<Self::Data>, Self::Error>>> {
|
365 + | tracing::trace!(state = ?self.state, "polling AwsChunkedBody");
|
366 + | let mut this = self.project();
|
367 + |
|
368 + | // Both `WritingChunkSize` and `Closed` states short circuit without polling the inner body
|
369 + |
|
370 + | // Initial setup, we do not poll the inner body here
|
371 + | if *this.state == AwsChunkedBodyState::WritingChunkSize {
|
372 + | if this.options.stream_length == 0 {
|
373 + | // If the stream is empty, we skip to writing trailers after writing the CHUNK_TERMINATOR.
|
374 + | tracing::trace!("stream is empty, writing chunk terminator");
|
375 + | let frame = http_body_1x::Frame::data(Bytes::from(CHUNK_TERMINATOR));
|
376 + | *this.state = AwsChunkedBodyState::WritingTrailers;
|
377 + | return Poll::Ready(Some(Ok(frame)));
|
378 + | } else {
|
379 + | // A chunk must be prefixed by chunk size in hexadecimal
|
380 + | let chunk_size = format!(
|
381 + | "{:X?}{}",
|
382 + | this.options.stream_length,
|
383 + | std::str::from_utf8(CRLF_RAW).unwrap()
|
384 + | );
|
385 + | tracing::trace!(%chunk_size, "writing chunk size");
|
386 + | let chunk_size = http_body_1x::Frame::data(Bytes::from(chunk_size));
|
387 + | *this.state = AwsChunkedBodyState::WritingChunk;
|
388 + | return Poll::Ready(Some(Ok(chunk_size)));
|
389 + | }
|
390 + | }
|
391 + |
|
392 + | // Polled after completion
|
393 + | if *this.state == AwsChunkedBodyState::Closed {
|
394 + | return Poll::Ready(None);
|
395 + | }
|
396 + |
|
397 + | // For all other states we must poll the inner body
|
398 + | let maybe_frame = this.inner.poll_frame(cx);
|
399 + | tracing::trace!(poll_state = ?maybe_frame, "Polling InnerBody");
|
400 + |
|
401 + | match maybe_frame {
|
402 + | Poll::Ready(Some(Ok(frame))) => match *this.state {
|
403 + | // Both data chunks and trailers are written as Frame::data so we treat these states similarly
|
404 + | // Importantly we cannot know that the body data of the InnerBody is exhausted until we see a
|
405 + | // trailer frame or a Poll::Ready(None)
|
406 + | AwsChunkedBodyState::WritingChunk => {
|
407 + | if frame.is_data() {
|
408 + | let data = frame.data_ref().expect("Data frame has data");
|
409 + | tracing::trace!(len = data.len(), "Writing chunk data");
|
410 + | *this.inner_body_bytes_read_so_far += data.len();
|
411 + | Poll::Ready(Some(Ok(frame)))
|
412 + | } else {
|
413 + | tracing::trace!(
|
414 + | "No more chunk data, writing CRLF + CHUNK_TERMINATOR to end the data, and the first trailer frame"
|
415 + | );
|
416 + |
|
417 + | // We exhausted the body data, now check if the length is correct
|
418 + | if let Err(poll_stream_len_err) =
|
419 + | http_1x_utils::check_for_stream_length_mismatch(
|
420 + | *this.inner_body_bytes_read_so_far as u64,
|
421 + | this.options.stream_length,
|
422 + | )
|
423 + | {
|
424 + | return poll_stream_len_err;
|
425 + | }
|
426 + |
|
427 + | *this.state = AwsChunkedBodyState::WritingTrailers;
|
428 + | let trailers = frame.trailers_ref();
|
429 + |
|
430 + | // NOTE: there is a subtle logic bug here (which is present in the http-02x implementation as well)
|
431 + | // The check for this error assumes that all trailers will come in a single trailer frame. Currently
|
432 + | // I believe this will always be the case since the only thing we send trailers for in AwsChunked is
|
433 + | // streaming checksums and that is a single trailer value. But it might not always be true. We should
|
434 + | // fix this bug when we update the behavior here to match the actual spec.
|
435 + | // The fix probably looks like returning Poll::Pending while we buffer all of the trailers and then
|
436 + | // comparing the actual length to the expected length before returning a final frame containing all
|
437 + | // of the trailers.
|
438 + | let actual_length: u64 =
|
439 + | http_1x_utils::total_rendered_length_of_trailers(trailers);
|
440 + | let expected_length = this.options.total_trailer_length();
|
441 + | if expected_length != actual_length {
|
442 + | let err =
|
443 + | Box::new(AwsChunkedBodyError::ReportedTrailerLengthMismatch {
|
444 + | actual: actual_length,
|
445 + | expected: expected_length,
|
446 + | });
|
447 + | return Poll::Ready(Some(Err(err)));
|
448 + | }
|
449 + |
|
450 + | // Capacity = actual_length (in case all of the trailers specified in come in AwsChunkedBodyOptions
|
451 + | // come in the first trailer frame which is going to be the case most of the time in practice) + 7
|
452 + | // (2 + 3) for the initial CRLF + CHUNK_TERMINATOR to end the chunked data + 2 for the final CRLF
|
453 + | // ending the trailers section.
|
454 + | let mut buf = BytesMut::with_capacity(actual_length as usize + 7);
|
455 + | // End the final data chunk
|
456 + | buf.extend_from_slice(&[CRLF_RAW, CHUNK_TERMINATOR_RAW].concat());
|
457 + |
|
458 + | // We transform the trailers into raw bytes. We can't write them with Frame::trailers
|
459 + | // since we must include the CRLF + CHUNK_TERMINATOR that end the body and the CRLFs
|
460 + | // after each trailer, so we write them as Frame::data
|
461 + | let trailers = http_1x_utils::trailers_as_aws_chunked_bytes(trailers, buf);
|
462 + | Poll::Ready(Some(Ok(http_body_1x::Frame::data(trailers.into()))))
|
463 + | }
|
464 + | }
|
465 + | AwsChunkedBodyState::WritingTrailers => {
|
466 + | let trailers = frame.trailers_ref();
|
467 + | let actual_length: u64 =
|
468 + | http_1x_utils::total_rendered_length_of_trailers(trailers);
|
469 + | let buf = BytesMut::with_capacity(actual_length as usize + 7);
|
470 + | let trailers = http_1x_utils::trailers_as_aws_chunked_bytes(trailers, buf);
|
471 + | Poll::Ready(Some(Ok(http_body_1x::Frame::data(trailers.into()))))
|
472 + | }
|
473 + | AwsChunkedBodyState::Closed | AwsChunkedBodyState::WritingChunkSize => {
|
474 + | unreachable!("{}", UNREACHABLE_STATES)
|
475 + | }
|
476 + | },
|
477 + | // InnerBody data exhausted, add finalizing bytes depending on current state
|
478 + | Poll::Ready(None) => {
|
479 + | let trailers = match *this.state {
|
480 + | AwsChunkedBodyState::WritingChunk => {
|
481 + | // We exhausted the body data, now check if the length is correct
|
482 + | if let Err(poll_stream_len_err) =
|
483 + | http_1x_utils::check_for_stream_length_mismatch(
|
484 + | *this.inner_body_bytes_read_so_far as u64,
|
485 + | this.options.stream_length,
|
486 + | )
|
487 + | {
|
488 + | return poll_stream_len_err;
|
489 + | }
|
490 + |
|
491 + | // Since we exhausted the body data, but are still in the WritingChunk state we did
|
492 + | // not poll any trailer frames and we write the CRLF + Chunk terminator to begin the
|
493 + | // trailer section plus a single final CRLF to end the (empty) trailer section
|
494 + | let mut trailers = BytesMut::with_capacity(7);
|
495 + | trailers.extend_from_slice(
|
496 + | &[CRLF_RAW, CHUNK_TERMINATOR_RAW, CRLF_RAW].concat(),
|
497 + | );
|
498 + | trailers
|
499 + | }
|
500 + | AwsChunkedBodyState::WritingTrailers => {
|
501 + | let mut trailers = BytesMut::with_capacity(2);
|
502 + | trailers.extend_from_slice(CRLF_RAW);
|
503 + | trailers
|
504 + | }
|
505 + | AwsChunkedBodyState::Closed | AwsChunkedBodyState::WritingChunkSize => {
|
506 + | unreachable!("{}", UNREACHABLE_STATES)
|
507 + | }
|
508 + | };
|
509 + |
|
510 + | let frame = http_body_1x::Frame::data(trailers.into());
|
511 + | *this.state = AwsChunkedBodyState::Closed;
|
512 + | Poll::Ready(Some(Ok(frame)))
|
513 + | }
|
514 + | // Passthrough states
|
515 + | Poll::Pending => Poll::Pending,
|
516 + | Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
|
517 + | }
|
518 + | }
|
519 + | }
|
520 + | /// Utility functions to help with the [http_body_1x::Body] trait implementation
|
521 + | mod http_1x_utils {
|
522 + | use std::task::Poll;
|
523 + |
|
524 + | use super::{CRLF_RAW, TRAILER_SEPARATOR};
|
525 + | use bytes::{Bytes, BytesMut};
|
526 + | use http_1x::{HeaderMap, HeaderName};
|
527 + |
|
528 + | /// Writes trailers out into a `string` and then converts that `String` to a `Bytes` before
|
529 + | /// returning.
|
530 + | ///
|
531 + | /// - Trailer names are separated by a single colon only, no space.
|
532 + | /// - Trailer names with multiple values will be written out one line per value, with the name
|
533 + | /// appearing on each line.
|
534 + | pub(super) fn trailers_as_aws_chunked_bytes(
|
535 + | trailer_map: Option<&HeaderMap>,
|
536 + | mut buffer: BytesMut,
|
537 + | ) -> BytesMut {
|
538 + | if let Some(trailer_map) = trailer_map {
|
539 + | let mut current_header_name: Option<HeaderName> = None;
|
540 + |
|
541 + | for (header_name, header_value) in trailer_map.clone().into_iter() {
|
542 + | // When a header has multiple values, the name only comes up in iteration the first time
|
543 + | // we see it. Therefore, we need to keep track of the last name we saw and fall back to
|
544 + | // it when `header_name == None`.
|
545 + | current_header_name = header_name.or(current_header_name);
|
546 + |
|
547 + | // In practice, this will always exist, but `if let` is nicer than unwrap
|
548 + | if let Some(header_name) = current_header_name.as_ref() {
|
549 + | buffer.extend_from_slice(header_name.as_ref());
|
550 + | buffer.extend_from_slice(TRAILER_SEPARATOR);
|
551 + | buffer.extend_from_slice(header_value.as_bytes());
|
552 + | buffer.extend_from_slice(CRLF_RAW);
|
553 + | }
|
554 + | }
|
555 + |
|
556 + | buffer
|
557 + | } else {
|
558 + | buffer
|
559 + | }
|
560 + | }
|
561 + |
|
562 + | /// Given an optional `HeaderMap`, calculate the total number of bytes required to represent the
|
563 + | /// `HeaderMap`. If no `HeaderMap` is given as input, return 0.
|
564 + | ///
|
565 + | /// - Trailer names are separated by a single colon only, no space.
|
566 + | /// - Trailer names with multiple values will be written out one line per value, with the name
|
567 + | /// appearing on each line.
|
568 + | pub(super) fn total_rendered_length_of_trailers(trailer_map: Option<&HeaderMap>) -> u64 {
|
569 + | match trailer_map {
|
570 + | Some(trailer_map) => trailer_map
|
571 + | .iter()
|
572 + | .map(|(trailer_name, trailer_value)| {
|
573 + | trailer_name.as_str().len()
|
574 + | + TRAILER_SEPARATOR.len()
|
575 + | + trailer_value.len()
|
576 + | + CRLF_RAW.len()
|
577 + | })
|
578 + | .sum::<usize>() as u64,
|
579 + | None => 0,
|
580 + | }
|
581 + | }
|
582 + |
|
583 + | /// This is an ugly return type, but in practice it just returns `Ok(())` if the values match
|
584 + | /// and `Err(Poll::Ready(Some(Err(AwsChunkedBodyError::StreamLengthMismatch))))` if they don't
|
585 + | #[allow(clippy::type_complexity)]
|
586 + | pub(super) fn check_for_stream_length_mismatch(
|
587 + | actual_stream_length: u64,
|
588 + | expected_stream_length: u64,
|
589 + | ) -> Result<(), Poll<Option<Result<http_body_1x::Frame<Bytes>, aws_smithy_types::body::Error>>>>
|
590 + | {
|
591 + | if actual_stream_length != expected_stream_length {
|
592 + | let err = Box::new(super::AwsChunkedBodyError::StreamLengthMismatch {
|
593 + | actual: actual_stream_length,
|
594 + | expected: expected_stream_length,
|
595 + | });
|
596 + | return Err(Poll::Ready(Some(Err(err))));
|
597 + | };
|
598 + |
|
599 + | Ok(())
|
332 600 | }
|
333 601 | }
|
334 602 |
|
335 603 | /// Errors related to `AwsChunkedBody`
|
336 604 | #[derive(Debug)]
|
337 605 | enum AwsChunkedBodyError {
|
338 606 | /// Error that occurs when the sum of `trailer_lengths` set when creating an `AwsChunkedBody` is
|
339 607 | /// not equal to the actual length of the trailers returned by the inner `http_body::Body`
|
340 608 | /// implementor. These trailer lengths are necessary in order to correctly calculate the total
|
341 609 | /// size of the body for setting the content length header.
|
342 610 | ReportedTrailerLengthMismatch { actual: u64, expected: u64 },
|
343 611 | /// Error that occurs when the `stream_length` set when creating an `AwsChunkedBody` is not
|
344 612 | /// equal to the actual length of the body returned by the inner `http_body::Body` implementor.
|
345 613 | /// `stream_length` must be correct in order to set an accurate content length header.
|
346 614 | StreamLengthMismatch { actual: u64, expected: u64 },
|
347 615 | }
|
348 616 |
|
349 617 | impl std::fmt::Display for AwsChunkedBodyError {
|
350 618 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
351 619 | match self {
|
352 620 | Self::ReportedTrailerLengthMismatch { actual, expected } => {
|
353 621 | write!(f, "When creating this AwsChunkedBody, length of trailers was reported as {expected}. However, when double checking during trailer encoding, length was found to be {actual} instead.")
|
354 622 | }
|
355 623 | Self::StreamLengthMismatch { actual, expected } => {
|
356 624 | write!(f, "When creating this AwsChunkedBody, stream length was reported as {expected}. However, when double checking during body encoding, length was found to be {actual} instead.")
|
357 625 | }
|
358 626 | }
|
359 627 | }
|
360 628 | }
|
361 629 |
|
362 630 | impl std::error::Error for AwsChunkedBodyError {}
|
363 631 |
|
364 632 | // Used for finding how many hexadecimal digits it takes to represent a base 10 integer
|
365 633 | fn int_log16<T>(mut i: T) -> u64
|
366 634 | where
|
367 635 | T: std::ops::DivAssign + PartialOrd + From<u8> + Copy,
|
368 636 | {
|
369 637 | let mut len = 0;
|
370 638 | let zero = T::from(0);
|
371 639 | let sixteen = T::from(16);
|
372 640 |
|
373 641 | while i > zero {
|
374 642 | i /= sixteen;
|
375 643 | len += 1;
|
376 644 | }
|
377 645 |
|
378 646 | len
|
379 647 | }
|
380 648 |
|
649 + | fn get_unsigned_chunk_bytes_length(payload_length: u64) -> u64 {
|
650 + | let hex_repr_len = int_log16(payload_length);
|
651 + | hex_repr_len + CRLF.len() as u64 + payload_length + CRLF.len() as u64
|
652 + | }
|
653 + |
|
381 654 | #[cfg(test)]
|
382 655 | mod tests {
|
383 - | use super::{
|
384 - | total_rendered_length_of_trailers, trailers_as_aws_chunked_bytes, AwsChunkedBody,
|
385 - | AwsChunkedBodyOptions, CHUNK_TERMINATOR, CRLF,
|
386 - | };
|
387 - |
|
388 - | use aws_smithy_types::body::SdkBody;
|
389 - | use bytes::{Buf, Bytes};
|
390 - | use bytes_utils::SegmentedBuf;
|
391 - | use http_02x::{HeaderMap, HeaderValue};
|
392 - | use http_body_04x::{Body, SizeHint};
|
393 - | use pin_project_lite::pin_project;
|
394 - |
|
395 - | use std::io::Read;
|
396 - | use std::pin::Pin;
|
397 - | use std::task::{Context, Poll};
|
398 - | use std::time::Duration;
|
399 - |
|
400 - | pin_project! {
|
401 - | struct SputteringBody {
|
402 - | parts: Vec<Option<Bytes>>,
|
403 - | cursor: usize,
|
404 - | delay_in_millis: u64,
|
405 - | }
|
406 - | }
|
407 656 |
|
408 - | impl SputteringBody {
|
409 - | fn len(&self) -> usize {
|
410 - | self.parts.iter().flatten().map(|b| b.len()).sum()
|
411 - | }
|
412 - | }
|
657 + | #[cfg(test)]
|
658 + | mod http_02x_tests {
|
659 + | use super::super::{
|
660 + | http_02x_utils::{total_rendered_length_of_trailers, trailers_as_aws_chunked_bytes},
|
661 + | AwsChunkedBody, AwsChunkedBodyOptions, CHUNK_TERMINATOR, CRLF,
|
662 + | };
|
413 663 |
|
414 - | impl Body for SputteringBody {
|
415 - | type Data = Bytes;
|
416 - | type Error = aws_smithy_types::body::Error;
|
664 + | use aws_smithy_types::body::SdkBody;
|
665 + | use bytes::{Buf, Bytes};
|
666 + | use bytes_utils::SegmentedBuf;
|
667 + | use http_02x::{HeaderMap, HeaderValue};
|
668 + | use http_body_04x::{Body, SizeHint};
|
669 + | use pin_project_lite::pin_project;
|
670 + |
|
671 + | use std::io::Read;
|
672 + | use std::pin::Pin;
|
673 + | use std::task::{Context, Poll};
|
674 + | use std::time::Duration;
|
675 + |
|
676 + | pin_project! {
|
677 + | struct SputteringBody {
|
678 + | parts: Vec<Option<Bytes>>,
|
679 + | cursor: usize,
|
680 + | delay_in_millis: u64,
|
681 + | }
|
682 + | }
|
417 683 |
|
418 - | fn poll_data(
|
419 - | self: Pin<&mut Self>,
|
420 - | cx: &mut Context<'_>,
|
421 - | ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
|
422 - | if self.cursor == self.parts.len() {
|
423 - | return Poll::Ready(None);
|
684 + | impl SputteringBody {
|
685 + | fn len(&self) -> usize {
|
686 + | self.parts.iter().flatten().map(|b| b.len()).sum()
|
424 687 | }
|
688 + | }
|
425 689 |
|
426 - | let this = self.project();
|
427 - | let delay_in_millis = *this.delay_in_millis;
|
428 - | let next_part = this.parts.get_mut(*this.cursor).unwrap().take();
|
429 - |
|
430 - | match next_part {
|
431 - | None => {
|
432 - | *this.cursor += 1;
|
433 - | let waker = cx.waker().clone();
|
434 - | tokio::spawn(async move {
|
435 - | tokio::time::sleep(Duration::from_millis(delay_in_millis)).await;
|
436 - | waker.wake();
|
437 - | });
|
438 - | Poll::Pending
|
690 + | impl Body for SputteringBody {
|
691 + | type Data = Bytes;
|
692 + | type Error = aws_smithy_types::body::Error;
|
693 + |
|
694 + | fn poll_data(
|
695 + | self: Pin<&mut Self>,
|
696 + | cx: &mut Context<'_>,
|
697 + | ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
|
698 + | if self.cursor == self.parts.len() {
|
699 + | return Poll::Ready(None);
|
439 700 | }
|
440 - | Some(data) => {
|
441 - | *this.cursor += 1;
|
442 - | Poll::Ready(Some(Ok(data)))
|
701 + |
|
702 + | let this = self.project();
|
703 + | let delay_in_millis = *this.delay_in_millis;
|
704 + | let next_part = this.parts.get_mut(*this.cursor).unwrap().take();
|
705 + |
|
706 + | match next_part {
|
707 + | None => {
|
708 + | *this.cursor += 1;
|
709 + | let waker = cx.waker().clone();
|
710 + | tokio::spawn(async move {
|
711 + | tokio::time::sleep(Duration::from_millis(delay_in_millis)).await;
|
712 + | waker.wake();
|
713 + | });
|
714 + | Poll::Pending
|
715 + | }
|
716 + | Some(data) => {
|
717 + | *this.cursor += 1;
|
718 + | Poll::Ready(Some(Ok(data)))
|
719 + | }
|
443 720 | }
|
444 721 | }
|
445 - | }
|
446 722 |
|
447 - | fn poll_trailers(
|
448 - | self: Pin<&mut Self>,
|
449 - | _cx: &mut Context<'_>,
|
450 - | ) -> Poll<Result<Option<HeaderMap<HeaderValue>>, Self::Error>> {
|
451 - | Poll::Ready(Ok(None))
|
723 + | fn poll_trailers(
|
724 + | self: Pin<&mut Self>,
|
725 + | _cx: &mut Context<'_>,
|
726 + | ) -> Poll<Result<Option<HeaderMap<HeaderValue>>, Self::Error>> {
|
727 + | Poll::Ready(Ok(None))
|
728 + | }
|
729 + |
|
730 + | fn is_end_stream(&self) -> bool {
|
731 + | false
|
732 + | }
|
733 + |
|
734 + | fn size_hint(&self) -> SizeHint {
|
735 + | SizeHint::new()
|
736 + | }
|
452 737 | }
|
453 738 |
|
454 - | fn is_end_stream(&self) -> bool {
|
455 - | false
|
739 + | #[tokio::test]
|
740 + | async fn test_aws_chunked_encoding() {
|
741 + | let test_fut = async {
|
742 + | let input_str = "Hello world";
|
743 + | let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, Vec::new());
|
744 + | let mut body = AwsChunkedBody::new(SdkBody::from(input_str), opts);
|
745 + |
|
746 + | let mut output = SegmentedBuf::new();
|
747 + | while let Some(buf) = body.data().await {
|
748 + | output.push(buf.unwrap());
|
749 + | }
|
750 + |
|
751 + | let mut actual_output = String::new();
|
752 + | output
|
753 + | .reader()
|
754 + | .read_to_string(&mut actual_output)
|
755 + | .expect("Doesn't cause IO errors");
|
756 + |
|
757 + | let expected_output = "B\r\nHello world\r\n0\r\n\r\n";
|
758 + |
|
759 + | assert_eq!(expected_output, actual_output);
|
760 + | assert!(
|
761 + | body.trailers()
|
762 + | .await
|
763 + | .expect("no errors occurred during trailer polling")
|
764 + | .is_none(),
|
765 + | "aws-chunked encoded bodies don't have normal HTTP trailers"
|
766 + | );
|
767 + |
|
768 + | // You can insert a `tokio::time::sleep` here to verify the timeout works as intended
|
769 + | };
|
770 + |
|
771 + | let timeout_duration = Duration::from_secs(3);
|
772 + | if tokio::time::timeout(timeout_duration, test_fut)
|
773 + | .await
|
774 + | .is_err()
|
775 + | {
|
776 + | panic!("test_aws_chunked_encoding timed out after {timeout_duration:?}");
|
777 + | }
|
456 778 | }
|
457 779 |
|
458 - | fn size_hint(&self) -> SizeHint {
|
459 - | SizeHint::new()
|
780 + | #[tokio::test]
|
781 + | async fn test_aws_chunked_encoding_sputtering_body() {
|
782 + | let test_fut = async {
|
783 + | let input = SputteringBody {
|
784 + | parts: vec![
|
785 + | Some(Bytes::from_static(b"chunk 1, ")),
|
786 + | None,
|
787 + | Some(Bytes::from_static(b"chunk 2, ")),
|
788 + | Some(Bytes::from_static(b"chunk 3, ")),
|
789 + | None,
|
790 + | None,
|
791 + | Some(Bytes::from_static(b"chunk 4, ")),
|
792 + | Some(Bytes::from_static(b"chunk 5, ")),
|
793 + | Some(Bytes::from_static(b"chunk 6")),
|
794 + | ],
|
795 + | cursor: 0,
|
796 + | delay_in_millis: 500,
|
797 + | };
|
798 + | let opts = AwsChunkedBodyOptions::new(input.len() as u64, Vec::new());
|
799 + | let mut body = AwsChunkedBody::new(input, opts);
|
800 + |
|
801 + | let mut output = SegmentedBuf::new();
|
802 + | while let Some(buf) = body.data().await {
|
803 + | output.push(buf.unwrap());
|
804 + | }
|
805 + |
|
806 + | let mut actual_output = String::new();
|
807 + | output
|
808 + | .reader()
|
809 + | .read_to_string(&mut actual_output)
|
810 + | .expect("Doesn't cause IO errors");
|
811 + |
|
812 + | let expected_output =
|
813 + | "34\r\nchunk 1, chunk 2, chunk 3, chunk 4, chunk 5, chunk 6\r\n0\r\n\r\n";
|
814 + |
|
815 + | assert_eq!(expected_output, actual_output);
|
816 + | assert!(
|
817 + | body.trailers()
|
818 + | .await
|
819 + | .expect("no errors occurred during trailer polling")
|
820 + | .is_none(),
|
821 + | "aws-chunked encoded bodies don't have normal HTTP trailers"
|
822 + | );
|
823 + | };
|
824 + |
|
825 + | let timeout_duration = Duration::from_secs(3);
|
826 + | if tokio::time::timeout(timeout_duration, test_fut)
|
827 + | .await
|
828 + | .is_err()
|
829 + | {
|
830 + | panic!(
|
831 + | "test_aws_chunked_encoding_sputtering_body timed out after {timeout_duration:?}"
|
832 + | );
|
833 + | }
|
460 834 | }
|
461 - | }
|
462 835 |
|
463 - | #[tokio::test]
|
464 - | async fn test_aws_chunked_encoding() {
|
465 - | let test_fut = async {
|
836 + | #[tokio::test]
|
837 + | #[should_panic = "called `Result::unwrap()` on an `Err` value: ReportedTrailerLengthMismatch { actual: 44, expected: 0 }"]
|
838 + | async fn test_aws_chunked_encoding_incorrect_trailer_length_panic() {
|
466 839 | let input_str = "Hello world";
|
467 - | let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, Vec::new());
|
840 + | // Test body has no trailers, so this length is incorrect and will trigger an assert panic
|
841 + | // When the panic occurs, it will actually expect a length of 44. This is because, when using
|
842 + | // aws-chunked encoding, each trailer will end with a CRLF which is 2 bytes long.
|
843 + | let wrong_trailer_len = 42;
|
844 + | let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, vec![wrong_trailer_len]);
|
468 845 | let mut body = AwsChunkedBody::new(SdkBody::from(input_str), opts);
|
469 846 |
|
470 - | let mut output = SegmentedBuf::new();
|
847 + | // We don't care about the body contents but we have to read it all before checking for trailers
|
471 848 | while let Some(buf) = body.data().await {
|
472 - | output.push(buf.unwrap());
|
849 + | drop(buf.unwrap());
|
473 850 | }
|
474 851 |
|
475 - | let mut actual_output = String::new();
|
476 - | output
|
477 - | .reader()
|
478 - | .read_to_string(&mut actual_output)
|
479 - | .expect("Doesn't cause IO errors");
|
480 - |
|
481 - | let expected_output = "B\r\nHello world\r\n0\r\n\r\n";
|
482 - |
|
483 - | assert_eq!(expected_output, actual_output);
|
484 852 | assert!(
|
485 853 | body.trailers()
|
486 854 | .await
|
487 855 | .expect("no errors occurred during trailer polling")
|
488 856 | .is_none(),
|
489 857 | "aws-chunked encoded bodies don't have normal HTTP trailers"
|
490 858 | );
|
491 - |
|
492 - | // You can insert a `tokio::time::sleep` here to verify the timeout works as intended
|
493 - | };
|
494 - |
|
495 - | let timeout_duration = Duration::from_secs(3);
|
496 - | if tokio::time::timeout(timeout_duration, test_fut)
|
497 - | .await
|
498 - | .is_err()
|
499 - | {
|
500 - | panic!("test_aws_chunked_encoding timed out after {timeout_duration:?}");
|
501 859 | }
|
502 - | }
|
503 860 |
|
504 - | #[tokio::test]
|
505 - | async fn test_aws_chunked_encoding_sputtering_body() {
|
506 - | let test_fut = async {
|
507 - | let input = SputteringBody {
|
508 - | parts: vec![
|
509 - | Some(Bytes::from_static(b"chunk 1, ")),
|
510 - | None,
|
511 - | Some(Bytes::from_static(b"chunk 2, ")),
|
512 - | Some(Bytes::from_static(b"chunk 3, ")),
|
513 - | None,
|
514 - | None,
|
515 - | Some(Bytes::from_static(b"chunk 4, ")),
|
516 - | Some(Bytes::from_static(b"chunk 5, ")),
|
517 - | Some(Bytes::from_static(b"chunk 6")),
|
518 - | ],
|
519 - | cursor: 0,
|
520 - | delay_in_millis: 500,
|
521 - | };
|
522 - | let opts = AwsChunkedBodyOptions::new(input.len() as u64, Vec::new());
|
523 - | let mut body = AwsChunkedBody::new(input, opts);
|
861 + | #[tokio::test]
|
862 + | async fn test_aws_chunked_encoding_empty_body() {
|
863 + | let input_str = "";
|
864 + | let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, Vec::new());
|
865 + | let mut body = AwsChunkedBody::new(SdkBody::from(input_str), opts);
|
524 866 |
|
525 867 | let mut output = SegmentedBuf::new();
|
526 868 | while let Some(buf) = body.data().await {
|
527 869 | output.push(buf.unwrap());
|
528 870 | }
|
529 871 |
|
530 872 | let mut actual_output = String::new();
|
531 873 | output
|
532 874 | .reader()
|
533 875 | .read_to_string(&mut actual_output)
|
534 876 | .expect("Doesn't cause IO errors");
|
535 877 |
|
536 - | let expected_output =
|
537 - | "34\r\nchunk 1, chunk 2, chunk 3, chunk 4, chunk 5, chunk 6\r\n0\r\n\r\n";
|
878 + | let expected_output = [CHUNK_TERMINATOR, CRLF].concat();
|
538 879 |
|
539 880 | assert_eq!(expected_output, actual_output);
|
540 881 | assert!(
|
541 882 | body.trailers()
|
542 883 | .await
|
543 884 | .expect("no errors occurred during trailer polling")
|
544 885 | .is_none(),
|
545 886 | "aws-chunked encoded bodies don't have normal HTTP trailers"
|
546 887 | );
|
547 - | };
|
888 + | }
|
548 889 |
|
549 - | let timeout_duration = Duration::from_secs(3);
|
550 - | if tokio::time::timeout(timeout_duration, test_fut)
|
551 - | .await
|
552 - | .is_err()
|
553 - | {
|
554 - | panic!(
|
555 - | "test_aws_chunked_encoding_sputtering_body timed out after {timeout_duration:?}"
|
556 - | );
|
890 + | #[tokio::test]
|
891 + | async fn test_total_rendered_length_of_trailers() {
|
892 + | let mut trailers = HeaderMap::new();
|
893 + |
|
894 + | trailers.insert("empty_value", HeaderValue::from_static(""));
|
895 + |
|
896 + | trailers.insert("single_value", HeaderValue::from_static("value 1"));
|
897 + |
|
898 + | trailers.insert("two_values", HeaderValue::from_static("value 1"));
|
899 + | trailers.append("two_values", HeaderValue::from_static("value 2"));
|
900 + |
|
901 + | trailers.insert("three_values", HeaderValue::from_static("value 1"));
|
902 + | trailers.append("three_values", HeaderValue::from_static("value 2"));
|
903 + | trailers.append("three_values", HeaderValue::from_static("value 3"));
|
904 + |
|
905 + | let trailers = Some(trailers);
|
906 + | let actual_length = total_rendered_length_of_trailers(trailers.as_ref());
|
907 + | let expected_length =
|
908 + | (trailers_as_aws_chunked_bytes(trailers, actual_length).len()) as u64;
|
909 + |
|
910 + | assert_eq!(expected_length, actual_length);
|
911 + | }
|
912 + |
|
913 + | #[tokio::test]
|
914 + | async fn test_total_rendered_length_of_empty_trailers() {
|
915 + | let trailers = Some(HeaderMap::new());
|
916 + | let actual_length = total_rendered_length_of_trailers(trailers.as_ref());
|
917 + | let expected_length =
|
918 + | (trailers_as_aws_chunked_bytes(trailers, actual_length).len()) as u64;
|
919 + |
|
920 + | assert_eq!(expected_length, actual_length);
|
557 921 | }
|
558 922 | }
|
559 923 |
|
560 - | #[tokio::test]
|
561 - | #[should_panic = "called `Result::unwrap()` on an `Err` value: ReportedTrailerLengthMismatch { actual: 44, expected: 0 }"]
|
562 - | async fn test_aws_chunked_encoding_incorrect_trailer_length_panic() {
|
563 - | let input_str = "Hello world";
|
564 - | // Test body has no trailers, so this length is incorrect and will trigger an assert panic
|
565 - | // When the panic occurs, it will actually expect a length of 44. This is because, when using
|
566 - | // aws-chunked encoding, each trailer will end with a CRLF which is 2 bytes long.
|
567 - | let wrong_trailer_len = 42;
|
568 - | let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, vec![wrong_trailer_len]);
|
569 - | let mut body = AwsChunkedBody::new(SdkBody::from(input_str), opts);
|
924 + | #[cfg(test)]
|
925 + | mod http_1x_tests {
|
926 + | use super::super::{
|
927 + | http_1x_utils::{total_rendered_length_of_trailers, trailers_as_aws_chunked_bytes},
|
928 + | AwsChunkedBody, AwsChunkedBodyOptions, CHUNK_TERMINATOR_RAW, CRLF_RAW,
|
929 + | };
|
570 930 |
|
571 - | // We don't care about the body contents but we have to read it all before checking for trailers
|
572 - | while let Some(buf) = body.data().await {
|
573 - | drop(buf.unwrap());
|
931 + | use aws_smithy_types::body::SdkBody;
|
932 + | use bytes::{Buf, Bytes, BytesMut};
|
933 + | use bytes_utils::SegmentedBuf;
|
934 + | use http_1x::{HeaderMap, HeaderValue};
|
935 + | use http_body_1x::{Body, Frame, SizeHint};
|
936 + | use http_body_util::BodyExt;
|
937 + | use pin_project_lite::pin_project;
|
938 + |
|
939 + | use std::io::Read;
|
940 + | use std::pin::Pin;
|
941 + | use std::task::{Context, Poll};
|
942 + | use std::time::Duration;
|
943 + |
|
944 + | pin_project! {
|
945 + | struct SputteringBody {
|
946 + | parts: Vec<Option<Bytes>>,
|
947 + | cursor: usize,
|
948 + | delay_in_millis: u64,
|
949 + | }
|
574 950 | }
|
575 951 |
|
576 - | assert!(
|
577 - | body.trailers()
|
578 - | .await
|
579 - | .expect("no errors occurred during trailer polling")
|
580 - | .is_none(),
|
581 - | "aws-chunked encoded bodies don't have normal HTTP trailers"
|
582 - | );
|
583 - | }
|
952 + | impl SputteringBody {
|
953 + | fn len(&self) -> usize {
|
954 + | self.parts.iter().flatten().map(|b| b.len()).sum()
|
955 + | }
|
956 + | }
|
957 + |
|
958 + | impl Body for SputteringBody {
|
959 + | type Data = Bytes;
|
960 + | type Error = aws_smithy_types::body::Error;
|
584 961 |
|
585 - | #[tokio::test]
|
586 - | async fn test_aws_chunked_encoding_empty_body() {
|
587 - | let input_str = "";
|
588 - | let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, Vec::new());
|
589 - | let mut body = AwsChunkedBody::new(SdkBody::from(input_str), opts);
|
962 + | fn poll_frame(
|
963 + | self: Pin<&mut Self>,
|
964 + | cx: &mut Context<'_>,
|
965 + | ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
|
966 + | if self.cursor == self.parts.len() {
|
967 + | return Poll::Ready(None);
|
968 + | }
|
590 969 |
|
591 - | let mut output = SegmentedBuf::new();
|
592 - | while let Some(buf) = body.data().await {
|
593 - | output.push(buf.unwrap());
|
970 + | let this = self.project();
|
971 + | let delay_in_millis = *this.delay_in_millis;
|
972 + | let next_part = this.parts.get_mut(*this.cursor).unwrap().take();
|
973 + |
|
974 + | match next_part {
|
975 + | None => {
|
976 + | *this.cursor += 1;
|
977 + | let waker = cx.waker().clone();
|
978 + | tokio::spawn(async move {
|
979 + | tokio::time::sleep(Duration::from_millis(delay_in_millis)).await;
|
980 + | waker.wake();
|
981 + | });
|
982 + | Poll::Pending
|
983 + | }
|
984 + | Some(data) => {
|
985 + | *this.cursor += 1;
|
986 + | let frame = Frame::data(data);
|
987 + | Poll::Ready(Some(Ok(frame)))
|
988 + | }
|
989 + | }
|
990 + | }
|
991 + |
|
992 + | fn is_end_stream(&self) -> bool {
|
993 + | false
|
994 + | }
|
995 + |
|
996 + | fn size_hint(&self) -> SizeHint {
|
997 + | SizeHint::new()
|
998 + | }
|
594 999 | }
|
595 1000 |
|
596 - | let mut actual_output = String::new();
|
597 - | output
|
598 - | .reader()
|
599 - | .read_to_string(&mut actual_output)
|
600 - | .expect("Doesn't cause IO errors");
|
1001 + | #[tokio::test]
|
1002 + | async fn test_aws_chunked_encoding() {
|
1003 + | let test_fut = async {
|
1004 + | let input_str = "Hello world";
|
1005 + | let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, vec![]);
|
1006 + | let mut body = AwsChunkedBody::new(SdkBody::from(input_str), opts);
|
1007 + |
|
1008 + | let mut output = SegmentedBuf::new();
|
1009 + | while let Some(Ok(buf)) = body.frame().await {
|
1010 + | output.push(buf.into_data().unwrap());
|
1011 + | }
|
1012 + |
|
1013 + | let mut actual_output = String::new();
|
1014 + | output
|
1015 + | .reader()
|
1016 + | .read_to_string(&mut actual_output)
|
1017 + | .expect("Doesn't cause IO errors");
|
601 1018 |
|
602 - | let expected_output = [CHUNK_TERMINATOR, CRLF].concat();
|
1019 + | let expected_output = "B\r\nHello world\r\n0\r\n\r\n";
|
603 1020 |
|
604 - | assert_eq!(expected_output, actual_output);
|
605 - | assert!(
|
606 - | body.trailers()
|
1021 + | assert_eq!(expected_output, actual_output);
|
1022 + |
|
1023 + | // You can insert a `tokio::time::sleep` here to verify the timeout works as intended
|
1024 + | };
|
1025 + |
|
1026 + | let timeout_duration = Duration::from_secs(3);
|
1027 + | if tokio::time::timeout(timeout_duration, test_fut)
|
607 1028 | .await
|
608 - | .expect("no errors occurred during trailer polling")
|
609 - | .is_none(),
|
610 - | "aws-chunked encoded bodies don't have normal HTTP trailers"
|
611 - | );
|
612 - | }
|
1029 + | .is_err()
|
1030 + | {
|
1031 + | panic!("test_aws_chunked_encoding timed out after {timeout_duration:?}");
|
1032 + | }
|
1033 + | }
|
613 1034 |
|
614 - | #[tokio::test]
|
615 - | async fn test_total_rendered_length_of_trailers() {
|
616 - | let mut trailers = HeaderMap::new();
|
1035 + | #[tokio::test]
|
1036 + | async fn test_aws_chunked_encoding_sputtering_body() {
|
1037 + | let test_fut = async {
|
1038 + | let input = SputteringBody {
|
1039 + | parts: vec![
|
1040 + | Some(Bytes::from_static(b"chunk 1, ")),
|
1041 + | None,
|
1042 + | Some(Bytes::from_static(b"chunk 2, ")),
|
1043 + | Some(Bytes::from_static(b"chunk 3, ")),
|
1044 + | None,
|
1045 + | None,
|
1046 + | Some(Bytes::from_static(b"chunk 4, ")),
|
1047 + | Some(Bytes::from_static(b"chunk 5, ")),
|
1048 + | Some(Bytes::from_static(b"chunk 6")),
|
1049 + | ],
|
1050 + | cursor: 0,
|
1051 + | delay_in_millis: 500,
|
1052 + | };
|
1053 + | let opts = AwsChunkedBodyOptions::new(input.len() as u64, vec![]);
|
1054 + | let mut body = AwsChunkedBody::new(input, opts);
|
617 1055 |
|
618 - | trailers.insert("empty_value", HeaderValue::from_static(""));
|
1056 + | let mut output = SegmentedBuf::new();
|
1057 + | while let Some(Ok(buf)) = body.frame().await {
|
1058 + | output.push(buf.into_data().unwrap());
|
1059 + | }
|
619 1060 |
|
620 - | trailers.insert("single_value", HeaderValue::from_static("value 1"));
|
1061 + | let mut actual_output = String::new();
|
1062 + | output
|
1063 + | .reader()
|
1064 + | .read_to_string(&mut actual_output)
|
1065 + | .expect("Doesn't cause IO errors");
|
621 1066 |
|
622 - | trailers.insert("two_values", HeaderValue::from_static("value 1"));
|
623 - | trailers.append("two_values", HeaderValue::from_static("value 2"));
|
1067 + | let expected_output =
|
1068 + | "34\r\nchunk 1, chunk 2, chunk 3, chunk 4, chunk 5, chunk 6\r\n0\r\n\r\n";
|
624 1069 |
|
625 - | trailers.insert("three_values", HeaderValue::from_static("value 1"));
|
626 - | trailers.append("three_values", HeaderValue::from_static("value 2"));
|
627 - | trailers.append("three_values", HeaderValue::from_static("value 3"));
|
1070 + | assert_eq!(expected_output, actual_output);
|
1071 + | };
|
628 1072 |
|
629 - | let trailers = Some(trailers);
|
630 - | let actual_length = total_rendered_length_of_trailers(trailers.as_ref());
|
631 - | let expected_length = (trailers_as_aws_chunked_bytes(trailers, actual_length).len()) as u64;
|
1073 + | let timeout_duration = Duration::from_secs(3);
|
1074 + | if tokio::time::timeout(timeout_duration, test_fut)
|
1075 + | .await
|
1076 + | .is_err()
|
1077 + | {
|
1078 + | panic!(
|
1079 + | "test_aws_chunked_encoding_sputtering_body timed out after {timeout_duration:?}"
|
1080 + | );
|
1081 + | }
|
1082 + | }
|
632 1083 |
|
633 - | assert_eq!(expected_length, actual_length);
|
634 - | }
|
1084 + | #[tokio::test]
|
1085 + | async fn test_aws_chunked_encoding_incorrect_trailer_length_panic() {
|
1086 + | let input_str = "Hello world";
|
1087 + | // Test body has no trailers, so this length is incorrect and will trigger an assert panic
|
1088 + | // When the panic occurs, it will actually expect a length of 44. This is because, when using
|
1089 + | // aws-chunked encoding, each trailer will end with a CRLF which is 2 bytes long.
|
1090 + | let wrong_trailer_len = 42;
|
1091 + | let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, vec![wrong_trailer_len]);
|
1092 + | let mut body = AwsChunkedBody::new(SdkBody::from(input_str), opts);
|
1093 + |
|
1094 + | // We don't care about the body contents but we have to read it all before checking for trailers
|
1095 + | while let Some(Ok(frame)) = body.frame().await {
|
1096 + | assert!(!frame.is_trailers());
|
1097 + | }
|
1098 + | }
|
1099 + |
|
1100 + | #[tokio::test]
|
1101 + | async fn test_aws_chunked_encoding_empty_body() {
|
1102 + | let input_str = "";
|
1103 + | let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, vec![]);
|
1104 + | let mut body = AwsChunkedBody::new(SdkBody::from(input_str), opts);
|
635 1105 |
|
636 - | #[tokio::test]
|
637 - | async fn test_total_rendered_length_of_empty_trailers() {
|
638 - | let trailers = Some(HeaderMap::new());
|
639 - | let actual_length = total_rendered_length_of_trailers(trailers.as_ref());
|
640 - | let expected_length = (trailers_as_aws_chunked_bytes(trailers, actual_length).len()) as u64;
|
1106 + | let mut output = SegmentedBuf::new();
|
1107 + | while let Some(Ok(frame)) = body.frame().await {
|
1108 + | output.push(frame.into_data().unwrap());
|
1109 + | }
|
1110 + |
|
1111 + | let mut actual_output = String::new();
|
1112 + | output
|
1113 + | .reader()
|
1114 + | .read_to_string(&mut actual_output)
|
1115 + | .expect("Doesn't cause IO errors");
|
641 1116 |
|
642 - | assert_eq!(expected_length, actual_length);
|
1117 + | let actual_output = std::str::from_utf8(actual_output.as_bytes()).unwrap();
|
1118 + | let expected_output = [CHUNK_TERMINATOR_RAW, CRLF_RAW].concat();
|
1119 + | let expected_output = std::str::from_utf8(&expected_output).unwrap();
|
1120 + |
|
1121 + | assert_eq!(expected_output, actual_output);
|
1122 + | }
|
1123 + |
|
1124 + | #[tokio::test]
|
1125 + | async fn test_total_rendered_length_of_trailers() {
|
1126 + | let mut trailers = HeaderMap::new();
|
1127 + |
|
1128 + | trailers.insert("empty_value", HeaderValue::from_static(""));
|
1129 + |
|
1130 + | trailers.insert("single_value", HeaderValue::from_static("value 1"));
|
1131 + |
|
1132 + | trailers.insert("two_values", HeaderValue::from_static("value 1"));
|
1133 + | trailers.append("two_values", HeaderValue::from_static("value 2"));
|
1134 + |
|
1135 + | trailers.insert("three_values", HeaderValue::from_static("value 1"));
|
1136 + | trailers.append("three_values", HeaderValue::from_static("value 2"));
|
1137 + | trailers.append("three_values", HeaderValue::from_static("value 3"));
|
1138 + |
|
1139 + | let trailers = Some(&trailers);
|
1140 + | let actual_length = total_rendered_length_of_trailers(trailers);
|
1141 + | let buf = BytesMut::with_capacity(actual_length as usize);
|
1142 + | let expected_length = (trailers_as_aws_chunked_bytes(trailers, buf).len()) as u64;
|
1143 + |
|
1144 + | assert_eq!(expected_length, actual_length);
|
1145 + | }
|
1146 + |
|
1147 + | #[tokio::test]
|
1148 + | async fn test_total_rendered_length_of_empty_trailers() {
|
1149 + | let header_map = HeaderMap::new();
|
1150 + | let trailers = Some(&header_map);
|
1151 + | let actual_length = total_rendered_length_of_trailers(trailers);
|
1152 + | let buf = BytesMut::with_capacity(actual_length as usize);
|
1153 + | let expected_length = (trailers_as_aws_chunked_bytes(trailers, buf).len()) as u64;
|
1154 + |
|
1155 + | assert_eq!(expected_length, actual_length);
|
1156 + | }
|
643 1157 | }
|
644 1158 | }
|