303 306 |
|
304 307 | #[cfg(test)]
|
305 308 | mod tests {
|
306 309 | use super::{InitialMessageType, Receiver, UnmarshallMessage};
|
307 310 | use aws_smithy_eventstream::error::Error as EventStreamError;
|
308 311 | use aws_smithy_eventstream::frame::{write_message_to, UnmarshalledMessage};
|
309 312 | use aws_smithy_runtime_api::client::result::SdkError;
|
310 313 | use aws_smithy_types::body::SdkBody;
|
311 314 | use aws_smithy_types::event_stream::{Header, HeaderValue, Message};
|
312 315 | use bytes::Bytes;
|
313 - | use hyper::body::Body;
|
316 + | use http_body_1x::Frame;
|
314 317 | use std::error::Error as StdError;
|
315 318 | use std::io::{Error as IOError, ErrorKind};
|
316 319 |
|
317 320 | fn encode_initial_response() -> Bytes {
|
318 321 | let mut buffer = Vec::new();
|
319 322 | let message = Message::new(Bytes::new())
|
320 323 | .add_header(Header::new(
|
321 324 | ":message-type",
|
322 325 | HeaderValue::String("event".into()),
|
323 326 | ))
|
324 327 | .add_header(Header::new(
|
325 328 | ":event-type",
|
326 329 | HeaderValue::String("initial-response".into()),
|
327 330 | ));
|
328 331 | write_message_to(&message, &mut buffer).unwrap();
|
329 332 | buffer.into()
|
330 333 | }
|
331 334 |
|
332 335 | fn encode_message(message: &str) -> Bytes {
|
333 336 | let mut buffer = Vec::new();
|
334 337 | let message = Message::new(Bytes::copy_from_slice(message.as_bytes()));
|
335 338 | write_message_to(&message, &mut buffer).unwrap();
|
336 339 | buffer.into()
|
337 340 | }
|
338 341 |
|
342 + | fn map_to_frame(stream: Vec<Result<Bytes, IOError>>) -> Vec<Result<Frame<Bytes>, IOError>> {
|
343 + | stream
|
344 + | .into_iter()
|
345 + | .map(|chunk| chunk.map(Frame::data))
|
346 + | .collect()
|
347 + | }
|
348 + |
|
339 349 | #[derive(Debug)]
|
340 350 | struct FakeError;
|
341 351 | impl std::fmt::Display for FakeError {
|
342 352 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
343 353 | write!(f, "FakeError")
|
344 354 | }
|
345 355 | }
|
346 356 | impl StdError for FakeError {}
|
347 357 |
|
348 358 | #[derive(Debug, Eq, PartialEq)]
|
349 359 | struct TestMessage(String);
|
350 360 |
|
351 361 | #[derive(Debug)]
|
352 362 | struct Unmarshaller;
|
353 363 | impl UnmarshallMessage for Unmarshaller {
|
354 364 | type Output = TestMessage;
|
355 365 | type Error = EventStreamError;
|
356 366 |
|
357 367 | fn unmarshall(
|
358 368 | &self,
|
359 369 | message: &Message,
|
360 370 | ) -> Result<UnmarshalledMessage<Self::Output, Self::Error>, EventStreamError> {
|
361 371 | Ok(UnmarshalledMessage::Event(TestMessage(
|
362 372 | std::str::from_utf8(&message.payload()[..]).unwrap().into(),
|
363 373 | )))
|
364 374 | }
|
365 375 | }
|
366 376 |
|
367 377 | #[tokio::test]
|
368 378 | async fn receive_success() {
|
369 379 | let chunks: Vec<Result<_, IOError>> =
|
370 - | vec![Ok(encode_message("one")), Ok(encode_message("two"))];
|
380 + | map_to_frame(vec![Ok(encode_message("one")), Ok(encode_message("two"))]);
|
371 381 | let chunk_stream = futures_util::stream::iter(chunks);
|
372 - | let body = SdkBody::from_body_0_4(Body::wrap_stream(chunk_stream));
|
382 + | let stream_body = http_body_util::StreamBody::new(chunk_stream);
|
383 + | let body = SdkBody::from_body_1_x(stream_body);
|
384 + |
|
373 385 | let mut receiver = Receiver::<TestMessage, EventStreamError>::new(Unmarshaller, body);
|
386 + |
|
374 387 | assert_eq!(
|
375 388 | TestMessage("one".into()),
|
376 389 | receiver.recv().await.unwrap().unwrap()
|
377 390 | );
|
378 391 | assert_eq!(
|
379 392 | TestMessage("two".into()),
|
380 393 | receiver.recv().await.unwrap().unwrap()
|
381 394 | );
|
382 395 | assert_eq!(None, receiver.recv().await.unwrap());
|
383 396 | }
|
384 397 |
|
385 398 | #[tokio::test]
|
386 399 | async fn receive_last_chunk_empty() {
|
387 - | let chunks: Vec<Result<_, IOError>> = vec![
|
400 + | let chunks: Vec<Result<_, IOError>> = map_to_frame(vec![
|
388 401 | Ok(encode_message("one")),
|
389 402 | Ok(encode_message("two")),
|
390 403 | Ok(Bytes::from_static(&[])),
|
391 - | ];
|
404 + | ]);
|
392 405 | let chunk_stream = futures_util::stream::iter(chunks);
|
393 - | let body = SdkBody::from_body_0_4(Body::wrap_stream(chunk_stream));
|
406 + | let stream_body = http_body_util::StreamBody::new(chunk_stream);
|
407 + | let body = SdkBody::from_body_1_x(stream_body);
|
394 408 | let mut receiver = Receiver::<TestMessage, EventStreamError>::new(Unmarshaller, body);
|
395 409 | assert_eq!(
|
396 410 | TestMessage("one".into()),
|
397 411 | receiver.recv().await.unwrap().unwrap()
|
398 412 | );
|
399 413 | assert_eq!(
|
400 414 | TestMessage("two".into()),
|
401 415 | receiver.recv().await.unwrap().unwrap()
|
402 416 | );
|
403 417 | assert_eq!(None, receiver.recv().await.unwrap());
|
404 418 | }
|
405 419 |
|
406 420 | #[tokio::test]
|
407 421 | async fn receive_last_chunk_not_full_message() {
|
408 - | let chunks: Vec<Result<_, IOError>> = vec![
|
422 + | let chunks: Vec<Result<_, IOError>> = map_to_frame(vec![
|
409 423 | Ok(encode_message("one")),
|
410 424 | Ok(encode_message("two")),
|
411 425 | Ok(encode_message("three").split_to(10)),
|
412 - | ];
|
426 + | ]);
|
413 427 | let chunk_stream = futures_util::stream::iter(chunks);
|
414 - | let body = SdkBody::from_body_0_4(Body::wrap_stream(chunk_stream));
|
428 + | let stream_body = http_body_util::StreamBody::new(chunk_stream);
|
429 + | let body = SdkBody::from_body_1_x(stream_body);
|
415 430 | let mut receiver = Receiver::<TestMessage, EventStreamError>::new(Unmarshaller, body);
|
416 431 | assert_eq!(
|
417 432 | TestMessage("one".into()),
|
418 433 | receiver.recv().await.unwrap().unwrap()
|
419 434 | );
|
420 435 | assert_eq!(
|
421 436 | TestMessage("two".into()),
|
422 437 | receiver.recv().await.unwrap().unwrap()
|
423 438 | );
|
424 439 | assert!(matches!(
|
425 440 | receiver.recv().await,
|
426 441 | Err(SdkError::ResponseError { .. }),
|
427 442 | ));
|
428 443 | }
|
429 444 |
|
430 445 | #[tokio::test]
|
431 446 | async fn receive_last_chunk_has_multiple_messages() {
|
432 - | let chunks: Vec<Result<_, IOError>> = vec![
|
447 + | let chunks: Vec<Result<_, IOError>> = map_to_frame(vec![
|
433 448 | Ok(encode_message("one")),
|
434 449 | Ok(encode_message("two")),
|
435 450 | Ok(Bytes::from(
|
436 451 | [encode_message("three"), encode_message("four")].concat(),
|
437 452 | )),
|
438 - | ];
|
453 + | ]);
|
439 454 | let chunk_stream = futures_util::stream::iter(chunks);
|
440 - | let body = SdkBody::from_body_0_4(Body::wrap_stream(chunk_stream));
|
455 + | let stream_body = http_body_util::StreamBody::new(chunk_stream);
|
456 + | let body = SdkBody::from_body_1_x(stream_body);
|
441 457 | let mut receiver = Receiver::<TestMessage, EventStreamError>::new(Unmarshaller, body);
|
442 458 | assert_eq!(
|
443 459 | TestMessage("one".into()),
|
444 460 | receiver.recv().await.unwrap().unwrap()
|
445 461 | );
|
446 462 | assert_eq!(
|
447 463 | TestMessage("two".into()),
|
448 464 | receiver.recv().await.unwrap().unwrap()
|
449 465 | );
|
450 466 | assert_eq!(
|
451 467 | TestMessage("three".into()),
|
452 468 | receiver.recv().await.unwrap().unwrap()
|
453 469 | );
|
454 470 | assert_eq!(
|
455 471 | TestMessage("four".into()),
|
456 472 | receiver.recv().await.unwrap().unwrap()
|
457 473 | );
|
458 474 | assert_eq!(None, receiver.recv().await.unwrap());
|
459 475 | }
|
460 476 |
|
461 477 | proptest::proptest! {
|
462 478 | #[test]
|
463 479 | fn receive_multiple_messages_split_unevenly_across_chunks(b1: usize, b2: usize) {
|
464 480 | let combined = Bytes::from([
|
465 481 | encode_message("one"),
|
466 482 | encode_message("two"),
|
467 483 | encode_message("three"),
|
468 484 | encode_message("four"),
|
469 485 | encode_message("five"),
|
470 486 | encode_message("six"),
|
471 487 | encode_message("seven"),
|
472 488 | encode_message("eight"),
|
473 489 | ].concat());
|
474 490 |
|
475 491 | let midpoint = combined.len() / 2;
|
476 492 | let (start, boundary1, boundary2, end) = (
|
477 493 | 0,
|
478 494 | b1 % midpoint,
|
479 495 | midpoint + b2 % midpoint,
|
480 496 | combined.len()
|
481 497 | );
|
482 498 | println!("[{start}, {boundary1}], [{boundary1}, {boundary2}], [{boundary2}, {end}]");
|
483 499 |
|
484 500 | let rt = tokio::runtime::Runtime::new().unwrap();
|
485 501 | rt.block_on(async move {
|
486 - | let chunks: Vec<Result<_, IOError>> = vec![
|
502 + | let chunks: Vec<Result<_, IOError>> = map_to_frame(vec![
|
487 503 | Ok(Bytes::copy_from_slice(&combined[start..boundary1])),
|
488 504 | Ok(Bytes::copy_from_slice(&combined[boundary1..boundary2])),
|
489 505 | Ok(Bytes::copy_from_slice(&combined[boundary2..end])),
|
490 - | ];
|
506 + | ]);
|
491 507 |
|
492 508 | let chunk_stream = futures_util::stream::iter(chunks);
|
493 - | let body = SdkBody::from_body_0_4(Body::wrap_stream(chunk_stream));
|
509 + | let stream_body = http_body_util::StreamBody::new(chunk_stream);
|
510 + | let body = SdkBody::from_body_1_x(stream_body);
|
494 511 | let mut receiver = Receiver::<TestMessage, EventStreamError>::new(Unmarshaller, body);
|
495 512 | for payload in &["one", "two", "three", "four", "five", "six", "seven", "eight"] {
|
496 513 | assert_eq!(
|
497 514 | TestMessage((*payload).into()),
|
498 515 | receiver.recv().await.unwrap().unwrap()
|
499 516 | );
|
500 517 | }
|
501 518 | assert_eq!(None, receiver.recv().await.unwrap());
|
502 519 | });
|
503 520 | }
|
504 521 | }
|
505 522 |
|
506 523 | #[tokio::test]
|
507 524 | async fn receive_network_failure() {
|
508 - | let chunks: Vec<Result<_, IOError>> = vec![
|
525 + | let chunks: Vec<Result<_, IOError>> = map_to_frame(vec![
|
509 526 | Ok(encode_message("one")),
|
510 527 | Err(IOError::new(ErrorKind::ConnectionReset, FakeError)),
|
511 - | ];
|
528 + | ]);
|
512 529 | let chunk_stream = futures_util::stream::iter(chunks);
|
513 - | let body = SdkBody::from_body_0_4(Body::wrap_stream(chunk_stream));
|
530 + | let stream_body = http_body_util::StreamBody::new(chunk_stream);
|
531 + | let body = SdkBody::from_body_1_x(stream_body);
|
514 532 | let mut receiver = Receiver::<TestMessage, EventStreamError>::new(Unmarshaller, body);
|
515 533 | assert_eq!(
|
516 534 | TestMessage("one".into()),
|
517 535 | receiver.recv().await.unwrap().unwrap()
|
518 536 | );
|
519 537 | assert!(matches!(
|
520 538 | receiver.recv().await,
|
521 539 | Err(SdkError::DispatchFailure(_))
|
522 540 | ));
|
523 541 | }
|
524 542 |
|
525 543 | #[tokio::test]
|
526 544 | async fn receive_message_parse_failure() {
|
527 - | let chunks: Vec<Result<_, IOError>> = vec![
|
545 + | let chunks: Vec<Result<_, IOError>> = map_to_frame(vec![
|
528 546 | Ok(encode_message("one")),
|
529 547 | // A zero length message will be invalid. We need to provide a minimum of 12 bytes
|
530 548 | // for the MessageFrameDecoder to actually start parsing it.
|
531 549 | Ok(Bytes::from_static(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])),
|
532 - | ];
|
550 + | ]);
|
533 551 | let chunk_stream = futures_util::stream::iter(chunks);
|
534 - | let body = SdkBody::from_body_0_4(Body::wrap_stream(chunk_stream));
|
552 + | let stream_body = http_body_util::StreamBody::new(chunk_stream);
|
553 + | let body = SdkBody::from_body_1_x(stream_body);
|
535 554 | let mut receiver = Receiver::<TestMessage, EventStreamError>::new(Unmarshaller, body);
|
536 555 | assert_eq!(
|
537 556 | TestMessage("one".into()),
|
538 557 | receiver.recv().await.unwrap().unwrap()
|
539 558 | );
|
540 559 | assert!(matches!(
|
541 560 | receiver.recv().await,
|
542 561 | Err(SdkError::ResponseError { .. })
|
543 562 | ));
|
544 563 | }
|
545 564 |
|
546 565 | #[tokio::test]
|
547 566 | async fn receive_initial_response() {
|
548 - | let chunks: Vec<Result<_, IOError>> =
|
549 - | vec![Ok(encode_initial_response()), Ok(encode_message("one"))];
|
567 + | let chunks: Vec<Result<_, IOError>> = map_to_frame(vec![
|
568 + | Ok(encode_initial_response()),
|
569 + | Ok(encode_message("one")),
|
570 + | ]);
|
550 571 | let chunk_stream = futures_util::stream::iter(chunks);
|
551 - | let body = SdkBody::from_body_0_4(Body::wrap_stream(chunk_stream));
|
572 + | let stream_body = http_body_util::StreamBody::new(chunk_stream);
|
573 + | let body = SdkBody::from_body_1_x(stream_body);
|
552 574 | let mut receiver = Receiver::<TestMessage, EventStreamError>::new(Unmarshaller, body);
|
553 575 | assert!(receiver
|
554 576 | .try_recv_initial(InitialMessageType::Response)
|
555 577 | .await
|
556 578 | .unwrap()
|
557 579 | .is_some());
|
558 580 | assert_eq!(
|
559 581 | TestMessage("one".into()),
|
560 582 | receiver.recv().await.unwrap().unwrap()
|
561 583 | );
|
562 584 | }
|
563 585 |
|
564 586 | #[tokio::test]
|
565 587 | async fn receive_no_initial_response() {
|
566 588 | let chunks: Vec<Result<_, IOError>> =
|
567 - | vec![Ok(encode_message("one")), Ok(encode_message("two"))];
|
589 + | map_to_frame(vec![Ok(encode_message("one")), Ok(encode_message("two"))]);
|
568 590 | let chunk_stream = futures_util::stream::iter(chunks);
|
569 - | let body = SdkBody::from_body_0_4(Body::wrap_stream(chunk_stream));
|
591 + | let stream_body = http_body_util::StreamBody::new(chunk_stream);
|
592 + |
|
593 + | let body = SdkBody::from_body_1_x(stream_body);
|
570 594 | let mut receiver = Receiver::<TestMessage, EventStreamError>::new(Unmarshaller, body);
|
571 595 | assert!(receiver
|
572 596 | .try_recv_initial(InitialMessageType::Response)
|
573 597 | .await
|
574 598 | .unwrap()
|
575 599 | .is_none());
|
576 600 | assert_eq!(
|
577 601 | TestMessage("one".into()),
|
578 602 | receiver.recv().await.unwrap().unwrap()
|
579 603 | );
|