AWS SDK

AWS SDK

rev. 33341052640bdff4ac4f4158b00a75df08ff159b (ignoring whitespace)

Files changed:

tmp-codegen-diff/aws-sdk/sdk/aws-smithy-legacy-http/Cargo.toml

@@ -0,1 +0,74 @@
           1  +
# Code generated by software.amazon.smithy.rust.codegen.smithy-rs. DO NOT EDIT.
           2  +
[package]
           3  +
name = "aws-smithy-legacy-http"
           4  +
version = "0.62.5"
           5  +
authors = ["AWS Rust SDK Team <aws-sdk-rust@amazon.com>", "Russell Cohen <rcoh@amazon.com>"]
           6  +
description = "Smithy HTTP-0x logic for smithy-rs."
           7  +
edition = "2021"
           8  +
license = "Apache-2.0"
           9  +
repository = "https://github.com/smithy-lang/smithy-rs"
          10  +
[package.metadata.docs.rs]
          11  +
all-features = true
          12  +
targets = ["x86_64-unknown-linux-gnu"]
          13  +
cargo-args = ["-Zunstable-options", "-Zrustdoc-scrape-examples"]
          14  +
rustdoc-args = ["--cfg", "docsrs"]
          15  +
          16  +
[features]
          17  +
event-stream = ["aws-smithy-eventstream"]
          18  +
rt-tokio = ["aws-smithy-types/rt-tokio"]
          19  +
          20  +
[dependencies]
          21  +
bytes = "1.10.0"
          22  +
bytes-utils = "0.1"
          23  +
percent-encoding = "2.3.1"
          24  +
pin-project-lite = "0.2.14"
          25  +
pin-utils = "0.1.0"
          26  +
tracing = "0.1.40"
          27  +
futures-core = "0.3.31"
          28  +
          29  +
[dependencies.aws-smithy-eventstream]
          30  +
path = "../aws-smithy-eventstream"
          31  +
optional = true
          32  +
version = "0.60.13"
          33  +
          34  +
[dependencies.aws-smithy-runtime-api]
          35  +
path = "../aws-smithy-runtime-api"
          36  +
features = ["client", "http-02x"]
          37  +
version = "1.9.2"
          38  +
          39  +
[dependencies.aws-smithy-types]
          40  +
path = "../aws-smithy-types"
          41  +
features = ["byte-stream-poll-next", "http-body-0-4-x"]
          42  +
version = "1.3.4"
          43  +
          44  +
[dependencies.http-02x]
          45  +
package = "http"
          46  +
version = "0.2.9"
          47  +
          48  +
[dependencies.http-1x]
          49  +
package = "http"
          50  +
version = "1"
          51  +
          52  +
[dependencies.http-body-04x]
          53  +
package = "http-body"
          54  +
version = "0.4.5"
          55  +
          56  +
[dependencies.futures-util]
          57  +
version = "0.3.29"
          58  +
default-features = false
          59  +
          60  +
[dev-dependencies]
          61  +
async-stream = "0.3"
          62  +
proptest = "1"
          63  +
          64  +
[dev-dependencies.futures-util]
          65  +
version = "0.3.29"
          66  +
default-features = false
          67  +
          68  +
[dev-dependencies.hyper]
          69  +
version = "0.14.26"
          70  +
features = ["stream"]
          71  +
          72  +
[dev-dependencies.tokio]
          73  +
version = "1.23.1"
          74  +
features = ["macros", "rt", "rt-multi-thread"]

tmp-codegen-diff/aws-sdk/sdk/aws-smithy-legacy-http/LICENSE

@@ -0,1 +0,175 @@
           1  +
           2  +
                                 Apache License
           3  +
                           Version 2.0, January 2004
           4  +
                        http://www.apache.org/licenses/
           5  +
           6  +
   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
           7  +
           8  +
   1. Definitions.
           9  +
          10  +
      "License" shall mean the terms and conditions for use, reproduction,
          11  +
      and distribution as defined by Sections 1 through 9 of this document.
          12  +
          13  +
      "Licensor" shall mean the copyright owner or entity authorized by
          14  +
      the copyright owner that is granting the License.
          15  +
          16  +
      "Legal Entity" shall mean the union of the acting entity and all
          17  +
      other entities that control, are controlled by, or are under common
          18  +
      control with that entity. For the purposes of this definition,
          19  +
      "control" means (i) the power, direct or indirect, to cause the
          20  +
      direction or management of such entity, whether by contract or
          21  +
      otherwise, or (ii) ownership of fifty percent (50%) or more of the
          22  +
      outstanding shares, or (iii) beneficial ownership of such entity.
          23  +
          24  +
      "You" (or "Your") shall mean an individual or Legal Entity
          25  +
      exercising permissions granted by this License.
          26  +
          27  +
      "Source" form shall mean the preferred form for making modifications,
          28  +
      including but not limited to software source code, documentation
          29  +
      source, and configuration files.
          30  +
          31  +
      "Object" form shall mean any form resulting from mechanical
          32  +
      transformation or translation of a Source form, including but
          33  +
      not limited to compiled object code, generated documentation,
          34  +
      and conversions to other media types.
          35  +
          36  +
      "Work" shall mean the work of authorship, whether in Source or
          37  +
      Object form, made available under the License, as indicated by a
          38  +
      copyright notice that is included in or attached to the work
          39  +
      (an example is provided in the Appendix below).
          40  +
          41  +
      "Derivative Works" shall mean any work, whether in Source or Object
          42  +
      form, that is based on (or derived from) the Work and for which the
          43  +
      editorial revisions, annotations, elaborations, or other modifications
          44  +
      represent, as a whole, an original work of authorship. For the purposes
          45  +
      of this License, Derivative Works shall not include works that remain
          46  +
      separable from, or merely link (or bind by name) to the interfaces of,
          47  +
      the Work and Derivative Works thereof.
          48  +
          49  +
      "Contribution" shall mean any work of authorship, including
          50  +
      the original version of the Work and any modifications or additions
          51  +
      to that Work or Derivative Works thereof, that is intentionally
          52  +
      submitted to Licensor for inclusion in the Work by the copyright owner
          53  +
      or by an individual or Legal Entity authorized to submit on behalf of
          54  +
      the copyright owner. For the purposes of this definition, "submitted"
          55  +
      means any form of electronic, verbal, or written communication sent
          56  +
      to the Licensor or its representatives, including but not limited to
          57  +
      communication on electronic mailing lists, source code control systems,
          58  +
      and issue tracking systems that are managed by, or on behalf of, the
          59  +
      Licensor for the purpose of discussing and improving the Work, but
          60  +
      excluding communication that is conspicuously marked or otherwise
          61  +
      designated in writing by the copyright owner as "Not a Contribution."
          62  +
          63  +
      "Contributor" shall mean Licensor and any individual or Legal Entity
          64  +
      on behalf of whom a Contribution has been received by Licensor and
          65  +
      subsequently incorporated within the Work.
          66  +
          67  +
   2. Grant of Copyright License. Subject to the terms and conditions of
          68  +
      this License, each Contributor hereby grants to You a perpetual,
          69  +
      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
          70  +
      copyright license to reproduce, prepare Derivative Works of,
          71  +
      publicly display, publicly perform, sublicense, and distribute the
          72  +
      Work and such Derivative Works in Source or Object form.
          73  +
          74  +
   3. Grant of Patent License. Subject to the terms and conditions of
          75  +
      this License, each Contributor hereby grants to You a perpetual,
          76  +
      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
          77  +
      (except as stated in this section) patent license to make, have made,
          78  +
      use, offer to sell, sell, import, and otherwise transfer the Work,
          79  +
      where such license applies only to those patent claims licensable
          80  +
      by such Contributor that are necessarily infringed by their
          81  +
      Contribution(s) alone or by combination of their Contribution(s)
          82  +
      with the Work to which such Contribution(s) was submitted. If You
          83  +
      institute patent litigation against any entity (including a
          84  +
      cross-claim or counterclaim in a lawsuit) alleging that the Work
          85  +
      or a Contribution incorporated within the Work constitutes direct
          86  +
      or contributory patent infringement, then any patent licenses
          87  +
      granted to You under this License for that Work shall terminate
          88  +
      as of the date such litigation is filed.
          89  +
          90  +
   4. Redistribution. You may reproduce and distribute copies of the
          91  +
      Work or Derivative Works thereof in any medium, with or without
          92  +
      modifications, and in Source or Object form, provided that You
          93  +
      meet the following conditions:
          94  +
          95  +
      (a) You must give any other recipients of the Work or
          96  +
          Derivative Works a copy of this License; and
          97  +
          98  +
      (b) You must cause any modified files to carry prominent notices
          99  +
          stating that You changed the files; and
         100  +
         101  +
      (c) You must retain, in the Source form of any Derivative Works
         102  +
          that You distribute, all copyright, patent, trademark, and
         103  +
          attribution notices from the Source form of the Work,
         104  +
          excluding those notices that do not pertain to any part of
         105  +
          the Derivative Works; and
         106  +
         107  +
      (d) If the Work includes a "NOTICE" text file as part of its
         108  +
          distribution, then any Derivative Works that You distribute must
         109  +
          include a readable copy of the attribution notices contained
         110  +
          within such NOTICE file, excluding those notices that do not
         111  +
          pertain to any part of the Derivative Works, in at least one
         112  +
          of the following places: within a NOTICE text file distributed
         113  +
          as part of the Derivative Works; within the Source form or
         114  +
          documentation, if provided along with the Derivative Works; or,
         115  +
          within a display generated by the Derivative Works, if and
         116  +
          wherever such third-party notices normally appear. The contents
         117  +
          of the NOTICE file are for informational purposes only and
         118  +
          do not modify the License. You may add Your own attribution
         119  +
          notices within Derivative Works that You distribute, alongside
         120  +
          or as an addendum to the NOTICE text from the Work, provided
         121  +
          that such additional attribution notices cannot be construed
         122  +
          as modifying the License.
         123  +
         124  +
      You may add Your own copyright statement to Your modifications and
         125  +
      may provide additional or different license terms and conditions
         126  +
      for use, reproduction, or distribution of Your modifications, or
         127  +
      for any such Derivative Works as a whole, provided Your use,
         128  +
      reproduction, and distribution of the Work otherwise complies with
         129  +
      the conditions stated in this License.
         130  +
         131  +
   5. Submission of Contributions. Unless You explicitly state otherwise,
         132  +
      any Contribution intentionally submitted for inclusion in the Work
         133  +
      by You to the Licensor shall be under the terms and conditions of
         134  +
      this License, without any additional terms or conditions.
         135  +
      Notwithstanding the above, nothing herein shall supersede or modify
         136  +
      the terms of any separate license agreement you may have executed
         137  +
      with Licensor regarding such Contributions.
         138  +
         139  +
   6. Trademarks. This License does not grant permission to use the trade
         140  +
      names, trademarks, service marks, or product names of the Licensor,
         141  +
      except as required for reasonable and customary use in describing the
         142  +
      origin of the Work and reproducing the content of the NOTICE file.
         143  +
         144  +
   7. Disclaimer of Warranty. Unless required by applicable law or
         145  +
      agreed to in writing, Licensor provides the Work (and each
         146  +
      Contributor provides its Contributions) on an "AS IS" BASIS,
         147  +
      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
         148  +
      implied, including, without limitation, any warranties or conditions
         149  +
      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
         150  +
      PARTICULAR PURPOSE. You are solely responsible for determining the
         151  +
      appropriateness of using or redistributing the Work and assume any
         152  +
      risks associated with Your exercise of permissions under this License.
         153  +
         154  +
   8. Limitation of Liability. In no event and under no legal theory,
         155  +
      whether in tort (including negligence), contract, or otherwise,
         156  +
      unless required by applicable law (such as deliberate and grossly
         157  +
      negligent acts) or agreed to in writing, shall any Contributor be
         158  +
      liable to You for damages, including any direct, indirect, special,
         159  +
      incidental, or consequential damages of any character arising as a
         160  +
      result of this License or out of the use or inability to use the
         161  +
      Work (including but not limited to damages for loss of goodwill,
         162  +
      work stoppage, computer failure or malfunction, or any and all
         163  +
      other commercial damages or losses), even if such Contributor
         164  +
      has been advised of the possibility of such damages.
         165  +
         166  +
   9. Accepting Warranty or Additional Liability. While redistributing
         167  +
      the Work or Derivative Works thereof, You may choose to offer,
         168  +
      and charge a fee for, acceptance of support, warranty, indemnity,
         169  +
      or other liability obligations and/or rights consistent with this
         170  +
      License. However, in accepting such obligations, You may act only
         171  +
      on Your own behalf and on Your sole responsibility, not on behalf
         172  +
      of any other Contributor, and only if You agree to indemnify,
         173  +
      defend, and hold each Contributor harmless for any liability
         174  +
      incurred by, or claims asserted against, such Contributor by reason
         175  +
      of your accepting any such warranty or additional liability.

tmp-codegen-diff/aws-sdk/sdk/aws-smithy-legacy-http/README.md

@@ -0,1 +0,18 @@
           1  +
# aws-smithy-legacy-http
           2  +
           3  +
**This is a legacy crate that provides support for `http@0.x` and `hyper@0.x`.**
           4  +
           5  +
Core HTTP primitives for service clients generated by [smithy-rs](https://github.com/smithy-lang/smithy-rs) including:
           6  +
- HTTP Body implementation
           7  +
- Endpoint support
           8  +
- HTTP header deserialization
           9  +
- Event streams
          10  +
- `ByteStream`: _(supported on crate feature `rt-tokio` only)_ a misuse-resistant abstraction for streaming binary data
          11  +
          12  +
## Usage
          13  +
          14  +
This crate is used when generating server SDKs without the `http-1x` codegen flag. For new projects, prefer using `aws-smithy-http` which supports `http@1.x` and `hyper@1.x`.
          15  +
          16  +
<!-- anchor_start:footer -->
          17  +
This crate is part of the [AWS SDK for Rust](https://awslabs.github.io/aws-sdk-rust/) and the [smithy-rs](https://github.com/smithy-lang/smithy-rs) code generator. In most cases, it should not be used directly.
          18  +
<!-- anchor_end:footer -->

tmp-codegen-diff/aws-sdk/sdk/aws-smithy-legacy-http/additional-ci

@@ -0,1 +0,12 @@
           1  +
#!/bin/bash
#
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
#

# Extra CI checks that apply only to this package.

# Abort as soon as any command exits with a non-zero status.
set -o errexit

# Announce the step, then run the tests across the feature powerset
# (the all-features combination is excluded, as the banner says).
printf '%s\n' "### Testing every combination of features (excluding --all-features)"
cargo hack test --feature-powerset --exclude-all-features

tmp-codegen-diff/aws-sdk/sdk/aws-smithy-legacy-http/external-types.toml

@@ -0,1 +0,21 @@
           1  +
allowed_external_types = [
           2  +
    "aws_smithy_runtime_api::*",
           3  +
    "aws_smithy_types::*",
           4  +
    "bytes::bytes::Bytes",
           5  +
    "http::error::Error",
           6  +
    "http::header::map::HeaderMap",
           7  +
    "http::header::map::ValueIter",
           8  +
    "http::header::name::HeaderName",
           9  +
    "http::header::value::HeaderValue",
          10  +
    "http::request::Builder",
          11  +
    "http::request::Request",
          12  +
    "http::response::Builder",
          13  +
    "http::response::Response",
          14  +
    "http::uri::Uri",
          15  +
          16  +
    # TODO(https://github.com/smithy-lang/smithy-rs/issues/1193): Once tooling permits it, only allow the following types in the `event-stream` feature
          17  +
    "futures_core::stream::Stream",
          18  +
          19  +
    # TODO(https://github.com/smithy-lang/smithy-rs/issues/1193): Once tooling permits it, only allow the following types in the `event-stream` feature
          20  +
    "aws_smithy_eventstream::*",
          21  +
]

tmp-codegen-diff/aws-sdk/sdk/aws-smithy-legacy-http/fuzz/Cargo.toml

@@ -0,1 +0,27 @@
           1  +
# Code generated by software.amazon.smithy.rust.codegen.smithy-rs. DO NOT EDIT.
           2  +
[[bin]]
           3  +
name = "read_many_from_str"
           4  +
path = "fuzz_targets/read_many_from_str.rs"
           5  +
test = false
           6  +
doc = false
           7  +
           8  +
[package]
           9  +
name = "aws-smithy-http-fuzz"
          10  +
version = "0.0.0"
          11  +
authors = ["Automatically generated"]
          12  +
publish = false
          13  +
edition = "2021"
          14  +
          15  +
[package.metadata]
          16  +
cargo-fuzz = true
          17  +
          18  +
[dependencies]
          19  +
libfuzzer-sys = "=0.4.7"
          20  +
http = "0.2.3"
          21  +
          22  +
[dependencies.aws-smithy-http]
          23  +
path = ".."
          24  +
version = "0.62.5"
          25  +
          26  +
[workspace]
          27  +
members = ["."]

tmp-codegen-diff/aws-sdk/sdk/aws-smithy-legacy-http/fuzz/fuzz_targets/read_many_from_str.rs

@@ -0,1 +0,19 @@
           1  +
/*
           2  +
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
           3  +
 * SPDX-License-Identifier: Apache-2.0
           4  +
 */
           5  +
           6  +
#![no_main]
           7  +
use libfuzzer_sys::fuzz_target;
           8  +
           9  +
use aws_smithy_http::header::read_many_from_str;
          10  +
use http;
          11  +
          12  +
// Fuzz entry point: feed arbitrary bytes through header parsing.
//
// Inputs that are not valid UTF-8, or that cannot be used as the value of a
// `test` header, are ignored; everything else is handed to `read_many_from_str`.
fuzz_target!(|data: &[u8]| {
    let request = std::str::from_utf8(data)
        .ok()
        .and_then(|value| http::Request::builder().header("test", value).body(()).ok());
    if let Some(req) = request {
        // Shouldn't panic; the parse result itself is irrelevant here.
        let _ = read_many_from_str::<String>(req.headers().get_all("test").iter());
    }
});

tmp-codegen-diff/aws-sdk/sdk/aws-smithy-legacy-http/proptest-regressions/label.txt

@@ -0,1 +0,10 @@
           1  +
# Seeds for failure cases proptest has generated in the past. It is
           2  +
# automatically read and these particular cases re-run before any
           3  +
# novel cases are generated.
           4  +
#
           5  +
# It is recommended to check this file in to source control so that
           6  +
# everyone who runs the test benefits from these saved cases.
           7  +
cc dfac816a3fc3fff8523f1a1707da0065b72fc3c0d70fce001627a8e2e7ee5e0e # shrinks to s = ">"
           8  +
cc 22bce3cd581f5f5a55e6ba18b1fb027481a496f6b35fee6dc4ef84659b99ddca # shrinks to s = "`"
           9  +
cc be619cccfee48e3bf642cf0f82e98e00dceccbe10963fbaf3a622a68a55a3227 # shrinks to s = "?\""
          10  +
cc 3e0b2e6f64642d7c58e5d2fe9223f75238a874bd8c3812dcb3ecc721d9aa0243 # shrinks to s = " "

tmp-codegen-diff/aws-sdk/sdk/aws-smithy-legacy-http/proptest-regressions/query.txt

@@ -0,1 +0,9 @@
           1  +
# Seeds for failure cases proptest has generated in the past. It is
           2  +
# automatically read and these particular cases re-run before any
           3  +
# novel cases are generated.
           4  +
#
           5  +
# It is recommended to check this file in to source control so that
           6  +
# everyone who runs the test benefits from these saved cases.
           7  +
cc b8ff8401495a7e4b4604f4438d8fc6b0ba63a58ddf58273ddcb3bb511e5cf91a # shrinks to s = "<"
           8  +
cc 59ee40f6a097f80254a91d0ee7d6cde97a353f7ccdf83eddd1d437781019431f # shrinks to s = "\""
           9  +
cc 65e6e5f9082c6cbebf599af889721d30d8ee2388f2f7be372520aa86526c8379 # shrinks to s = ">"

tmp-codegen-diff/aws-sdk/sdk/aws-smithy-legacy-http/src/endpoint.rs

@@ -0,1 +0,71 @@
           1  +
/*
           2  +
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
           3  +
 * SPDX-License-Identifier: Apache-2.0
           4  +
 */
           5  +
           6  +
//! Code for resolving an endpoint (URI) that a request should be sent to
           7  +
           8  +
use aws_smithy_runtime_api::client::endpoint::{error::InvalidEndpointError, EndpointPrefix};
           9  +
use std::borrow::Cow;
          10  +
use std::result::Result as StdResult;
          11  +
use std::str::FromStr;
          12  +
          13  +
pub mod error;
          14  +
pub use error::ResolveEndpointError;
          15  +
          16  +
/// An endpoint-resolution-specific Result. Contains either an [`Endpoint`](aws_smithy_types::endpoint::Endpoint) or a [`ResolveEndpointError`].
          17  +
#[deprecated(since = "0.60.1", note = "Was never used.")]
          18  +
pub type Result = std::result::Result<aws_smithy_types::endpoint::Endpoint, ResolveEndpointError>;
          19  +
          20  +
/// Apply `endpoint` to `uri`
          21  +
///
          22  +
/// This method mutates `uri` by setting the `endpoint` on it
          23  +
pub fn apply_endpoint(
          24  +
    uri: &mut http_1x::Uri,
          25  +
    endpoint: &http_1x::Uri,
          26  +
    prefix: Option<&EndpointPrefix>,
          27  +
) -> StdResult<(), InvalidEndpointError> {
          28  +
    let prefix = prefix.map(EndpointPrefix::as_str).unwrap_or("");
          29  +
    let authority = endpoint
          30  +
        .authority()
          31  +
        .as_ref()
          32  +
        .map(|auth| auth.as_str())
          33  +
        .unwrap_or("");
          34  +
    let authority = if !prefix.is_empty() {
          35  +
        Cow::Owned(format!("{prefix}{authority}"))
          36  +
    } else {
          37  +
        Cow::Borrowed(authority)
          38  +
    };
          39  +
    let authority = http_1x::uri::Authority::from_str(&authority).map_err(|err| {
          40  +
        InvalidEndpointError::failed_to_construct_authority(authority.into_owned(), err)
          41  +
    })?;
          42  +
    let scheme = *endpoint
          43  +
        .scheme()
          44  +
        .as_ref()
          45  +
        .ok_or_else(InvalidEndpointError::endpoint_must_have_scheme)?;
          46  +
    let new_uri = http_1x::Uri::builder()
          47  +
        .authority(authority)
          48  +
        .scheme(scheme.clone())
          49  +
        .path_and_query(merge_paths(endpoint, uri).as_ref())
          50  +
        .build()
          51  +
        .map_err(InvalidEndpointError::failed_to_construct_uri)?;
          52  +
    *uri = new_uri;
          53  +
    Ok(())
          54  +
}
          55  +
          56  +
fn merge_paths<'a>(endpoint: &'a http_1x::Uri, uri: &'a http_1x::Uri) -> Cow<'a, str> {
          57  +
    if let Some(query) = endpoint.path_and_query().and_then(|pq| pq.query()) {
          58  +
        tracing::warn!(query = %query, "query specified in endpoint will be ignored during endpoint resolution");
          59  +
    }
          60  +
    let endpoint_path = endpoint.path();
          61  +
    let uri_path_and_query = uri.path_and_query().map(|pq| pq.as_str()).unwrap_or("");
          62  +
    if endpoint_path.is_empty() {
          63  +
        Cow::Borrowed(uri_path_and_query)
          64  +
    } else {
          65  +
        let ep_no_slash = endpoint_path.strip_suffix('/').unwrap_or(endpoint_path);
          66  +
        let uri_path_no_slash = uri_path_and_query
          67  +
            .strip_prefix('/')
          68  +
            .unwrap_or(uri_path_and_query);
          69  +
        Cow::Owned(format!("{ep_no_slash}/{uri_path_no_slash}"))
          70  +
    }
          71  +
}

tmp-codegen-diff/aws-sdk/sdk/aws-smithy-legacy-http/src/endpoint/error.rs

@@ -0,1 +0,137 @@
           1  +
/*
           2  +
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
           3  +
 * SPDX-License-Identifier: Apache-2.0
           4  +
 */
           5  +
           6  +
//! Errors related to endpoint resolution and validation
           7  +
           8  +
use std::error::Error;
           9  +
use std::fmt;
          10  +
          11  +
/// Error returned when endpoint resolution fails.
///
/// Holds a human-readable message and, optionally, the underlying cause.
#[derive(Debug)]
pub struct ResolveEndpointError {
    message: String,
    source: Option<Box<dyn Error + Send + Sync>>,
}

impl ResolveEndpointError {
    /// Create a [`ResolveEndpointError`] that carries only a message and no source.
    pub fn message(message: impl Into<String>) -> Self {
        let message = message.into();
        Self {
            message,
            source: None,
        }
    }

    /// Replace the source of this error with `source`; passing `None` clears it.
    pub fn with_source(mut self, source: Option<Box<dyn Error + Send + Sync>>) -> Self {
        self.source = source;
        self
    }

    /// Create a [`ResolveEndpointError`] from a message and the error that caused it.
    pub fn from_source(
        message: impl Into<String>,
        source: impl Into<Box<dyn Error + Send + Sync>>,
    ) -> Self {
        Self {
            message: message.into(),
            source: Some(source.into()),
        }
    }
}

impl fmt::Display for ResolveEndpointError {
    /// Writes only the message; the cause is reachable through [`Error::source`].
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.message)
    }
}

impl Error for ResolveEndpointError {
    /// Returns the stored cause, if one was attached.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match &self.source {
            Some(cause) => Some(cause.as_ref()),
            None => None,
        }
    }
}
          52  +
          53  +
#[derive(Debug)]
          54  +
pub(super) enum InvalidEndpointErrorKind {
          55  +
    EndpointMustHaveScheme,
          56  +
    FailedToConstructAuthority {
          57  +
        authority: String,
          58  +
        source: Box<dyn Error + Send + Sync + 'static>,
          59  +
    },
          60  +
    FailedToConstructUri {
          61  +
        source: Box<dyn Error + Send + Sync + 'static>,
          62  +
    },
          63  +
}
          64  +
          65  +
/// An error that occurs when an endpoint is found to be invalid. This usually occurs due to an
          66  +
/// incomplete URI.
          67  +
#[derive(Debug)]
          68  +
pub struct InvalidEndpointError {
          69  +
    pub(super) kind: InvalidEndpointErrorKind,
          70  +
}
          71  +
          72  +
impl InvalidEndpointError {
          73  +
    /// Construct a build error for a missing scheme
          74  +
    pub fn endpoint_must_have_scheme() -> Self {
          75  +
        Self {
          76  +
            kind: InvalidEndpointErrorKind::EndpointMustHaveScheme,
          77  +
        }
          78  +
    }
          79  +
          80  +
    /// Construct a build error for an invalid authority
          81  +
    pub fn failed_to_construct_authority(
          82  +
        authority: impl Into<String>,
          83  +
        source: impl Into<Box<dyn Error + Send + Sync + 'static>>,
          84  +
    ) -> Self {
          85  +
        Self {
          86  +
            kind: InvalidEndpointErrorKind::FailedToConstructAuthority {
          87  +
                authority: authority.into(),
          88  +
                source: source.into(),
          89  +
            },
          90  +
        }
          91  +
    }
          92  +
          93  +
    /// Construct a build error for an invalid URI
          94  +
    pub fn failed_to_construct_uri(
          95  +
        source: impl Into<Box<dyn Error + Send + Sync + 'static>>,
          96  +
    ) -> Self {
          97  +
        Self {
          98  +
            kind: InvalidEndpointErrorKind::FailedToConstructUri {
          99  +
                source: source.into(),
         100  +
            },
         101  +
        }
         102  +
    }
         103  +
}
         104  +
         105  +
impl From<InvalidEndpointErrorKind> for InvalidEndpointError {
         106  +
    fn from(kind: InvalidEndpointErrorKind) -> Self {
         107  +
        Self { kind }
         108  +
    }
         109  +
}
         110  +
         111  +
impl fmt::Display for InvalidEndpointError {
         112  +
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         113  +
        use InvalidEndpointErrorKind as ErrorKind;
         114  +
        match &self.kind {
         115  +
            ErrorKind::EndpointMustHaveScheme => write!(f, "endpoint must contain a valid scheme"),
         116  +
            ErrorKind::FailedToConstructAuthority { authority, source: _ } => write!(
         117  +
                f,
         118  +
                "endpoint must contain a valid authority when combined with endpoint prefix: {authority}"
         119  +
            ),
         120  +
            ErrorKind::FailedToConstructUri { .. } => write!(f, "failed to construct URI"),
         121  +
        }
         122  +
    }
         123  +
}
         124  +
         125  +
impl Error for InvalidEndpointError {
         126  +
    fn source(&self) -> Option<&(dyn Error + 'static)> {
         127  +
        use InvalidEndpointErrorKind as ErrorKind;
         128  +
        match &self.kind {
         129  +
            ErrorKind::FailedToConstructUri { source } => Some(source.as_ref()),
         130  +
            ErrorKind::FailedToConstructAuthority {
         131  +
                authority: _,
         132  +
                source,
         133  +
            } => Some(source.as_ref()),
         134  +
            ErrorKind::EndpointMustHaveScheme => None,
         135  +
        }
         136  +
    }
         137  +
}

tmp-codegen-diff/aws-sdk/sdk/aws-smithy-legacy-http/src/event_stream.rs

@@ -0,1 +0,20 @@
           1  +
/*
           2  +
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
           3  +
 * SPDX-License-Identifier: Apache-2.0
           4  +
 */
           5  +
           6  +
//! Provides Sender/Receiver implementations for Event Stream codegen.
           7  +
           8  +
use std::error::Error as StdError;
           9  +
          10  +
mod receiver;
          11  +
mod sender;
          12  +
          13  +
/// Type-erased, heap-allocated error that is `Send`, `Sync`, and `'static`.
pub type BoxError = Box<dyn StdError + Send + Sync + 'static>;
          15  +
          16  +
#[doc(inline)]
          17  +
pub use sender::{EventStreamSender, MessageStreamAdapter, MessageStreamError};
          18  +
          19  +
#[doc(inline)]
          20  +
pub use receiver::{InitialMessageType, Receiver, ReceiverError};

tmp-codegen-diff/aws-sdk/sdk/aws-smithy-legacy-http/src/event_stream/receiver.rs

@@ -0,1 +0,569 @@
           1  +
/*
           2  +
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
           3  +
 * SPDX-License-Identifier: Apache-2.0
           4  +
 */
           5  +
           6  +
use aws_smithy_eventstream::frame::{
           7  +
    DecodedFrame, MessageFrameDecoder, UnmarshallMessage, UnmarshalledMessage,
           8  +
};
           9  +
use aws_smithy_runtime_api::client::result::{ConnectorError, SdkError};
          10  +
use aws_smithy_types::body::SdkBody;
          11  +
use aws_smithy_types::event_stream::{Message, RawMessage};
          12  +
use bytes::Buf;
          13  +
use bytes::Bytes;
          14  +
use bytes_utils::SegmentedBuf;
          15  +
use std::error::Error as StdError;
          16  +
use std::fmt;
          17  +
use std::marker::PhantomData;
          18  +
use std::mem;
          19  +
use tracing::trace;
          20  +
          21  +
/// Wrapper around SegmentedBuf that tracks the state of the stream.
          22  +
#[derive(Debug)]
          23  +
enum RecvBuf {
          24  +
    /// Nothing has been buffered yet.
          25  +
    Empty,
          26  +
    /// Some data has been buffered.
          27  +
    /// The SegmentedBuf will automatically purge when it reads off the end of a chunk boundary.
          28  +
    Partial(SegmentedBuf<Bytes>),
          29  +
    /// The end of the stream has been reached, but there may still be some buffered data.
          30  +
    EosPartial(SegmentedBuf<Bytes>),
          31  +
    /// An exception terminated this stream.
          32  +
    Terminated,
          33  +
}
          34  +
          35  +
impl RecvBuf {
          36  +
    /// Returns true if there's more buffered data.
          37  +
    fn has_data(&self) -> bool {
          38  +
        match self {
          39  +
            RecvBuf::Empty | RecvBuf::Terminated => false,
          40  +
            RecvBuf::Partial(segments) | RecvBuf::EosPartial(segments) => segments.remaining() > 0,
          41  +
        }
          42  +
    }
          43  +
          44  +
    /// Returns true if the stream has ended.
          45  +
    fn is_eos(&self) -> bool {
          46  +
        matches!(self, RecvBuf::EosPartial(_) | RecvBuf::Terminated)
          47  +
    }
          48  +
          49  +
    /// Returns a mutable reference to the underlying buffered data.
          50  +
    fn buffered(&mut self) -> &mut SegmentedBuf<Bytes> {
          51  +
        match self {
          52  +
            RecvBuf::Empty => panic!("buffer must be populated before reading; this is a bug"),
          53  +
            RecvBuf::Partial(segmented) => segmented,
          54  +
            RecvBuf::EosPartial(segmented) => segmented,
          55  +
            RecvBuf::Terminated => panic!("buffer has been terminated; this is a bug"),
          56  +
        }
          57  +
    }
          58  +
          59  +
    /// Returns a new `RecvBuf` with additional data buffered. This will only allocate
          60  +
    /// if the `RecvBuf` was previously empty.
          61  +
    fn with_partial(self, partial: Bytes) -> Self {
          62  +
        match self {
          63  +
            RecvBuf::Empty => {
          64  +
                let mut segmented = SegmentedBuf::new();
          65  +
                segmented.push(partial);
          66  +
                RecvBuf::Partial(segmented)
          67  +
            }
          68  +
            RecvBuf::Partial(mut segmented) => {
          69  +
                segmented.push(partial);
          70  +
                RecvBuf::Partial(segmented)
          71  +
            }
          72  +
            RecvBuf::EosPartial(_) | RecvBuf::Terminated => {
          73  +
                panic!("cannot buffer more data after the stream has ended or been terminated; this is a bug")
          74  +
            }
          75  +
        }
          76  +
    }
          77  +
          78  +
    /// Returns a `RecvBuf` that has reached end of stream.
          79  +
    fn ended(self) -> Self {
          80  +
        match self {
          81  +
            RecvBuf::Empty => RecvBuf::EosPartial(SegmentedBuf::new()),
          82  +
            RecvBuf::Partial(segmented) => RecvBuf::EosPartial(segmented),
          83  +
            RecvBuf::EosPartial(_) => panic!("already end of stream; this is a bug"),
          84  +
            RecvBuf::Terminated => panic!("stream terminated; this is a bug"),
          85  +
        }
          86  +
    }
          87  +
}
          88  +
          89  +
#[derive(Debug)]
enum ReceiverErrorKind {
    /// The stream finished while a message frame was still incomplete.
    UnexpectedEndOfStream,
}

/// Error raised by an event stream receiver.
#[derive(Debug)]
pub struct ReceiverError {
    // The specific failure that occurred.
    kind: ReceiverErrorKind,
}

impl fmt::Display for ReceiverError {
    /// Writes a short, human-readable description of the failure.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self.kind {
            ReceiverErrorKind::UnexpectedEndOfStream => "unexpected end of stream",
        };
        f.write_str(description)
    }
}

// No underlying source error is exposed.
impl StdError for ReceiverError {}
         110  +
         111  +
/// Receives Smithy-modeled messages out of an Event Stream.
         112  +
#[derive(Debug)]
         113  +
pub struct Receiver<T, E> {
         114  +
    unmarshaller: Box<dyn UnmarshallMessage<Output = T, Error = E> + Send + Sync>,
         115  +
    decoder: MessageFrameDecoder,
         116  +
    buffer: RecvBuf,
         117  +
    body: SdkBody,
         118  +
    /// Event Stream has optional initial response frames an with `:message-type` of
         119  +
    /// `initial-response`. If `try_recv_initial()` is called and the next message isn't an
         120  +
    /// initial response, then the message will be stored in `buffered_message` so that it can
         121  +
    /// be returned with the next call of `recv()`.
         122  +
    buffered_message: Option<Message>,
         123  +
    _phantom: PhantomData<E>,
         124  +
}
         125  +
         126  +
// Used by `Receiver::try_recv_initial`, hence this enum is also doc hidden
/// Selects which kind of initial message `Receiver::try_recv_initial` should look for.
#[doc(hidden)]
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InitialMessageType {
    /// Selects messages whose event type is `initial-request`.
    Request,
    /// Selects messages whose event type is `initial-response`.
    Response,
}

impl InitialMessageType {
    /// Returns the event-type string that identifies this kind of initial message.
    fn as_str(&self) -> &'static str {
        match self {
            InitialMessageType::Request => "initial-request",
            InitialMessageType::Response => "initial-response",
        }
    }
}
         142  +
         143  +
impl<T, E> Receiver<T, E> {
         144  +
    /// Creates a new `Receiver` with the given message unmarshaller and SDK body.
         145  +
    pub fn new(
         146  +
        unmarshaller: impl UnmarshallMessage<Output = T, Error = E> + Send + Sync + 'static,
         147  +
        body: SdkBody,
         148  +
    ) -> Self {
         149  +
        Receiver {
         150  +
            unmarshaller: Box::new(unmarshaller),
         151  +
            decoder: MessageFrameDecoder::new(),
         152  +
            buffer: RecvBuf::Empty,
         153  +
            body,
         154  +
            buffered_message: None,
         155  +
            _phantom: Default::default(),
         156  +
        }
         157  +
    }
         158  +
         159  +
    fn unmarshall(&self, message: Message) -> Result<Option<T>, SdkError<E, RawMessage>> {
         160  +
        match self.unmarshaller.unmarshall(&message) {
         161  +
            Ok(unmarshalled) => match unmarshalled {
         162  +
                UnmarshalledMessage::Event(event) => Ok(Some(event)),
         163  +
                UnmarshalledMessage::Error(err) => {
         164  +
                    Err(SdkError::service_error(err, RawMessage::Decoded(message)))
         165  +
                }
         166  +
            },
         167  +
            Err(err) => Err(SdkError::response_error(err, RawMessage::Decoded(message))),
         168  +
        }
         169  +
    }
         170  +
         171  +
    async fn buffer_next_chunk(&mut self) -> Result<(), SdkError<E, RawMessage>> {
         172  +
        use http_body_04x::Body;
         173  +
         174  +
        if !self.buffer.is_eos() {
         175  +
            let next_chunk = self
         176  +
                .body
         177  +
                .data()
         178  +
                .await
         179  +
                .transpose()
         180  +
                .map_err(|err| SdkError::dispatch_failure(ConnectorError::io(err)))?;
         181  +
            let buffer = mem::replace(&mut self.buffer, RecvBuf::Empty);
         182  +
            if let Some(chunk) = next_chunk {
         183  +
                self.buffer = buffer.with_partial(chunk);
         184  +
            } else {
         185  +
                self.buffer = buffer.ended();
         186  +
            }
         187  +
        }
         188  +
        Ok(())
         189  +
    }
         190  +
         191  +
    async fn next_message(&mut self) -> Result<Option<Message>, SdkError<E, RawMessage>> {
         192  +
        while !self.buffer.is_eos() {
         193  +
            if self.buffer.has_data() {
         194  +
                if let DecodedFrame::Complete(message) = self
         195  +
                    .decoder
         196  +
                    .decode_frame(self.buffer.buffered())
         197  +
                    .map_err(|err| {
         198  +
                        SdkError::response_error(
         199  +
                            err,
         200  +
                            // the buffer has been consumed
         201  +
                            RawMessage::Invalid(None),
         202  +
                        )
         203  +
                    })?
         204  +
                {
         205  +
                    trace!(message = ?message, "received complete event stream message");
         206  +
                    return Ok(Some(message));
         207  +
                }
         208  +
            }
         209  +
         210  +
            self.buffer_next_chunk().await?;
         211  +
        }
         212  +
        if self.buffer.has_data() {
         213  +
            trace!(remaining_data = ?self.buffer, "data left over in the event stream response stream");
         214  +
            let buf = self.buffer.buffered();
         215  +
            return Err(SdkError::response_error(
         216  +
                ReceiverError {
         217  +
                    kind: ReceiverErrorKind::UnexpectedEndOfStream,
         218  +
                },
         219  +
                RawMessage::invalid(Some(buf.copy_to_bytes(buf.remaining()))),
         220  +
            ));
         221  +
        }
         222  +
        Ok(None)
         223  +
    }
         224  +
         225  +
    /// Tries to receive the initial response message that has `:event-type` of a given `message_type`.
         226  +
    /// If a different event type is received, then it is buffered and `Ok(None)` is returned.
         227  +
    #[doc(hidden)]
         228  +
    pub async fn try_recv_initial(
         229  +
        &mut self,
         230  +
        message_type: InitialMessageType,
         231  +
    ) -> Result<Option<Message>, SdkError<E, RawMessage>> {
         232  +
        if let Some(message) = self.next_message().await? {
         233  +
            if let Some(event_type) = message
         234  +
                .headers()
         235  +
                .iter()
         236  +
                .find(|h| h.name().as_str() == ":event-type")
         237  +
            {
         238  +
                if event_type
         239  +
                    .value()
         240  +
                    .as_string()
         241  +
                    .map(|s| s.as_str() == message_type.as_str())
         242  +
                    .unwrap_or(false)
         243  +
                {
         244  +
                    return Ok(Some(message));
         245  +
                }
         246  +
            }
         247  +
            // Buffer the message so that it can be returned by the next call to `recv()`
         248  +
            self.buffered_message = Some(message);
         249  +
        }
         250  +
        Ok(None)
         251  +
    }
         252  +
         253  +
    /// Asynchronously tries to receive a message from the stream. If the stream has ended,
         254  +
    /// it returns an `Ok(None)`. If there is a transport layer error, it will return
         255  +
    /// `Err(SdkError::DispatchFailure)`. Service-modeled errors will be a part of the returned
         256  +
    /// messages.
         257  +
    pub async fn recv(&mut self) -> Result<Option<T>, SdkError<E, RawMessage>> {
         258  +
        if let Some(buffered) = self.buffered_message.take() {
         259  +
            return match self.unmarshall(buffered) {
         260  +
                Ok(message) => Ok(message),
         261  +
                Err(error) => {
         262  +
                    self.buffer = RecvBuf::Terminated;
         263  +
                    Err(error)
         264  +
                }
         265  +
            };
         266  +
        }
         267  +
        if let Some(message) = self.next_message().await? {
         268  +
            match self.unmarshall(message) {
         269  +
                Ok(message) => Ok(message),
         270  +
                Err(error) => {
         271  +
                    self.buffer = RecvBuf::Terminated;
         272  +
                    Err(error)
         273  +
                }
         274  +
            }
         275  +
        } else {
         276  +
            Ok(None)
         277  +
        }
         278  +
    }
         279  +
}
         280  +
         281  +
#[cfg(test)]
mod tests {
    use super::{InitialMessageType, Receiver, UnmarshallMessage};
    use aws_smithy_eventstream::error::Error as EventStreamError;
    use aws_smithy_eventstream::frame::{write_message_to, UnmarshalledMessage};
    use aws_smithy_runtime_api::client::result::SdkError;
    use aws_smithy_types::body::SdkBody;
    use aws_smithy_types::event_stream::{Header, HeaderValue, Message};
    use bytes::Bytes;
    use hyper::body::Body;
    use std::error::Error as StdError;
    use std::io::{Error as IOError, ErrorKind};

    /// The receiver type exercised by every test in this module.
    type TestReceiver = Receiver<TestMessage, EventStreamError>;

    /// Serializes `message` into its wire representation.
    fn encode(message: &Message) -> Bytes {
        let mut wire = Vec::new();
        write_message_to(message, &mut wire).unwrap();
        wire.into()
    }

    /// Encodes an empty message whose `:event-type` header is `initial-response`.
    fn encode_initial_response() -> Bytes {
        let message_type = Header::new(":message-type", HeaderValue::String("event".into()));
        let event_type = Header::new(
            ":event-type",
            HeaderValue::String("initial-response".into()),
        );
        let message = Message::new(Bytes::new())
            .add_header(message_type)
            .add_header(event_type);
        encode(&message)
    }

    /// Encodes a header-less message whose payload is `message`.
    fn encode_message(message: &str) -> Bytes {
        encode(&Message::new(Bytes::copy_from_slice(message.as_bytes())))
    }

    /// Stand-in error used as the cause of a simulated I/O failure.
    #[derive(Debug)]
    struct FakeError;
    impl std::fmt::Display for FakeError {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.write_str("FakeError")
        }
    }
    impl StdError for FakeError {}

    /// Event type produced by the test unmarshaller; wraps the payload as text.
    #[derive(Debug, Eq, PartialEq)]
    struct TestMessage(String);

    /// Unmarshaller that turns every message payload into a `TestMessage`.
    #[derive(Debug)]
    struct Unmarshaller;
    impl UnmarshallMessage for Unmarshaller {
        type Output = TestMessage;
        type Error = EventStreamError;

        fn unmarshall(
            &self,
            message: &Message,
        ) -> Result<UnmarshalledMessage<Self::Output, Self::Error>, EventStreamError> {
            let payload = std::str::from_utf8(&message.payload()[..]).unwrap();
            Ok(UnmarshalledMessage::Event(TestMessage(payload.into())))
        }
    }

    /// Builds a receiver whose body yields `chunks` in order.
    fn receiver_for(chunks: Vec<Result<Bytes, IOError>>) -> TestReceiver {
        let chunk_stream = futures_util::stream::iter(chunks);
        let body = SdkBody::from_body_0_4(Body::wrap_stream(chunk_stream));
        Receiver::new(Unmarshaller, body)
    }

    /// Asserts that the next received message carries the payload `expected`.
    async fn assert_next(receiver: &mut TestReceiver, expected: &str) {
        assert_eq!(
            TestMessage(expected.into()),
            receiver.recv().await.unwrap().unwrap()
        );
    }

    /// Asserts that the receiver reports a clean end of stream.
    async fn assert_finished(receiver: &mut TestReceiver) {
        assert_eq!(None, receiver.recv().await.unwrap());
    }

    #[tokio::test]
    async fn receive_success() {
        let mut receiver =
            receiver_for(vec![Ok(encode_message("one")), Ok(encode_message("two"))]);
        assert_next(&mut receiver, "one").await;
        assert_next(&mut receiver, "two").await;
        assert_finished(&mut receiver).await;
    }

    #[tokio::test]
    async fn receive_last_chunk_empty() {
        let mut receiver = receiver_for(vec![
            Ok(encode_message("one")),
            Ok(encode_message("two")),
            Ok(Bytes::from_static(&[])),
        ]);
        assert_next(&mut receiver, "one").await;
        assert_next(&mut receiver, "two").await;
        assert_finished(&mut receiver).await;
    }

    #[tokio::test]
    async fn receive_last_chunk_not_full_message() {
        let mut receiver = receiver_for(vec![
            Ok(encode_message("one")),
            Ok(encode_message("two")),
            // Only the first 10 bytes of the third message are delivered.
            Ok(encode_message("three").split_to(10)),
        ]);
        assert_next(&mut receiver, "one").await;
        assert_next(&mut receiver, "two").await;
        let outcome = receiver.recv().await;
        assert!(matches!(outcome, Err(SdkError::ResponseError { .. })));
    }

    #[tokio::test]
    async fn receive_last_chunk_has_multiple_messages() {
        let last_chunk = Bytes::from([encode_message("three"), encode_message("four")].concat());
        let mut receiver = receiver_for(vec![
            Ok(encode_message("one")),
            Ok(encode_message("two")),
            Ok(last_chunk),
        ]);
        for expected in ["one", "two", "three", "four"] {
            assert_next(&mut receiver, expected).await;
        }
        assert_finished(&mut receiver).await;
    }

    proptest::proptest! {
        #[test]
        fn receive_multiple_messages_split_unevenly_across_chunks(b1: usize, b2: usize) {
            let payloads = ["one", "two", "three", "four", "five", "six", "seven", "eight"];
            let encoded: Vec<Bytes> = payloads.iter().copied().map(encode_message).collect();
            let combined = Bytes::from(encoded.concat());

            // One split point in each half, so the three chunks cut messages at arbitrary offsets.
            let midpoint = combined.len() / 2;
            let start = 0;
            let boundary1 = b1 % midpoint;
            let boundary2 = midpoint + b2 % midpoint;
            let end = combined.len();
            println!("[{}, {}], [{}, {}], [{}, {}]", start, boundary1, boundary1, boundary2, boundary2, end);

            let rt = tokio::runtime::Runtime::new().unwrap();
            rt.block_on(async move {
                let chunks = vec![
                    Ok(Bytes::copy_from_slice(&combined[start..boundary1])),
                    Ok(Bytes::copy_from_slice(&combined[boundary1..boundary2])),
                    Ok(Bytes::copy_from_slice(&combined[boundary2..end])),
                ];

                let mut receiver = receiver_for(chunks);
                for payload in payloads {
                    assert_next(&mut receiver, payload).await;
                }
                assert_finished(&mut receiver).await;
            });
        }
    }

    #[tokio::test]
    async fn receive_network_failure() {
        let mut receiver = receiver_for(vec![
            Ok(encode_message("one")),
            Err(IOError::new(ErrorKind::ConnectionReset, FakeError)),
        ]);
        assert_next(&mut receiver, "one").await;
        let outcome = receiver.recv().await;
        assert!(matches!(outcome, Err(SdkError::DispatchFailure(_))));
    }

    #[tokio::test]
    async fn receive_message_parse_failure() {
        let mut receiver = receiver_for(vec![
            Ok(encode_message("one")),
            // A zero length message will be invalid. We need to provide a minimum of 12 bytes
            // for the MessageFrameDecoder to actually start parsing it.
            Ok(Bytes::from_static(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])),
        ]);
        assert_next(&mut receiver, "one").await;
        let outcome = receiver.recv().await;
        assert!(matches!(outcome, Err(SdkError::ResponseError { .. })));
    }

    #[tokio::test]
    async fn receive_initial_response() {
        let mut receiver = receiver_for(vec![
            Ok(encode_initial_response()),
            Ok(encode_message("one")),
        ]);
        let initial = receiver
            .try_recv_initial(InitialMessageType::Response)
            .await
            .unwrap();
        assert!(initial.is_some());
        assert_next(&mut receiver, "one").await;
    }

    #[tokio::test]
    async fn receive_no_initial_response() {
        let mut receiver =
            receiver_for(vec![Ok(encode_message("one")), Ok(encode_message("two"))]);
        let initial = receiver
            .try_recv_initial(InitialMessageType::Response)
            .await
            .unwrap();
        assert!(initial.is_none());
        // The message that was not an initial response must still be delivered by `recv()`.
        assert_next(&mut receiver, "one").await;
        assert_next(&mut receiver, "two").await;
    }

    fn assert_send_and_sync<T: Send + Sync>() {}

    #[tokio::test]
    async fn receiver_is_send_and_sync() {
        assert_send_and_sync::<Receiver<(), ()>>();
    }
}