AWS SDK

AWS SDK

rev. 03e6e47f15dfd569240d570d98975ebba692c405 (ignoring whitespace)

Files changed:

tmp-codegen-diff/aws-sdk/sdk/aws-smithy-runtime/src/client/http/body/minimum_throughput/throughput.rs

@@ -376,376 +435,476 @@
  396    396   
        let half = BIN_COUNT / 2;
  397    397   
        match (transferred > 0, no_polling >= half, pending >= half) {
  398    398   
            (true, _, _) => ThroughputReport::Transferred(throughput),
  399    399   
            (_, true, _) => ThroughputReport::NoPolling,
  400    400   
            (_, _, true) => ThroughputReport::Pending,
  401    401   
            _ => ThroughputReport::Incomplete,
  402    402   
        }
  403    403   
    }
  404    404   
}
  405    405   
         406  +
const ZERO_THROUGHPUT: Throughput = Throughput::new_bytes_per_second(0);
         407  +
// Helper trait for interpreting the throughput report.
         408  +
pub(crate) trait DownloadReport {
         409  +
    fn minimum_throughput_violated(self, minimum_throughput: Throughput) -> (bool, Throughput);
         410  +
}
         411  +
impl DownloadReport for ThroughputReport {
         412  +
    fn minimum_throughput_violated(self, minimum_throughput: Throughput) -> (bool, Throughput) {
         413  +
        let throughput = match self {
         414  +
            ThroughputReport::Complete => return (false, ZERO_THROUGHPUT),
         415  +
            // If the report is incomplete, then we don't have enough data yet to
         416  +
            // decide if minimum throughput was violated.
         417  +
            ThroughputReport::Incomplete => {
         418  +
                tracing::trace!(
         419  +
                    "not enough data to decide if minimum throughput has been violated"
         420  +
                );
         421  +
                return (false, ZERO_THROUGHPUT);
         422  +
            }
         423  +
            // If no polling is taking place, then the user has stalled.
         424  +
            // In this case, we don't want to say minimum throughput was violated.
         425  +
            ThroughputReport::NoPolling => {
         426  +
                tracing::debug!(
         427  +
                    "the user has stalled; this will not become a minimum throughput violation"
         428  +
                );
         429  +
                return (false, ZERO_THROUGHPUT);
         430  +
            }
         431  +
            // If we're stuck in Poll::Pending, then the server has stalled. Alternatively,
         432  +
            // if we're transferring data, but it's too slow, then we also want to say
         433  +
            // that the minimum throughput has been violated.
         434  +
            ThroughputReport::Pending => ZERO_THROUGHPUT,
         435  +
            ThroughputReport::Transferred(tp) => tp,
         436  +
        };
         437  +
        let violated = throughput < minimum_throughput;
         438  +
        if violated {
         439  +
            tracing::debug!(
         440  +
                "current throughput: {throughput} is below minimum: {minimum_throughput}"
         441  +
            );
         442  +
        }
         443  +
        (violated, throughput)
         444  +
    }
         445  +
}
         446  +
  406    447   
#[cfg(test)]
  407    448   
mod test {
  408    449   
    use super::*;
  409    450   
    use std::time::Duration;
  410    451   
  411    452   
    #[test]
  412    453   
    fn test_log_buffer_bin_label_priority() {
  413    454   
        use BinLabel::*;
  414    455   
        assert!(Empty < NoPolling);
  415    456   
        assert!(NoPolling < Pending);

tmp-codegen-diff/aws-sdk/sdk/aws-smithy-runtime/src/client/orchestrator.rs

@@ -498,498 +589,589 @@
  518    518   
        RuntimeComponents, RuntimeComponentsBuilder,
  519    519   
    };
  520    520   
    use aws_smithy_runtime_api::client::runtime_plugin::{RuntimePlugin, RuntimePlugins};
  521    521   
    use aws_smithy_runtime_api::client::ser_de::{
  522    522   
        SharedRequestSerializer, SharedResponseDeserializer,
  523    523   
    };
  524    524   
    use aws_smithy_runtime_api::shared::IntoShared;
  525    525   
    use aws_smithy_types::body::SdkBody;
  526    526   
    use aws_smithy_types::config_bag::{ConfigBag, FrozenLayer, Layer};
  527    527   
    use aws_smithy_types::timeout::TimeoutConfig;
  528         -
    use http_02x::{Response, StatusCode};
         528  +
    use http_1x::{Response, StatusCode};
  529    529   
    use std::borrow::Cow;
  530    530   
    use std::sync::atomic::{AtomicBool, Ordering};
  531    531   
    use std::sync::Arc;
  532    532   
    use tracing_test::traced_test;
  533    533   
  534    534   
    fn new_request_serializer() -> CannedRequestSerializer {
  535    535   
        CannedRequestSerializer::success(HttpRequest::empty())
  536    536   
    }
  537    537   
  538    538   
    fn new_response_deserializer() -> CannedResponseDeserializer {
  539    539   
        CannedResponseDeserializer::new(
  540    540   
            Response::builder()
  541    541   
                .status(StatusCode::OK)
  542    542   
                .body(SdkBody::empty())
  543    543   
                .map_err(|err| OrchestratorError::other(Box::new(err)))
  544    544   
                .map(Output::erase),
  545    545   
        )
  546    546   
    }
  547    547   
  548    548   
    #[derive(Debug, Default)]
  549    549   
    struct OkConnector {}
  550    550   
  551    551   
    impl OkConnector {
  552    552   
        fn new() -> Self {
  553    553   
            Self::default()
  554    554   
        }
  555    555   
    }
  556    556   
  557    557   
    impl HttpConnector for OkConnector {
  558    558   
        fn call(&self, _request: HttpRequest) -> HttpConnectorFuture {
  559         -
            HttpConnectorFuture::ready(Ok(http_02x::Response::builder()
         559  +
            HttpConnectorFuture::ready(Ok(http_1x::Response::builder()
  560    560   
                .status(200)
  561    561   
                .body(SdkBody::empty())
  562    562   
                .expect("OK response is valid")
  563    563   
                .try_into()
  564    564   
                .unwrap()))
  565    565   
        }
  566    566   
    }
  567    567   
  568    568   
    #[derive(Debug)]
  569    569   
    struct TestOperationRuntimePlugin {

tmp-codegen-diff/aws-sdk/sdk/aws-smithy-runtime/src/client/orchestrator/auth.rs

@@ -446,446 +506,506 @@
  466    466   
            fn sign_http_request(
  467    467   
                &self,
  468    468   
                request: &mut HttpRequest,
  469    469   
                _identity: &Identity,
  470    470   
                _auth_scheme_endpoint_config: AuthSchemeEndpointConfig<'_>,
  471    471   
                _runtime_components: &RuntimeComponents,
  472    472   
                _config_bag: &ConfigBag,
  473    473   
            ) -> Result<(), BoxError> {
  474    474   
                request
  475    475   
                    .headers_mut()
  476         -
                    .insert(http_02x::header::AUTHORIZATION, "success!");
         476  +
                    .insert(http_1x::header::AUTHORIZATION, "success!");
  477    477   
                Ok(())
  478    478   
            }
  479    479   
        }
  480    480   
  481    481   
        const TEST_SCHEME_ID: AuthSchemeId = AuthSchemeId::new("test-scheme");
  482    482   
  483    483   
        #[derive(Debug)]
  484    484   
        struct TestAuthScheme {
  485    485   
            signer: TestSigner,
  486    486   
        }
@@ -757,757 +817,817 @@
  777    777   
                _config_bag: &'a ConfigBag,
  778    778   
            ) -> IdentityFuture<'a> {
  779    779   
                IdentityFuture::ready(Ok(Identity::new(Token::new("cached (pass)", None), None)))
  780    780   
            }
  781    781   
        }
  782    782   
  783    783   
        async fn run_test(add_more_to_layer: impl Fn(Layer) -> Layer) {
  784    784   
            let mut ctx = InterceptorContext::new(Input::doesnt_matter());
  785    785   
            ctx.enter_serialization_phase();
  786    786   
            ctx.set_request(
  787         -
                http_02x::Request::builder()
         787  +
                http_1x::Request::builder()
  788    788   
                    .body(SdkBody::empty())
  789    789   
                    .unwrap()
  790    790   
                    .try_into()
  791    791   
                    .unwrap(),
  792    792   
            );
  793    793   
            let _ = ctx.take_input();
  794    794   
            ctx.enter_before_transmit_phase();
  795    795   
  796    796   
            let runtime_components = RuntimeComponentsBuilder::for_tests()
  797    797   
                .with_auth_scheme(SharedAuthScheme::new(ApiKeyAuthScheme::new(

tmp-codegen-diff/aws-sdk/sdk/aws-smithy-runtime/src/client/orchestrator/endpoints.rs

@@ -1,1 +48,48 @@
    6      6   
use aws_smithy_runtime_api::client::endpoint::{
    7      7   
    error::ResolveEndpointError, EndpointFuture, EndpointResolverParams, ResolveEndpoint,
    8      8   
};
    9      9   
use aws_smithy_runtime_api::client::identity::Identity;
   10     10   
use aws_smithy_runtime_api::client::interceptors::context::InterceptorContext;
   11     11   
use aws_smithy_runtime_api::client::orchestrator::HttpRequest;
   12     12   
use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
   13     13   
use aws_smithy_runtime_api::{box_error::BoxError, client::endpoint::EndpointPrefix};
   14     14   
use aws_smithy_types::config_bag::ConfigBag;
   15     15   
use aws_smithy_types::endpoint::Endpoint;
   16         -
use http_02x::header::HeaderName;
   17         -
use http_02x::uri::PathAndQuery;
   18         -
use http_02x::{HeaderValue, Uri};
          16  +
use http_1x::header::HeaderName;
          17  +
use http_1x::uri::PathAndQuery;
          18  +
use http_1x::{HeaderValue, Uri};
   19     19   
use std::borrow::Cow;
   20     20   
use std::fmt::Debug;
   21     21   
use std::str::FromStr;
   22     22   
use tracing::trace;
   23     23   
   24     24   
/// An endpoint resolver that uses a static URI.
   25     25   
#[derive(Clone, Debug)]
   26     26   
pub struct StaticUriEndpointResolver {
   27     27   
    endpoint: String,
   28     28   
}

tmp-codegen-diff/aws-sdk/sdk/aws-smithy-runtime/src/client/orchestrator/http.rs

@@ -1,1 +54,49 @@
    1      1   
/*
    2      2   
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    3      3   
 * SPDX-License-Identifier: Apache-2.0
    4      4   
 */
    5      5   
    6      6   
use aws_smithy_runtime_api::client::orchestrator::{HttpResponse, SensitiveOutput};
    7      7   
use aws_smithy_types::body::SdkBody;
    8      8   
use aws_smithy_types::config_bag::ConfigBag;
    9         -
use bytes::{Buf, Bytes};
   10         -
use http_body_04x::Body;
           9  +
use bytes::Bytes;
   11     10   
use pin_utils::pin_mut;
   12     11   
use tracing::trace;
   13     12   
   14     13   
const LOG_SENSITIVE_BODIES: &str = "LOG_SENSITIVE_BODIES";
   15     14   
   16         -
async fn body_to_bytes(body: SdkBody) -> Result<Bytes, <SdkBody as Body>::Error> {
   17         -
    let mut output = Vec::new();
          15  +
async fn body_to_bytes(body: SdkBody) -> Result<Bytes, <SdkBody as http_body_1x::Body>::Error> {
          16  +
    use http_body_util::BodyExt;
   18     17   
    pin_mut!(body);
   19         -
    while let Some(buf) = body.data().await {
   20         -
        let mut buf = buf?;
   21         -
        while buf.has_remaining() {
   22         -
            output.extend_from_slice(buf.chunk());
   23         -
            buf.advance(buf.chunk().len())
   24         -
        }
   25         -
    }
          18  +
    let collected = body.collect().await?;
   26     19   
   27         -
    Ok(Bytes::from(output))
          20  +
    Ok(collected.to_bytes())
   28     21   
}
   29     22   
   30         -
pub(crate) async fn read_body(response: &mut HttpResponse) -> Result<(), <SdkBody as Body>::Error> {
          23  +
pub(crate) async fn read_body(
          24  +
    response: &mut HttpResponse,
          25  +
) -> Result<(), <SdkBody as http_body_1x::Body>::Error> {
   31     26   
    let mut body = SdkBody::taken();
   32     27   
    std::mem::swap(&mut body, response.body_mut());
   33     28   
   34     29   
    let bytes = body_to_bytes(body).await?;
   35     30   
    let mut body = SdkBody::from(bytes);
   36     31   
    std::mem::swap(&mut body, response.body_mut());
   37     32   
   38     33   
    Ok(())
   39     34   
}
   40     35   

tmp-codegen-diff/aws-sdk/sdk/aws-smithy-runtime/src/client/retries/classifiers.rs

@@ -214,214 +287,287 @@
  234    234   
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  235    235   
            write!(f, "UnmodeledError")
  236    236   
        }
  237    237   
    }
  238    238   
  239    239   
    impl std::error::Error for UnmodeledError {}
  240    240   
  241    241   
    #[test]
  242    242   
    fn classify_by_response_status() {
  243    243   
        let policy = HttpStatusCodeClassifier::default();
  244         -
        let res = http_02x::Response::builder()
         244  +
        let res = http_1x::Response::builder()
  245    245   
            .status(500)
  246    246   
            .body("error!")
  247    247   
            .unwrap()
  248    248   
            .map(SdkBody::from);
  249    249   
        let mut ctx = InterceptorContext::new(Input::doesnt_matter());
  250    250   
        ctx.set_response(res.try_into().unwrap());
  251    251   
        assert_eq!(policy.classify_retry(&ctx), RetryAction::transient_error());
  252    252   
    }
  253    253   
  254    254   
    #[test]
  255    255   
    fn classify_by_response_status_not_retryable() {
  256    256   
        let policy = HttpStatusCodeClassifier::default();
  257         -
        let res = http_02x::Response::builder()
         257  +
        let res = http_1x::Response::builder()
  258    258   
            .status(408)
  259    259   
            .body("error!")
  260    260   
            .unwrap()
  261    261   
            .map(SdkBody::from);
  262    262   
        let mut ctx = InterceptorContext::new(Input::doesnt_matter());
  263    263   
        ctx.set_response(res.try_into().unwrap());
  264    264   
        assert_eq!(policy.classify_retry(&ctx), RetryAction::NoActionIndicated);
  265    265   
    }
  266    266   
  267    267   
    #[test]

tmp-codegen-diff/aws-sdk/sdk/aws-smithy-runtime/src/client/stalled_stream_protection.rs

@@ -55,55 +139,139 @@
   75     75   
                let now = time_source.now();
   76     76   
   77     77   
                let options: MinimumThroughputBodyOptions = sspcfg.into();
   78     78   
                let throughput = UploadThroughput::new(options.check_window(), now);
   79     79   
                cfg.interceptor_state().store_put(throughput.clone());
   80     80   
   81     81   
                tracing::trace!("adding stalled stream protection to request body");
   82     82   
                let it = mem::replace(context.request_mut().body_mut(), SdkBody::taken());
   83     83   
                let it = it.map_preserve_contents(move |body| {
   84     84   
                    let time_source = time_source.clone();
   85         -
                    SdkBody::from_body_0_4(ThroughputReadingBody::new(
          85  +
                    SdkBody::from_body_1_x(ThroughputReadingBody::new(
   86     86   
                        time_source,
   87     87   
                        throughput.clone(),
   88     88   
                        body,
   89     89   
                    ))
   90     90   
                });
   91     91   
                let _ = mem::replace(context.request_mut().body_mut(), it);
   92     92   
            }
   93     93   
        }
   94     94   
   95     95   
        Ok(())
   96     96   
    }
   97     97   
   98     98   
    fn modify_before_deserialization(
   99     99   
        &self,
  100    100   
        context: &mut BeforeDeserializationInterceptorContextMut<'_>,
  101    101   
        runtime_components: &RuntimeComponents,
  102    102   
        cfg: &mut ConfigBag,
  103    103   
    ) -> Result<(), BoxError> {
  104    104   
        if let Some(sspcfg) = cfg.load::<StalledStreamProtectionConfig>() {
  105    105   
            if sspcfg.download_enabled() {
  106    106   
                let (async_sleep, time_source) = get_runtime_component_deps(runtime_components)?;
  107    107   
                tracing::trace!("adding stalled stream protection to response body");
  108    108   
                let sspcfg = sspcfg.clone();
  109    109   
                let it = mem::replace(context.response_mut().body_mut(), SdkBody::taken());
  110    110   
                let it = it.map_preserve_contents(move |body| {
  111    111   
                    let sspcfg = sspcfg.clone();
  112    112   
                    let async_sleep = async_sleep.clone();
  113    113   
                    let time_source = time_source.clone();
  114    114   
                    let mtb = MinimumThroughputDownloadBody::new(
  115    115   
                        time_source,
  116    116   
                        async_sleep,
  117    117   
                        body,
  118    118   
                        sspcfg.into(),
  119    119   
                    );
  120         -
                    SdkBody::from_body_0_4(mtb)
         120  +
                    SdkBody::from_body_1_x(mtb)
  121    121   
                });
  122    122   
                let _ = mem::replace(context.response_mut().body_mut(), it);
  123    123   
            }
  124    124   
        }
  125    125   
        Ok(())
  126    126   
    }
  127    127   
}
  128    128   
  129    129   
fn get_runtime_component_deps(
  130    130   
    runtime_components: &RuntimeComponents,

tmp-codegen-diff/aws-sdk/sdk/aws-smithy-runtime/tests/stalled_stream_common.rs

@@ -18,18 +119,111 @@
   38     38   
        ser_de::DeserializeResponse,
   39     39   
        stalled_stream_protection::StalledStreamProtectionConfig,
   40     40   
    },
   41     41   
    http::{Response, StatusCode},
   42     42   
    shared::IntoShared,
   43     43   
};
   44     44   
pub use aws_smithy_types::{
   45     45   
    body::SdkBody, error::display::DisplayErrorContext, timeout::TimeoutConfig,
   46     46   
};
   47     47   
pub use bytes::Bytes;
   48         -
pub use http_body_04x::Body;
   49     48   
pub use pin_utils::pin_mut;
   50     49   
pub use std::{
   51     50   
    collections::VecDeque,
   52     51   
    convert::Infallible,
   53     52   
    future::poll_fn,
   54     53   
    mem,
   55     54   
    pin::Pin,
   56     55   
    sync::{Arc, Mutex},
   57     56   
    task::{Context, Poll},
   58     57   
    time::Duration,
   59     58   
};
   60     59   
pub use tracing::{info, Instrument as _};
   61     60   
   62     61   
/// No really, it's 42 bytes long... super neat
   63     62   
pub const NEAT_DATA: Bytes = Bytes::from_static(b"some really neat data");
   64     63   
   65     64   
/// Ticks time forward by the given duration, and logs the current time for debugging.
   66     65   
#[macro_export]
   67     66   
macro_rules! tick {
   68     67   
    ($ticker:ident, $duration:expr) => {
   69     68   
        $ticker.tick($duration).await;
   70     69   
        let now = $ticker
   71     70   
            .now()
   72     71   
            .duration_since(std::time::SystemTime::UNIX_EPOCH)
   73     72   
            .unwrap();
   74     73   
        tracing::info!("ticked {:?}, now at {:?}", $duration, now);
   75     74   
    };
   76     75   
}
   77     76   
   78     77   
#[derive(Debug)]
   79     78   
pub struct FakeServer(pub SharedHttpConnector);
   80     79   
impl HttpClient for FakeServer {
   81     80   
    fn http_connector(
   82     81   
        &self,
   83     82   
        _settings: &HttpConnectorSettings,
   84     83   
        _components: &RuntimeComponents,
   85     84   
    ) -> SharedHttpConnector {
   86     85   
        self.0.clone()
   87     86   
    }
   88     87   
}
   89     88   
   90     89   
struct ChannelBody {
   91     90   
    receiver: tokio::sync::mpsc::Receiver<Bytes>,
   92     91   
}
   93     92   
   94         -
impl Body for ChannelBody {
          93  +
impl http_body_1x::Body for ChannelBody {
   95     94   
    type Data = Bytes;
   96     95   
    type Error = Infallible;
   97     96   
   98         -
    fn poll_data(
          97  +
    fn poll_frame(
   99     98   
        mut self: Pin<&mut Self>,
  100     99   
        cx: &mut Context<'_>,
  101         -
    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
         100  +
    ) -> Poll<Option<Result<http_body_1x::Frame<Self::Data>, Self::Error>>> {
  102    101   
        match self.receiver.poll_recv(cx) {
  103         -
            Poll::Ready(value) => Poll::Ready(value.map(Ok)),
         102  +
            Poll::Ready(value) => Poll::Ready(value.map(|b| Ok(http_body_1x::Frame::data(b)))),
  104    103   
            Poll::Pending => Poll::Pending,
  105    104   
        }
  106    105   
    }
  107         -
  108         -
    fn poll_trailers(
  109         -
        self: Pin<&mut Self>,
  110         -
        _cx: &mut Context<'_>,
  111         -
    ) -> Poll<Result<Option<http_02x::HeaderMap>, Self::Error>> {
  112         -
        unreachable!()
  113         -
    }
  114    106   
}
  115    107   
  116    108   
pub fn channel_body() -> (SdkBody, tokio::sync::mpsc::Sender<Bytes>) {
  117    109   
    let (sender, receiver) = tokio::sync::mpsc::channel(1000);
  118         -
    (SdkBody::from_body_0_4(ChannelBody { receiver }), sender)
         110  +
    (SdkBody::from_body_1_x(ChannelBody { receiver }), sender)
  119    111   
}

tmp-codegen-diff/aws-sdk/sdk/aws-smithy-runtime/tests/stalled_stream_download.rs

@@ -231,231 +290,291 @@
  251    251   
  252    252   
    result
  253    253   
        .await
  254    254   
        .expect("no panics")
  255    255   
        .expect("response MUST NOT timeout");
  256    256   
}
  257    257   
  258    258   
use download_test_tools::*;
  259    259   
mod download_test_tools {
  260    260   
    use crate::stalled_stream_common::*;
         261  +
    use http_body_1x::Body;
  261    262   
    use tokio::sync::mpsc::Receiver;
  262    263   
  263    264   
    fn response(body: SdkBody) -> HttpResponse {
  264    265   
        HttpResponse::try_from(
  265    266   
            http_02x::Response::builder()
  266    267   
                .status(200)
  267    268   
                .body(body)
  268    269   
                .unwrap(),
  269    270   
        )
  270    271   
        .unwrap()
@@ -315,316 +388,389 @@
  335    336   
                body: Arc::new(Mutex::new(Some(body))),
  336    337   
            }
  337    338   
            .into_shared(),
  338    339   
            body_sender,
  339    340   
        )
  340    341   
    }
  341    342   
  342    343   
    /// Simulate a client eagerly consuming all the data sent to it from the server.
  343    344   
    pub async fn eagerly_consume(body: SdkBody) -> Result<(), BoxError> {
  344    345   
        pin_mut!(body);
  345         -
        while let Some(result) = poll_fn(|cx| body.as_mut().poll_data(cx)).await {
         346  +
        while let Some(result) = poll_fn(|cx| body.as_mut().poll_frame(cx)).await {
  346    347   
            if let Err(err) = result {
  347    348   
                return Err(err);
  348    349   
            } else {
  349    350   
                info!("consumed bytes from the response body");
  350    351   
            }
  351    352   
        }
  352    353   
        Ok(())
  353    354   
    }
  354    355   
  355    356   
    /// Simulate a client very slowly consuming data with an eager server.
  356    357   
    ///
  357    358   
    /// This implementation will take longer than the grace period to consume
  358    359   
    /// the next piece of data.
  359    360   
    pub async fn slowly_consume(time: TickAdvanceTime, body: SdkBody) -> Result<(), BoxError> {
  360    361   
        pin_mut!(body);
  361         -
        while let Some(result) = poll_fn(|cx| body.as_mut().poll_data(cx)).await {
         362  +
        while let Some(result) = poll_fn(|cx| body.as_mut().poll_frame(cx)).await {
  362    363   
            if let Err(err) = result {
  363    364   
                return Err(err);
  364    365   
            } else {
  365    366   
                info!("consumed bytes from the response body");
  366    367   
                tick!(time, Duration::from_secs(10));
  367    368   
            }
  368    369   
        }
  369    370   
        Ok(())
  370    371   
    }
  371    372   
  372    373   
    /// A client that allows us to control when data is consumed by sending a signal to `rx`.
  373    374   
    pub async fn consume_on_signal(mut rx: Receiver<()>, body: SdkBody) -> Result<(), BoxError> {
  374    375   
        // Wait to start polling until a signal has been received
  375    376   
        let _ = rx.recv().await;
  376    377   
        pin_mut!(body);
  377         -
        while let Some(result) = poll_fn(|cx| body.as_mut().poll_data(cx)).await {
         378  +
        while let Some(result) = poll_fn(|cx| body.as_mut().poll_frame(cx)).await {
  378    379   
            if let Err(err) = result {
  379    380   
                return Err(err);
  380    381   
            } else {
  381    382   
                info!("consumed bytes from the response body");
  382    383   
                // Block until a signal has been received
  383    384   
                let _ = rx.recv().await;
  384    385   
            }
  385    386   
        }
  386    387   
        Ok(())
  387    388   
    }

tmp-codegen-diff/aws-sdk/sdk/aws-smithy-runtime/tests/stalled_stream_upload.rs

@@ -224,224 +286,286 @@
  244    244   
        body_sender.send(NEAT_DATA).await.unwrap();
  245    245   
        drop(body_sender);
  246    246   
        tick!(time, Duration::from_secs(1));
  247    247   
    });
  248    248   
  249    249   
    assert_eq!(200, result.await.unwrap().expect("success").as_u16());
  250    250   
}
  251    251   
  252    252   
use upload_test_tools::*;
  253    253   
mod upload_test_tools {
  254         -
    use aws_smithy_async::rt::sleep::AsyncSleep;
  255         -
  256    254   
    use crate::stalled_stream_common::*;
         255  +
    use aws_smithy_async::rt::sleep::AsyncSleep;
         256  +
    use http_body_1x::Body;
  257    257   
  258    258   
    pub fn successful_response() -> HttpResponse {
  259    259   
        HttpResponse::try_from(
  260    260   
            http_02x::Response::builder()
  261    261   
                .status(200)
  262    262   
                .body(SdkBody::empty())
  263    263   
                .unwrap(),
  264    264   
        )
  265    265   
        .unwrap()
  266    266   
    }
@@ -311,311 +473,473 @@
  331    331   
    /// optional 1 second gap in between polls.
  332    332   
    pub fn eager_server(
  333    333   
        advance_time: bool,
  334    334   
    ) -> (SharedHttpConnector, TickAdvanceTime, TickAdvanceSleep) {
  335    335   
        async fn fake_server(
  336    336   
            mut body: Pin<&mut SdkBody>,
  337    337   
            time: TickAdvanceTime,
  338    338   
            _: TickAdvanceSleep,
  339    339   
            advance_time: bool,
  340    340   
        ) -> HttpResponse {
  341         -
            while poll_fn(|cx| body.as_mut().poll_data(cx)).await.is_some() {
         341  +
            while poll_fn(|cx| body.as_mut().poll_frame(cx)).await.is_some() {
  342    342   
                if advance_time {
  343    343   
                    tick!(time, Duration::from_secs(1));
  344    344   
                }
  345    345   
            }
  346    346   
            successful_response()
  347    347   
        }
  348    348   
        fake_server!(FakeServerConnector, fake_server, bool, advance_time)
  349    349   
    }
  350    350   
  351    351   
    /// Fake server/connector that reads some data, and then stalls for the given time before
  352    352   
    /// returning a response. If `None` is given the server will stall indefinitely.
  353    353   
    pub fn stalling_server(
  354    354   
        respond_after: Option<Duration>,
  355    355   
    ) -> (SharedHttpConnector, TickAdvanceTime, TickAdvanceSleep) {
  356    356   
        async fn fake_server(
  357    357   
            mut body: Pin<&mut SdkBody>,
  358    358   
            _time: TickAdvanceTime,
  359    359   
            sleep: TickAdvanceSleep,
  360    360   
            respond_after: Option<Duration>,
  361    361   
        ) -> HttpResponse {
  362    362   
            let mut times = 5;
  363         -
            while times > 0 && poll_fn(|cx| body.as_mut().poll_data(cx)).await.is_some() {
         363  +
            while times > 0 && poll_fn(|cx| body.as_mut().poll_frame(cx)).await.is_some() {
  364    364   
                times -= 1;
  365    365   
            }
  366    366   
  367    367   
            match respond_after {
  368    368   
                Some(delay) => {
  369    369   
                    tracing::info!("stalling for {} seconds", delay.as_secs());
  370    370   
                    sleep.sleep(delay).await;
  371    371   
                    tracing::info!("returning delayed response");
  372    372   
                    successful_response()
  373    373   
                }
  374    374   
                None => {
  375    375   
                    // never awake after this
  376    376   
                    tracing::info!("stalling indefinitely");
  377    377   
                    std::future::pending::<()>().await;
  378    378   
                    unreachable!()
  379    379   
                }
  380    380   
            }
  381    381   
        }
  382    382   
        fake_server!(
  383    383   
            FakeServerConnector,
  384    384   
            fake_server,
  385    385   
            Option<Duration>,
  386    386   
            respond_after
  387    387   
        )
  388    388   
    }
  389    389   
  390    390   
    /// Fake server/connector that polls data after each period of time in the given
  391    391   
    /// sequence. Once the sequence completes, it will delay 1 second after each poll.
  392    392   
    pub fn time_sequence_server(
  393    393   
        time_sequence: impl IntoIterator<Item = u64>,
  394    394   
    ) -> (SharedHttpConnector, TickAdvanceTime, TickAdvanceSleep) {
  395    395   
        async fn fake_server(
  396    396   
            mut body: Pin<&mut SdkBody>,
  397    397   
            time: TickAdvanceTime,
  398    398   
            _sleep: TickAdvanceSleep,
  399    399   
            time_sequence: Vec<u64>,
  400    400   
        ) -> HttpResponse {
  401    401   
            let mut time_sequence: VecDeque<Duration> =
  402    402   
                time_sequence.into_iter().map(Duration::from_secs).collect();
  403         -
            while poll_fn(|cx| body.as_mut().poll_data(cx)).await.is_some() {
         403  +
            while poll_fn(|cx| body.as_mut().poll_frame(cx)).await.is_some() {
  404    404   
                let next_time = time_sequence.pop_front().unwrap_or(Duration::from_secs(1));
  405    405   
                tick!(time, next_time);
  406    406   
            }
  407    407   
            successful_response()
  408    408   
        }
  409    409   
        fake_server!(
  410    410   
            FakeServerConnector,
  411    411   
            fake_server,
  412    412   
            Vec<u64>,
  413    413   
            time_sequence.into_iter().collect()
  414    414   
        )
  415    415   
    }
  416    416   
  417    417   
    /// Fake server/connector that polls data only up to the content-length. Optionally delays
  418    418   
    /// sending the response by the given duration.
  419    419   
    pub fn limited_read_server(
  420    420   
        content_len: usize,
  421    421   
        respond_after: Option<Duration>,
  422    422   
    ) -> (SharedHttpConnector, TickAdvanceTime, TickAdvanceSleep) {
  423    423   
        async fn fake_server(
  424    424   
            mut body: Pin<&mut SdkBody>,
  425    425   
            _time: TickAdvanceTime,
  426    426   
            sleep: TickAdvanceSleep,
  427    427   
            params: (usize, Option<Duration>),
  428    428   
        ) -> HttpResponse {
  429    429   
            let mut remaining = params.0;
  430    430   
            loop {
  431         -
                match poll_fn(|cx| body.as_mut().poll_data(cx)).await {
         431  +
                match poll_fn(|cx| body.as_mut().poll_frame(cx)).await {
  432    432   
                    Some(res) => {
  433         -
                        let rc = res.unwrap().len();
         433  +
                        let rc = res.unwrap().into_data().expect("data frame").len();
  434    434   
                        remaining -= rc;
  435    435   
                        tracing::info!("read {rc} bytes; remaining: {remaining}");
  436    436   
                        if remaining == 0 {
  437    437   
                            tracing::info!("read reported content-length data, stopping polling");
  438    438   
                            break;
  439    439   
                        };
  440    440   
                    }
  441    441   
                    None => {
  442    442   
                        tracing::info!(
  443         -
                            "read until poll_data() returned None, no data left, stopping polling"
         443  +
                            "read until poll_frame() returned None, no data left, stopping polling"
  444    444   
                        );
  445    445   
                        break;
  446    446   
                    }
  447    447   
                }
  448    448   
            }
  449    449   
  450    450   
            let respond_after = params.1;
  451    451   
            if let Some(delay) = respond_after {
  452    452   
                tracing::info!("stalling for {} seconds", delay.as_secs());
  453    453   
                sleep.sleep(delay).await;

tmp-codegen-diff/aws-sdk/sdk/aws-smithy-types-convert/Cargo.toml

@@ -1,1 +43,44 @@
    1      1   
# Code generated by software.amazon.smithy.rust.codegen.smithy-rs. DO NOT EDIT.
    2      2   
[package]
    3      3   
name = "aws-smithy-types-convert"
    4         -
version = "0.60.11"
           4  +
version = "0.60.12"
    5      5   
authors = ["AWS Rust SDK Team <aws-sdk-rust@amazon.com>"]
    6      6   
description = "Conversion of types from aws-smithy-types to other libraries."
    7      7   
edition = "2021"
    8      8   
license = "Apache-2.0"
    9      9   
repository = "https://github.com/smithy-lang/smithy-rs"
   10     10   
rust-version = "1.88"
   11     11   
[package.metadata.docs.rs]
   12     12   
all-features = true
   13     13   
targets = ["x86_64-unknown-linux-gnu"]
   14     14   
cargo-args = ["-Zunstable-options", "-Zrustdoc-scrape-examples"]
   15     15   
rustdoc-args = ["--cfg", "docsrs"]
   16     16   
   17     17   
[features]
   18     18   
convert-chrono = ["aws-smithy-types", "chrono"]
   19     19   
convert-time = ["aws-smithy-types", "time"]
   20     20   
convert-streams = ["aws-smithy-async", "futures-core"]
   21     21   
[dependencies.aws-smithy-types]
   22     22   
path = "../aws-smithy-types"
          23  +
features = ["http-body-1-x"]
   23     24   
optional = true
   24         -
version = "1.3.6"
          25  +
version = "1.4.0"
   25     26   
   26     27   
[dependencies.aws-smithy-async]
   27     28   
path = "../aws-smithy-async"
   28     29   
optional = true
   29         -
version = "1.2.7"
          30  +
version = "1.2.8"
   30     31   
   31     32   
[dependencies.chrono]
   32     33   
version = "0.4.35"
   33     34   
optional = true
   34     35   
default-features = false
   35     36   
features = ["std"]
   36     37   
   37     38   
[dependencies.time]
   38     39   
version = "0.3.4"
   39     40   
optional = true

tmp-codegen-diff/aws-sdk/sdk/aws-smithy-types/Cargo.toml

@@ -1,1 +83,83 @@
    1      1   
[package]
    2      2   
name = "aws-smithy-types"
    3         -
version = "1.3.6"
           3  +
version = "1.4.0"
    4      4   
authors = [
    5      5   
    "AWS Rust SDK Team <aws-sdk-rust@amazon.com>",
    6      6   
    "Russell Cohen <rcoh@amazon.com>",
    7      7   
]
    8      8   
description = "Types for smithy-rs codegen."
    9      9   
edition = "2021"
   10     10   
license = "Apache-2.0"
   11     11   
repository = "https://github.com/smithy-lang/smithy-rs"
   12     12   
rust-version = "1.88"
   13     13   
   14     14   
[features]
   15     15   
byte-stream-poll-next = []
   16     16   
http-body-0-4-x = ["dep:http-body-0-4", "dep:http"]
   17         -
http-body-1-x = ["dep:http-body-1-0", "dep:http-body-util", "dep:http-body-0-4", "dep:http-1x", "dep:http"]
          17  +
http-body-1-x = ["dep:http-body-1-0", "dep:http-body-util", "dep:http-body-0-4", "dep:http"]
   18     18   
hyper-0-14-x = ["dep:hyper-0-14"]
   19     19   
rt-tokio = [
   20     20   
    "dep:http-body-0-4",
   21     21   
    "dep:tokio-util",
   22     22   
    "dep:tokio",
   23     23   
    "tokio?/rt",
   24     24   
    "tokio?/fs",
   25     25   
    "tokio?/io-util",
   26     26   
    "tokio-util?/io",
   27     27   
    "dep:futures-core",
   28     28   
    "dep:http"
   29     29   
]
   30     30   
test-util = []
   31     31   
serde-serialize = []
   32     32   
serde-deserialize = []
   33     33   
   34     34   
[dependencies]
   35     35   
base64-simd = "0.8"
   36     36   
bytes = "1.10.0"
   37     37   
bytes-utils = "0.1"
   38         -
http = { version = "0.2.9", optional = true }
   39         -
http-1x = { package = "http", version = "1", optional = true }
   40         -
http-body-0-4 = { package = "http-body", version = "0.4.5", optional = true }
   41         -
http-body-1-0 = { package = "http-body", version = "1", optional = true }
   42         -
http-body-util = { version = "0.1.2", optional = true }
          38  +
http = { version = "0.2.12", optional = true }
          39  +
http-1x = { package = "http", version = "1.3.1" }
          40  +
http-body-0-4 = { package = "http-body", version = "0.4.6", optional = true }
          41  +
http-body-1-0 = { package = "http-body", version = "1.0.1", optional = true }
          42  +
http-body-util = { version = "0.1.3", optional = true }
   43     43   
hyper-0-14 = { package = "hyper", version = "0.14.26", optional = true }
   44     44   
itoa = "1.0.0"
   45     45   
num-integer = "0.1.44"
   46     46   
pin-project-lite = "0.2.14"
   47     47   
pin-utils = "0.1.0"
   48     48   
ryu = "1.0.5"
   49     49   
time = { version = "0.3.4", features = ["parsing"] }
   50     50   
   51     51   
# ByteStream internals
   52     52   
futures-core = { version = "0.3.31", optional = true }
   53         -
tokio = { version = "1.40.0", optional = true }
          53  +
tokio = { version = "1.46.0", optional = true }
   54     54   
tokio-util = { version = "0.7.1", optional = true }
   55     55   
   56     56   
[dev-dependencies]
   57     57   
base64 = "0.13.0"
   58     58   
ciborium = { version = "0.2.1" }
   59     59   
lazy_static = "1.4"
   60     60   
proptest = "1"
   61     61   
rand = "0.8.4"
   62     62   
serde = { version = "1", features = ["derive"] }
   63     63   
serde_json = "1"

tmp-codegen-diff/aws-sdk/sdk/aws-smithy-types/fuzz/Cargo.toml

@@ -17,17 +50,50 @@
   37     37   
edition = "2021"
   38     38   
   39     39   
[package.metadata]
   40     40   
cargo-fuzz = true
   41     41   
   42     42   
[dependencies]
   43     43   
libfuzzer-sys = "=0.4.7"
   44     44   
   45     45   
[dependencies.aws-smithy-types]
   46     46   
path = ".."
   47         -
version = "1.3.6"
          47  +
version = "1.4.0"
   48     48   
   49     49   
[workspace]
   50     50   
members = ["."]

tmp-codegen-diff/aws-sdk/sdk/aws-smithy-types/src/body.rs

@@ -1,1 +370,492 @@
    1      1   
/*
    2      2   
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    3      3   
 * SPDX-License-Identifier: Apache-2.0
    4      4   
 */
    5      5   
    6      6   
//! Types for representing the body of an HTTP request or response
    7      7   
    8      8   
use bytes::Bytes;
    9      9   
use pin_project_lite::pin_project;
          10  +
use std::collections::VecDeque;
   10     11   
use std::error::Error as StdError;
   11     12   
use std::fmt::{self, Debug, Formatter};
   12     13   
use std::future::poll_fn;
   13     14   
use std::pin::Pin;
   14     15   
use std::sync::Arc;
   15     16   
use std::task::{Context, Poll};
   16     17   
   17     18   
/// This module is named after the `http-body` version number since we anticipate
   18     19   
/// needing to provide equivalent functionality for 1.x of that crate in the future.
   19     20   
/// The name has a suffix `_x` to avoid name collision with a third-party `http-body-0-4`.
   20     21   
#[cfg(feature = "http-body-0-4-x")]
   21     22   
pub mod http_body_0_4_x;
   22     23   
#[cfg(feature = "http-body-1-x")]
   23     24   
pub mod http_body_1_x;
   24     25   
   25     26   
/// A generic, boxed error that's `Send` and `Sync`
   26     27   
pub type Error = Box<dyn StdError + Send + Sync>;
   27     28   
   28     29   
pin_project! {
   29     30   
    /// SdkBody type
   30     31   
    ///
   31     32   
    /// This is the Body used for dispatching all HTTP Requests.
   32     33   
    /// For handling responses, the type of the body will be controlled
   33     34   
    /// by the HTTP stack.
   34     35   
    ///
   35     36   
    pub struct SdkBody {
   36     37   
        #[pin]
   37     38   
        inner: Inner,
   38     39   
        // An optional function to recreate the inner body
   39     40   
        //
   40     41   
        // In the event of retry, this function will be called to generate a new body. See
   41     42   
        // [`try_clone()`](SdkBody::try_clone)
   42     43   
        rebuild: Option<Arc<dyn (Fn() -> Inner) + Send + Sync>>,
   43         -
        bytes_contents: Option<Bytes>
          44  +
        bytes_contents: Option<Bytes>,
          45  +
        // Here the optionality indicates whether we have started streaming trailers, and the
          46  +
        // VecDeque serves as a buffer for trailer frames that are polled by poll_next instead
          47  +
        // of poll_next_trailers
          48  +
        trailers: Option<VecDeque<http_1x::HeaderMap>>,
   44     49   
    }
   45     50   
}
   46     51   
   47     52   
impl Debug for SdkBody {
   48     53   
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
   49     54   
        f.debug_struct("SdkBody")
   50     55   
            .field("inner", &self.inner)
   51     56   
            .field("retryable", &self.rebuild.is_some())
   52     57   
            .finish()
   53     58   
    }
   54     59   
}
   55     60   
   56     61   
/// A boxed generic HTTP body that, when consumed, will result in [`Bytes`] or an [`Error`].
          62  +
#[allow(dead_code)]
   57     63   
enum BoxBody {
   58     64   
    // This is enabled by the **dependency**, not the feature. This allows us to construct it
   59     65   
    // whenever we have the dependency and keep the APIs private
   60     66   
    #[cfg(any(
   61     67   
        feature = "http-body-0-4-x",
   62     68   
        feature = "http-body-1-x",
   63     69   
        feature = "rt-tokio"
   64     70   
    ))]
   65     71   
    // will be dead code with `--no-default-features --features rt-tokio`
   66     72   
    HttpBody04(#[allow(dead_code)] http_body_0_4::combinators::BoxBody<Bytes, Error>),
          73  +
          74  +
    #[cfg(feature = "http-body-1-x")]
          75  +
    HttpBody1(#[allow(dead_code)] http_body_util::combinators::BoxBody<Bytes, Error>),
   67     76   
}
   68     77   
   69     78   
pin_project! {
   70     79   
    #[project = InnerProj]
   71     80   
    enum Inner {
   72     81   
        // An in-memory body
   73     82   
        Once {
   74     83   
            inner: Option<Bytes>
   75     84   
        },
   76     85   
        // A streaming body
   77     86   
        Dyn {
   78     87   
            #[pin]
   79     88   
            inner: BoxBody,
   80     89   
        },
   81     90   
   82     91   
        /// When a streaming body is transferred out to a stream parser, the body is replaced with
   83     92   
        /// `Taken`. This will return an Error when polled. Attempting to read data out of a `Taken`
   84     93   
        /// Body is a bug.
   85     94   
        Taken,
   86     95   
    }
   87     96   
}
   88     97   
   89     98   
impl Debug for Inner {
   90     99   
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
   91    100   
        match &self {
   92    101   
            Inner::Once { inner: once } => f.debug_tuple("Once").field(once).finish(),
   93    102   
            Inner::Dyn { .. } => write!(f, "BoxBody"),
   94    103   
            Inner::Taken => f.debug_tuple("Taken").finish(),
   95    104   
        }
   96    105   
    }
   97    106   
}
   98    107   
   99    108   
impl SdkBody {
  100    109   
    /// Construct an explicitly retryable SDK body
  101    110   
    ///
  102    111   
    /// _Note: This is probably not what you want_
  103    112   
    ///
  104    113   
    /// All bodies constructed from in-memory data (`String`, `Vec<u8>`, `Bytes`, etc.) will be
  105    114   
    /// retryable out of the box. If you want to read data from a file, you should use
  106    115   
    /// [`ByteStream::from_path`](crate::byte_stream::ByteStream::from_path). This function
  107    116   
    /// is only necessary when you need to enable retries for your own streaming container.
  108    117   
    pub fn retryable(f: impl Fn() -> SdkBody + Send + Sync + 'static) -> Self {
  109    118   
        let initial = f();
  110    119   
        SdkBody {
  111    120   
            inner: initial.inner,
  112    121   
            rebuild: Some(Arc::new(move || f().inner)),
  113    122   
            bytes_contents: initial.bytes_contents,
         123  +
            trailers: None,
  114    124   
        }
  115    125   
    }
  116    126   
  117    127   
    /// When an SdkBody is read, the inner data must be consumed. In order to do this, the SdkBody
  118    128   
    /// is swapped with a "taken" body. This "taken" body cannot be read but aids in debugging.
  119    129   
    pub fn taken() -> Self {
  120    130   
        Self {
  121    131   
            inner: Inner::Taken,
  122    132   
            rebuild: None,
  123    133   
            bytes_contents: None,
         134  +
            trailers: None,
  124    135   
        }
  125    136   
    }
  126    137   
  127    138   
    /// Create an empty SdkBody for requests and responses that don't transfer any data in the body.
  128    139   
    pub fn empty() -> Self {
  129    140   
        Self {
  130    141   
            inner: Inner::Once { inner: None },
  131    142   
            rebuild: Some(Arc::new(|| Inner::Once { inner: None })),
  132    143   
            bytes_contents: Some(Bytes::new()),
         144  +
            trailers: None,
  133    145   
        }
  134    146   
    }
  135    147   
  136    148   
    pub(crate) async fn next(&mut self) -> Option<Result<Bytes, Error>> {
  137    149   
        let mut me = Pin::new(self);
  138    150   
        poll_fn(|cx| me.as_mut().poll_next(cx)).await
  139    151   
    }
  140    152   
  141    153   
    pub(crate) fn poll_next(
  142    154   
        self: Pin<&mut Self>,
  143    155   
        #[allow(unused)] cx: &mut Context<'_>,
  144    156   
    ) -> Poll<Option<Result<Bytes, Error>>> {
  145    157   
        let this = self.project();
  146    158   
        match this.inner.project() {
  147    159   
            InnerProj::Once { ref mut inner } => {
  148    160   
                let data = inner.take();
  149    161   
                match data {
  150    162   
                    Some(bytes) if bytes.is_empty() => Poll::Ready(None),
  151    163   
                    Some(bytes) => Poll::Ready(Some(Ok(bytes))),
  152    164   
                    None => Poll::Ready(None),
  153    165   
                }
  154    166   
            }
  155    167   
            InnerProj::Dyn { inner: body } => match body.get_mut() {
  156    168   
                #[cfg(feature = "http-body-0-4-x")]
  157    169   
                BoxBody::HttpBody04(box_body) => {
  158    170   
                    use http_body_0_4::Body;
  159    171   
                    Pin::new(box_body).poll_data(cx)
  160    172   
                }
         173  +
                #[cfg(feature = "http-body-1-x")]
         174  +
                BoxBody::HttpBody1(box_body) => {
         175  +
                    // If this is polled after the trailers have been cached end early
         176  +
                    if this.trailers.is_some() {
         177  +
                        return Poll::Ready(None);
         178  +
                    }
         179  +
                    use http_body_1_0::Body;
         180  +
                    let maybe_data = Pin::new(box_body).poll_frame(cx);
         181  +
                    match maybe_data {
         182  +
                        Poll::Ready(Some(Ok(frame))) => {
         183  +
                            if frame.is_data() {
         184  +
                                Poll::Ready(Some(Ok(frame
         185  +
                                    .into_data()
         186  +
                                    .expect("Confirmed data frame"))))
         187  +
                            } else if frame.is_trailers() {
         188  +
                                let trailers =
         189  +
                                    frame.into_trailers().expect("Confirmed trailer frame");
         190  +
                                // Buffer the trailers for the trailer poll
         191  +
                                this.trailers.get_or_insert_with(VecDeque::new).push_back(trailers);
         192  +
         193  +
                                Poll::Ready(None)
         194  +
                            } else {
         195  +
                                unreachable!("Frame must be either data or trailers");
         196  +
                            }
         197  +
                        }
         198  +
                        Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
         199  +
                        Poll::Ready(None) => Poll::Ready(None),
         200  +
                        Poll::Pending => Poll::Pending,
         201  +
                    }
         202  +
                }
  161    203   
                #[allow(unreachable_patterns)]
  162    204   
                _ => unreachable!(
  163         -
                    "enabling `http-body-0-4-x` is the only way to create the `Dyn` variant"
         205  +
                    "enabling `http-body-0-4-x` or `http-body-1-x` is the only way to create the `Dyn` variant"
  164    206   
                ),
  165    207   
            },
  166    208   
            InnerProj::Taken => {
  167    209   
                Poll::Ready(Some(Err("A `Taken` body should never be polled".into())))
  168    210   
            }
  169    211   
        }
  170    212   
    }
  171    213   
         214  +
    #[allow(dead_code)]
  172    215   
    #[cfg(any(
  173    216   
        feature = "http-body-0-4-x",
  174    217   
        feature = "http-body-1-x",
  175    218   
        feature = "rt-tokio"
  176    219   
    ))]
  177    220   
    pub(crate) fn from_body_0_4_internal<T, E>(body: T) -> Self
  178    221   
    where
  179    222   
        T: http_body_0_4::Body<Data = Bytes, Error = E> + Send + Sync + 'static,
  180    223   
        E: Into<Error> + 'static,
  181    224   
    {
  182    225   
        Self {
  183    226   
            inner: Inner::Dyn {
  184    227   
                inner: BoxBody::HttpBody04(http_body_0_4::combinators::BoxBody::new(
  185    228   
                    body.map_err(Into::into),
  186    229   
                )),
  187    230   
            },
  188    231   
            rebuild: None,
  189    232   
            bytes_contents: None,
         233  +
            trailers: None,
         234  +
        }
         235  +
    }
         236  +
         237  +
    #[cfg(feature = "http-body-1-x")]
         238  +
    pub(crate) fn from_body_1_x_internal<T, E>(body: T) -> Self
         239  +
    where
         240  +
        T: http_body_1_0::Body<Data = Bytes, Error = E> + Send + Sync + 'static,
         241  +
        E: Into<Error> + 'static,
         242  +
    {
         243  +
        use http_body_util::BodyExt;
         244  +
        Self {
         245  +
            inner: Inner::Dyn {
         246  +
                inner: BoxBody::HttpBody1(http_body_util::combinators::BoxBody::new(
         247  +
                    body.map_err(Into::into),
         248  +
                )),
         249  +
            },
         250  +
            rebuild: None,
         251  +
            bytes_contents: None,
         252  +
            trailers: None,
  190    253   
        }
  191    254   
    }
  192    255   
  193    256   
    #[cfg(any(feature = "http-body-0-4-x", feature = "http-body-1-x",))]
  194    257   
    pub(crate) fn poll_next_trailers(
  195    258   
        self: Pin<&mut Self>,
  196    259   
        cx: &mut Context<'_>,
  197         -
    ) -> Poll<Result<Option<http::HeaderMap<http::HeaderValue>>, Error>> {
         260  +
    ) -> Poll<Result<Option<http_1x::HeaderMap<http_1x::HeaderValue>>, Error>> {
         261  +
        // Three cases that matter here:
         262  +
        // 1) Both http-body features disabled, doesn't matter because this func won't compile
         263  +
        // 2) http-body-0-4-x enabled but 1-x disabled, we use the http_body_0_4_x conversion
         264  +
        // 3) http-body-1-x enabled (and 0-4-x is enabled or disabled), we use the 1-x conversion
         265  +
        // as our default whenever it is available
         266  +
        #[cfg(all(feature = "http-body-0-4-x", not(feature = "http-body-1-x")))]
         267  +
        use crate::body::http_body_0_4_x::convert_headers_0x_1x;
         268  +
        #[cfg(feature = "http-body-1-x")]
         269  +
        use crate::body::http_body_1_x::convert_headers_0x_1x;
         270  +
  198    271   
        let this = self.project();
  199    272   
        match this.inner.project() {
  200    273   
            InnerProj::Once { .. } => Poll::Ready(Ok(None)),
  201    274   
            InnerProj::Dyn { inner } => match inner.get_mut() {
  202    275   
                BoxBody::HttpBody04(box_body) => {
  203    276   
                    use http_body_0_4::Body;
  204         -
                    Pin::new(box_body).poll_trailers(cx)
         277  +
                    let polled = Pin::new(box_body).poll_trailers(cx);
         278  +
         279  +
                    match polled {
         280  +
                        Poll::Ready(Ok(maybe_trailers)) => {
         281  +
                            let http_1x_trailers = maybe_trailers.map(convert_headers_0x_1x);
         282  +
                            Poll::Ready(Ok(http_1x_trailers))
         283  +
                        }
         284  +
                        Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
         285  +
                        Poll::Pending => Poll::Pending,
         286  +
                    }
         287  +
                }
         288  +
                #[cfg(feature = "http-body-1-x")]
         289  +
                BoxBody::HttpBody1(box_body) => {
         290  +
                    use http_body_1_0::Body;
         291  +
                    // Return the cached trailers without polling
         292  +
                    if let Some(trailer_buf) = this.trailers {
         293  +
                        if let Some(next_trailer) = trailer_buf.pop_front() {
         294  +
                            return Poll::Ready(Ok(Some(next_trailer)));
         295  +
                        }
         296  +
                    }
         297  +
         298  +
                    let polled = Pin::new(box_body).poll_frame(cx);
         299  +
                    match polled {
         300  +
                        Poll::Ready(Some(Ok(maybe_trailers))) => {
         301  +
                            if maybe_trailers.is_data() {
         302  +
                                Poll::Ready(Err("Trailers polled while body still has data".into()))
         303  +
                            } else {
         304  +
                                let trailers = maybe_trailers
         305  +
                                    .into_trailers()
         306  +
                                    .expect("Frame must be trailers because it is not data");
         307  +
                                Poll::Ready(Ok(Some(trailers)))
         308  +
                            }
         309  +
                        }
         310  +
                        Poll::Ready(None) => Poll::Ready(Ok(None)),
         311  +
                        Poll::Ready(Some(Err(err))) => Poll::Ready(Err(err)),
         312  +
                        Poll::Pending => Poll::Pending,
         313  +
                    }
  205    314   
                }
  206    315   
            },
  207    316   
            InnerProj::Taken => Poll::Ready(Err(
  208    317   
                "A `Taken` body should never be polled for trailers".into(),
  209    318   
            )),
  210    319   
        }
  211    320   
    }
  212    321   
  213    322   
    /// If possible, return a reference to this body as `&[u8]`
  214    323   
    ///
  215    324   
    /// If this SdkBody is NOT streaming, this will return the byte slab
  216    325   
    /// If this SdkBody is streaming, this will return `None`
  217    326   
    pub fn bytes(&self) -> Option<&[u8]> {
  218    327   
        match &self.bytes_contents {
  219    328   
            Some(b) => Some(b),
  220    329   
            None => None,
  221    330   
        }
  222    331   
    }
  223    332   
  224    333   
    /// Attempt to clone this SdkBody. This will fail if the inner data is not cloneable, such as when
  225    334   
    /// it is a single-use stream that can't be recreated.
  226    335   
    pub fn try_clone(&self) -> Option<Self> {
  227    336   
        self.rebuild.as_ref().map(|rebuild| {
  228    337   
            let next = rebuild();
  229    338   
            Self {
  230    339   
                inner: next,
  231    340   
                rebuild: self.rebuild.clone(),
  232    341   
                bytes_contents: self.bytes_contents.clone(),
         342  +
                trailers: self.trailers.clone(),
  233    343   
            }
  234    344   
        })
  235    345   
    }
  236    346   
  237    347   
    /// Return `true` if this SdkBody is streaming, `false` if it is in-memory.
  238    348   
    pub fn is_streaming(&self) -> bool {
  239    349   
        matches!(self.inner, Inner::Dyn { .. })
  240    350   
    }
  241    351   
  242    352   
    /// Return the length, in bytes, of this SdkBody. If this returns `None`, then the body does not
  243    353   
    /// have a known length.
  244    354   
    pub fn content_length(&self) -> Option<u64> {
  245    355   
        match self.bounds_on_remaining_length() {
  246    356   
            (lo, Some(hi)) if lo == hi => Some(lo),
  247    357   
            _ => None,
  248    358   
        }
  249    359   
    }
  250    360   
  251    361   
    #[allow(dead_code)] // used by a feature-gated `http-body`'s trait method
  252    362   
    pub(crate) fn is_end_stream(&self) -> bool {
  253    363   
        match &self.inner {
  254    364   
            Inner::Once { inner: None } => true,
  255    365   
            Inner::Once { inner: Some(bytes) } => bytes.is_empty(),
  256    366   
            Inner::Dyn { inner: box_body } => match box_body {
  257    367   
                #[cfg(feature = "http-body-0-4-x")]
  258    368   
                BoxBody::HttpBody04(box_body) => {
  259    369   
                    use http_body_0_4::Body;
  260    370   
                    box_body.is_end_stream()
  261    371   
                }
         372  +
                #[cfg(feature = "http-body-1-x")]
         373  +
                BoxBody::HttpBody1(box_body) => {
         374  +
                    use http_body_1_0::Body;
         375  +
                    box_body.is_end_stream()
         376  +
                }
  262    377   
                #[allow(unreachable_patterns)]
  263    378   
                _ => unreachable!(
  264         -
                    "enabling `http-body-0-4-x` is the only way to create the `Dyn` variant"
         379  +
                    "enabling `http-body-0-4-x` or `http-body-1-x` is the only way to create the `Dyn` variant"
  265    380   
                ),
  266    381   
            },
  267    382   
            Inner::Taken => true,
  268    383   
        }
  269    384   
    }
  270    385   
  271    386   
    pub(crate) fn bounds_on_remaining_length(&self) -> (u64, Option<u64>) {
  272    387   
        match &self.inner {
  273    388   
            Inner::Once { inner: None } => (0, Some(0)),
  274    389   
            Inner::Once { inner: Some(bytes) } => {
  275    390   
                let len = bytes.len() as u64;
  276    391   
                (len, Some(len))
  277    392   
            }
  278    393   
            Inner::Dyn { inner: box_body } => match box_body {
  279    394   
                #[cfg(feature = "http-body-0-4-x")]
  280    395   
                BoxBody::HttpBody04(box_body) => {
  281    396   
                    use http_body_0_4::Body;
  282    397   
                    let hint = box_body.size_hint();
  283    398   
                    (hint.lower(), hint.upper())
  284    399   
                }
         400  +
                #[cfg(feature = "http-body-1-x")]
         401  +
                BoxBody::HttpBody1(box_body) => {
         402  +
                    use http_body_1_0::Body;
         403  +
                    let hint = box_body.size_hint();
         404  +
                    (hint.lower(), hint.upper())
         405  +
                }
  285    406   
                #[allow(unreachable_patterns)]
  286    407   
                _ => unreachable!(
  287         -
                    "enabling `http-body-0-4-x` is the only way to create the `Dyn` variant"
         408  +
                    "enabling `http-body-0-4-x` or `http-body-1-x` is the only way to create the `Dyn` variant"
  288    409   
                ),
  289    410   
            },
  290    411   
            Inner::Taken => (0, Some(0)),
  291    412   
        }
  292    413   
    }
  293    414   
  294    415   
    /// Given a function to modify an `SdkBody`, run that function against this `SdkBody` before
  295    416   
    /// returning the result.
  296    417   
    pub fn map(self, f: impl Fn(SdkBody) -> SdkBody + Sync + Send + 'static) -> SdkBody {
  297    418   
        if self.rebuild.is_some() {
  298    419   
            SdkBody::retryable(move || f(self.try_clone().unwrap()))
  299    420   
        } else {
  300    421   
            f(self)
  301    422   
        }
  302    423   
    }
  303    424   
  304    425   
    /// Update this `SdkBody` with `map`. **This function MUST NOT alter the data of the body.**
  305    426   
    ///
  306    427   
    /// This function is useful for adding metadata like progress tracking to an [`SdkBody`] that
  307    428   
    /// does not alter the actual byte data. If your mapper alters the contents of the body, use [`SdkBody::map`]
  308    429   
    /// instead.
  309    430   
    pub fn map_preserve_contents(
  310    431   
        self,
  311    432   
        f: impl Fn(SdkBody) -> SdkBody + Sync + Send + 'static,
  312    433   
    ) -> SdkBody {
  313    434   
        let contents = self.bytes_contents.clone();
  314    435   
        let mut out = if self.rebuild.is_some() {
  315    436   
            SdkBody::retryable(move || f(self.try_clone().unwrap()))
  316    437   
        } else {
  317    438   
            f(self)
  318    439   
        };
  319    440   
        out.bytes_contents = contents;
  320    441   
        out
  321    442   
    }
  322    443   
}
  323    444   
  324    445   
impl From<&str> for SdkBody {
  325    446   
    fn from(s: &str) -> Self {
  326    447   
        Self::from(s.as_bytes())
  327    448   
    }
  328    449   
}
  329    450   
  330    451   
impl From<Bytes> for SdkBody {
  331    452   
    fn from(bytes: Bytes) -> Self {
  332    453   
        let b = bytes.clone();
  333    454   
        SdkBody {
  334    455   
            inner: Inner::Once {
  335    456   
                inner: Some(bytes.clone()),
  336    457   
            },
  337    458   
            rebuild: Some(Arc::new(move || Inner::Once {
  338    459   
                inner: Some(bytes.clone()),
  339    460   
            })),
  340    461   
            bytes_contents: Some(b),
         462  +
            trailers: None,
  341    463   
        }
  342    464   
    }
  343    465   
}
  344    466   
  345    467   
impl From<Vec<u8>> for SdkBody {
  346    468   
    fn from(data: Vec<u8>) -> Self {
  347    469   
        Self::from(Bytes::from(data))
  348    470   
    }
  349    471   
}
  350    472   

tmp-codegen-diff/aws-sdk/sdk/aws-smithy-types/src/body/http_body_0_4_x.rs

@@ -18,18 +86,121 @@
   38     38   
        self: Pin<&mut Self>,
   39     39   
        cx: &mut Context<'_>,
   40     40   
    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
   41     41   
        self.poll_next(cx)
   42     42   
    }
   43     43   
   44     44   
    fn poll_trailers(
   45     45   
        self: Pin<&mut Self>,
   46     46   
        cx: &mut Context<'_>,
   47     47   
    ) -> Poll<Result<Option<http::HeaderMap<http::HeaderValue>>, Self::Error>> {
   48         -
        self.poll_next_trailers(cx)
          48  +
        let polled = self.poll_next_trailers(cx);
          49  +
        match polled {
          50  +
            Poll::Ready(Ok(Some(headers))) => Poll::Ready(Ok(Some(convert_headers_1x_0x(headers)))),
          51  +
            Poll::Ready(Ok(None)) => Poll::Ready(Ok(None)),
          52  +
            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
          53  +
            Poll::Pending => Poll::Pending,
          54  +
        }
   49     55   
    }
   50     56   
   51     57   
    fn is_end_stream(&self) -> bool {
   52     58   
        self.is_end_stream()
   53     59   
    }
   54     60   
   55     61   
    fn size_hint(&self) -> http_body_0_4::SizeHint {
   56     62   
        let mut result = http_body_0_4::SizeHint::default();
   57     63   
        let (lower, upper) = self.bounds_on_remaining_length();
   58     64   
        result.set_lower(lower);
   59     65   
        if let Some(u) = upper {
   60     66   
            result.set_upper(u)
   61     67   
        }
   62     68   
        result
   63     69   
    }
   64     70   
}
   65     71   
          72  +
pub(crate) fn convert_headers_1x_0x(input: http_1x::HeaderMap) -> http::HeaderMap {
          73  +
    let mut map = http::HeaderMap::with_capacity(input.capacity());
          74  +
    let mut mem: Option<http_1x::HeaderName> = None;
          75  +
    for (k, v) in input.into_iter() {
          76  +
        let name = k.or_else(|| mem.clone()).unwrap();
          77  +
        map.append(
          78  +
            http::HeaderName::from_bytes(name.as_str().as_bytes()).expect("already validated"),
          79  +
            http::HeaderValue::from_bytes(v.as_bytes()).expect("already validated"),
          80  +
        );
          81  +
        mem = Some(name);
          82  +
    }
          83  +
    map
          84  +
}
          85  +
          86  +
#[allow(dead_code)]
          87  +
pub(crate) fn convert_headers_0x_1x(input: http::HeaderMap) -> http_1x::HeaderMap {
          88  +
    let mut map = http_1x::HeaderMap::with_capacity(input.capacity());
          89  +
    let mut mem: Option<http::HeaderName> = None;
          90  +
    for (k, v) in input.into_iter() {
          91  +
        let name = k.or_else(|| mem.clone()).unwrap();
          92  +
        map.append(
          93  +
            http_1x::HeaderName::from_bytes(name.as_str().as_bytes()).expect("already validated"),
          94  +
            http_1x::HeaderValue::from_bytes(v.as_bytes()).expect("already validated"),
          95  +
        );
          96  +
        mem = Some(name);
          97  +
    }
          98  +
    map
          99  +
}
         100  +
   66    101   
#[cfg(test)]
   67    102   
mod tests {
   68    103   
    use crate::body::SdkBody;
   69    104   
   70    105   
    #[test]
   71    106   
    fn map_preserve_preserves_bytes_hint() {
   72    107   
        let initial = SdkBody::from("hello!");
   73    108   
        assert_eq!(initial.bytes(), Some(b"hello!".as_slice()));
   74    109   
   75    110   
        let new_body = initial.map_preserve_contents(SdkBody::from_body_0_4);

tmp-codegen-diff/aws-sdk/sdk/aws-smithy-types/src/body/http_body_1_x.rs

@@ -1,1 +201,206 @@
   14     14   
   15     15   
use crate::body::{Error, SdkBody};
   16     16   
   17     17   
impl SdkBody {
   18     18   
    /// Construct an `SdkBody` from a type that implements [`http_body_1_0::Body<Data = Bytes>`](http_body_1_0::Body).
   19     19   
    pub fn from_body_1_x<T, E>(body: T) -> Self
   20     20   
    where
   21     21   
        T: http_body_1_0::Body<Data = Bytes, Error = E> + Send + Sync + 'static,
   22     22   
        E: Into<Error> + 'static,
   23     23   
    {
   24         -
        SdkBody::from_body_0_4_internal(Http1toHttp04::new(body.map_err(Into::into)))
          24  +
        SdkBody::from_body_1_x_internal(body.map_err(Into::into))
   25     25   
    }
   26     26   
   27     27   
    pub(crate) fn poll_data_frame(
   28     28   
        mut self: Pin<&mut Self>,
   29     29   
        cx: &mut Context<'_>,
   30     30   
    ) -> Poll<Option<Result<http_body_1_0::Frame<Bytes>, Error>>> {
   31     31   
        match ready!(self.as_mut().poll_next(cx)) {
   32     32   
            // if there's no more data, try to return trailers
   33     33   
            None => match ready!(self.poll_next_trailers(cx)) {
   34         -
                Ok(Some(trailers)) => Poll::Ready(Some(Ok(http_body_1_0::Frame::trailers(
   35         -
                    convert_headers_0x_1x(trailers),
   36         -
                )))),
          34  +
                Ok(Some(trailers)) => {
          35  +
                    Poll::Ready(Some(Ok(http_body_1_0::Frame::trailers(trailers))))
          36  +
                }
   37     37   
                Ok(None) => Poll::Ready(None),
   38     38   
                Err(e) => Poll::Ready(Some(Err(e))),
   39     39   
            },
   40     40   
            Some(result) => match result {
   41     41   
                Err(err) => Poll::Ready(Some(Err(err))),
   42     42   
                Ok(bytes) => Poll::Ready(Some(Ok(http_body_1_0::Frame::data(bytes)))),
   43     43   
            },
   44     44   
        }
   45     45   
    }
   46     46   
}
   47     47   
   48     48   
#[cfg(feature = "http-body-1-x")]
   49     49   
impl http_body_1_0::Body for SdkBody {
   50     50   
    type Data = Bytes;
   51     51   
    type Error = Error;
   52     52   
   53     53   
    fn poll_frame(
   54     54   
        self: Pin<&mut Self>,
   55     55   
        cx: &mut Context<'_>,
   56     56   
    ) -> Poll<Option<Result<http_body_1_0::Frame<Self::Data>, Self::Error>>> {
   57     57   
        self.poll_data_frame(cx)
   58     58   
    }
   59     59   
   60     60   
    fn is_end_stream(&self) -> bool {
   61     61   
        self.is_end_stream()
   62     62   
    }
   63     63   
   64     64   
    fn size_hint(&self) -> http_body_1_0::SizeHint {
   65     65   
        let mut hint = http_body_1_0::SizeHint::default();
   66     66   
        let (lower, upper) = self.bounds_on_remaining_length();
   67     67   
        hint.set_lower(lower);
   68     68   
        if let Some(upper) = upper {
   69     69   
            hint.set_upper(upper);
   70     70   
        }
   71     71   
        hint
   72     72   
    }
   73     73   
}
   74     74   
   75     75   
pin_project! {
   76     76   
    struct Http1toHttp04<B> {
   77     77   
        #[pin]
   78     78   
        inner: B,
   79     79   
        trailers: Option<http_1x::HeaderMap>,
   80     80   
    }
   81     81   
}
   82     82   
   83     83   
impl<B> Http1toHttp04<B> {
          84  +
    #[allow(dead_code)]
   84     85   
    fn new(inner: B) -> Self {
   85     86   
        Self {
   86     87   
            inner,
   87     88   
            trailers: None,
   88     89   
        }
   89     90   
    }
   90     91   
}
   91     92   
   92     93   
impl<B> http_body_0_4::Body for Http1toHttp04<B>
   93     94   
where
   94     95   
    B: http_body_1_0::Body,
   95     96   
{
   96     97   
    type Data = B::Data;
   97     98   
    type Error = B::Error;
   98     99   
   99    100   
    fn poll_data(
  100    101   
        mut self: Pin<&mut Self>,
  101    102   
        cx: &mut Context<'_>,
  102    103   
    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
  103    104   
        loop {
  104    105   
            let this = self.as_mut().project();
  105    106   
            match ready!(this.inner.poll_frame(cx)) {
  106    107   
                Some(Ok(frame)) => {
  107    108   
                    let frame = match frame.into_data() {
  108    109   
                        Ok(data) => return Poll::Ready(Some(Ok(data))),
  109    110   
                        Err(frame) => frame,
  110    111   
                    };
  111    112   
                    // when we get a trailers frame, store the trailers for the next poll
  112    113   
                    if let Ok(trailers) = frame.into_trailers() {
         114  +
                        if let Some(trailer_map) = this.trailers {
         115  +
                            trailer_map.extend(trailers);
         116  +
                        } else {
  113    117   
                            this.trailers.replace(trailers);
         118  +
                        }
  114    119   
                        return Poll::Ready(None);
  115    120   
                    };
  116    121   
                    // if the frame type was unknown, discard it. the next one might be something
  117    122   
                    // useful
  118    123   
                }
  119    124   
                Some(Err(e)) => return Poll::Ready(Some(Err(e))),
  120    125   
                None => return Poll::Ready(None),
  121    126   
            }
  122    127   
        }
  123    128   
    }
  124    129   
  125    130   
    fn poll_trailers(
  126    131   
        self: Pin<&mut Self>,
  127    132   
        _cx: &mut Context<'_>,
  128    133   
    ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
  129    134   
        // all of the polling happens in poll_data, once we get to the trailers we've actually
  130    135   
        // already read everything
  131    136   
        let this = self.project();
  132    137   
        match this.trailers.take() {
  133    138   
            Some(headers) => Poll::Ready(Ok(Some(convert_headers_1x_0x(headers)))),
  134    139   
            None => Poll::Ready(Ok(None)),
  135    140   
        }
  136    141   
    }
  137    142   
  138    143   
    fn is_end_stream(&self) -> bool {
  139    144   
        self.inner.is_end_stream()
  140    145   
    }
  141    146   
  142    147   
    fn size_hint(&self) -> http_body_0_4::SizeHint {
  143    148   
        let mut size_hint = http_body_0_4::SizeHint::new();
  144    149   
        let inner_hint = self.inner.size_hint();
  145    150   
        if let Some(exact) = inner_hint.exact() {
  146    151   
            size_hint.set_exact(exact);
  147    152   
        } else {
  148    153   
            size_hint.set_lower(inner_hint.lower());
  149    154   
            if let Some(upper) = inner_hint.upper() {
  150    155   
                size_hint.set_upper(upper);
  151    156   
            }
  152    157   
        }
  153    158   
        size_hint
  154    159   
    }
  155    160   
}
  156    161   
  157         -
fn convert_headers_1x_0x(input: http_1x::HeaderMap) -> http::HeaderMap {
         162  +
pub(crate) fn convert_headers_1x_0x(input: http_1x::HeaderMap) -> http::HeaderMap {
  158    163   
    let mut map = http::HeaderMap::with_capacity(input.capacity());
  159    164   
    let mut mem: Option<http_1x::HeaderName> = None;
  160    165   
    for (k, v) in input.into_iter() {
  161    166   
        let name = k.or_else(|| mem.clone()).unwrap();
  162    167   
        map.append(
  163    168   
            http::HeaderName::from_bytes(name.as_str().as_bytes()).expect("already validated"),
  164    169   
            http::HeaderValue::from_bytes(v.as_bytes()).expect("already validated"),
  165    170   
        );
  166    171   
        mem = Some(name);
  167    172   
    }
  168    173   
    map
  169    174   
}
  170    175   
  171         -
fn convert_headers_0x_1x(input: http::HeaderMap) -> http_1x::HeaderMap {
         176  +
pub(crate) fn convert_headers_0x_1x(input: http::HeaderMap) -> http_1x::HeaderMap {
  172    177   
    let mut map = http_1x::HeaderMap::with_capacity(input.capacity());
  173    178   
    let mut mem: Option<http::HeaderName> = None;
  174    179   
    for (k, v) in input.into_iter() {
  175    180   
        let name = k.or_else(|| mem.clone()).unwrap();
  176    181   
        map.append(
  177    182   
            http_1x::HeaderName::from_bytes(name.as_str().as_bytes()).expect("already validated"),
  178    183   
            http_1x::HeaderValue::from_bytes(v.as_bytes()).expect("already validated"),
  179    184   
        );
  180    185   
        mem = Some(name);
  181    186   
    }
@@ -246,251 +375,420 @@
  266    271   
    async fn test_read_trailers() {
  267    272   
        let body = TestBody {
  268    273   
            chunks: vec![
  269    274   
                Chunk::Data("123"),
  270    275   
                Chunk::Data("456"),
  271    276   
                Chunk::Data("789"),
  272    277   
                Chunk::Trailers(trailers()),
  273    278   
            ]
  274    279   
            .into(),
  275    280   
        };
  276         -
        let mut body = SdkBody::from_body_1_x(body);
  277         -
        while let Some(_data) = http_body_0_4::Body::data(&mut body).await {}
  278         -
        assert_eq!(
  279         -
            http_body_0_4::Body::trailers(&mut body).await.unwrap(),
  280         -
            Some(convert_headers_1x_0x(trailers()))
  281         -
        );
         281  +
        let body = SdkBody::from_body_1_x(body);
         282  +
        let collected = body.collect().await.unwrap();
         283  +
        let collected_trailers = collected.trailers();
         284  +
         285  +
        assert_eq!(collected_trailers, Some(&trailers()));
  282    286   
    }
  283    287   
  284    288   
    #[tokio::test]
  285         -
    async fn test_read_trailers_as_1x() {
         289  +
    async fn test_read_multiple_trailers() {
         290  +
        let mut second_trailers = HeaderMap::new();
         291  +
        second_trailers.insert(
         292  +
            HeaderName::from_static("second-trailer"),
         293  +
            HeaderValue::from_static("second"),
         294  +
        );
         295  +
         296  +
        let mut merged_trailers = HeaderMap::new();
         297  +
        merged_trailers.extend(second_trailers.clone());
         298  +
        merged_trailers.extend(trailers());
         299  +
  286    300   
        let body = TestBody {
  287    301   
            chunks: vec![
  288    302   
                Chunk::Data("123"),
  289    303   
                Chunk::Data("456"),
  290    304   
                Chunk::Data("789"),
  291    305   
                Chunk::Trailers(trailers()),
         306  +
                Chunk::Trailers(second_trailers),
  292    307   
            ]
  293    308   
            .into(),
  294    309   
        };
  295    310   
        let body = SdkBody::from_body_1_x(body);
         311  +
        let collected = body.collect().await.unwrap();
         312  +
        let collected_trailers = collected.trailers().unwrap();
         313  +
         314  +
        assert_eq!(collected_trailers, &merged_trailers);
         315  +
    }
         316  +
         317  +
    #[tokio::test]
         318  +
    async fn test_trailers_04x_to_1x() {
         319  +
        let body = TestBody {
         320  +
            chunks: vec![
         321  +
                Chunk::Data("123"),
         322  +
                Chunk::Data("456"),
         323  +
                Chunk::Data("789"),
         324  +
                Chunk::Trailers(trailers()),
         325  +
            ]
         326  +
            .into(),
         327  +
        };
         328  +
        let body = SdkBody::from_body_0_4(Http1toHttp04::new(body));
  296    329   
  297    330   
        let collected = BodyExt::collect(body).await.expect("should succeed");
  298    331   
        assert_eq!(collected.trailers(), Some(&trailers()));
  299    332   
        assert_eq!(collected.to_bytes().as_ref(), b"123456789");
  300    333   
    }
  301    334   
  302    335   
    #[tokio::test]
  303         -
    async fn test_trailers_04x_to_1x() {
         336  +
    async fn test_multiple_trailers_04x_to_1x() {
         337  +
        let mut second_trailers = HeaderMap::new();
         338  +
        second_trailers.insert(
         339  +
            HeaderName::from_static("second-trailer"),
         340  +
            HeaderValue::from_static("second"),
         341  +
        );
         342  +
         343  +
        let mut merged_trailers = HeaderMap::new();
         344  +
        merged_trailers.extend(second_trailers.clone());
         345  +
        merged_trailers.extend(trailers());
         346  +
  304    347   
        let body = TestBody {
  305    348   
            chunks: vec![
  306    349   
                Chunk::Data("123"),
  307    350   
                Chunk::Data("456"),
  308    351   
                Chunk::Data("789"),
  309    352   
                Chunk::Trailers(trailers()),
         353  +
                Chunk::Trailers(second_trailers),
  310    354   
            ]
  311    355   
            .into(),
  312    356   
        };
  313    357   
        let body = SdkBody::from_body_0_4(Http1toHttp04::new(body));
  314    358   
  315    359   
        let collected = BodyExt::collect(body).await.expect("should succeed");
  316         -
        assert_eq!(collected.trailers(), Some(&trailers()));
         360  +
        assert_eq!(collected.trailers().unwrap(), &merged_trailers);
  317    361   
        assert_eq!(collected.to_bytes().as_ref(), b"123456789");
  318    362   
    }
  319    363   
  320    364   
    #[tokio::test]
  321    365   
    async fn test_errors() {
  322    366   
        let body = TestBody {
  323    367   
            chunks: vec![
  324    368   
                Chunk::Data("123"),
  325    369   
                Chunk::Data("456"),
  326    370   
                Chunk::Data("789"),
  327    371   
                Chunk::Error("errors!"),
  328    372   
            ]
  329    373   
            .into(),
  330    374   
        };
  331    375   
  332    376   
        let body = SdkBody::from_body_1_x(body);
  333    377   
        let body = ByteStream::new(body);
  334    378   
        body.collect().await.expect_err("body returned an error");
  335    379   
    }
  336    380   
  337    381   
    #[tokio::test]
  338    382   
    async fn test_no_trailers() {
  339    383   
        let body = TestBody {
  340    384   
            chunks: vec![Chunk::Data("123"), Chunk::Data("456"), Chunk::Data("789")].into(),
  341    385   
        };
  342    386   
  343    387   
        let body = SdkBody::from_body_1_x(body);
  344         -
        let body = ByteStream::new(body);
  345         -
        assert_eq!(body.collect().await.unwrap().to_vec(), b"123456789");
         388  +
        let collected = BodyExt::collect(body).await.expect("should succeed");
         389  +
        assert_eq!(collected.trailers(), None);
         390  +
        assert_eq!(collected.to_bytes().as_ref(), b"123456789");
  346    391   
    }
  347    392   
  348    393   
    #[test]
  349    394   
    fn test_convert_headers() {
  350    395   
        let mut http1_headermap = http_1x::HeaderMap::new();
  351    396   
        http1_headermap.append(CT1, HeaderValue::from_static("a"));
  352    397   
        http1_headermap.append(CT1, HeaderValue::from_static("b"));
  353    398   
        http1_headermap.append(CT1, HeaderValue::from_static("c"));
  354    399   
  355    400   
        http1_headermap.insert(CL1, HeaderValue::from_static("1234"));