AWS SDK

AWS SDK

rev. c4f9295a7b4566dca79c361e3a2aa9e63cdf82e7

Files changed:

tmp-codegen-diff/aws-sdk/sdk/aws-smithy-experimental/src/hyper_1_0.rs

@@ -1,0 +1246,0 @@
    1         -
/*
    2         -
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    3         -
 * SPDX-License-Identifier: Apache-2.0
    4         -
 */
    5         -
    6         -
use aws_smithy_async::future::timeout::TimedOutError;
    7         -
use aws_smithy_async::rt::sleep::{default_async_sleep, AsyncSleep, SharedAsyncSleep};
    8         -
use aws_smithy_runtime::client::http::connection_poisoning::CaptureSmithyConnection;
    9         -
use aws_smithy_runtime_api::box_error::BoxError;
   10         -
use aws_smithy_runtime_api::client::connection::ConnectionMetadata;
   11         -
use aws_smithy_runtime_api::client::connector_metadata::ConnectorMetadata;
   12         -
use aws_smithy_runtime_api::client::dns::ResolveDns;
   13         -
use aws_smithy_runtime_api::client::http::{
   14         -
    HttpClient, HttpConnector, HttpConnectorFuture, HttpConnectorSettings, SharedHttpClient,
   15         -
    SharedHttpConnector,
   16         -
};
   17         -
use aws_smithy_runtime_api::client::orchestrator::{HttpRequest, HttpResponse};
   18         -
use aws_smithy_runtime_api::client::result::ConnectorError;
   19         -
use aws_smithy_runtime_api::client::runtime_components::{
   20         -
    RuntimeComponents, RuntimeComponentsBuilder,
   21         -
};
   22         -
use aws_smithy_runtime_api::shared::IntoShared;
   23         -
use aws_smithy_types::body::SdkBody;
   24         -
use aws_smithy_types::config_bag::ConfigBag;
   25         -
use aws_smithy_types::error::display::DisplayErrorContext;
   26         -
use aws_smithy_types::retry::ErrorKind;
   27         -
use client::connect::Connection;
   28         -
use h2::Reason;
   29         -
use http::{Extensions, Uri};
   30         -
use hyper::rt::{Read, Write};
   31         -
use hyper_util::client::legacy as client;
   32         -
use hyper_util::client::legacy::connect::dns::Name;
   33         -
use hyper_util::client::legacy::connect::{
   34         -
    capture_connection, CaptureConnection, Connect, HttpInfo,
   35         -
};
   36         -
use hyper_util::rt::TokioExecutor;
   37         -
use rustls::crypto::CryptoProvider;
   38         -
use std::borrow::Cow;
   39         -
use std::collections::HashMap;
   40         -
use std::error::Error;
   41         -
use std::future::Future;
   42         -
use std::net::SocketAddr;
   43         -
use std::pin::Pin;
   44         -
use std::sync::RwLock;
   45         -
use std::task::{Context, Poll};
   46         -
use std::time::Duration;
   47         -
use std::{fmt, vec};
   48         -
   49         -
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
   50         -
#[non_exhaustive]
   51         -
pub enum CryptoMode {
   52         -
    #[cfg(feature = "crypto-ring")]
   53         -
    Ring,
   54         -
    #[cfg(feature = "crypto-aws-lc")]
   55         -
    AwsLc,
   56         -
    #[cfg(feature = "crypto-aws-lc-fips")]
   57         -
    AwsLcFips,
   58         -
}
   59         -
   60         -
impl CryptoMode {
   61         -
    fn provider(self) -> CryptoProvider {
   62         -
        match self {
   63         -
            #[cfg(feature = "crypto-aws-lc")]
   64         -
            CryptoMode::AwsLc => rustls::crypto::aws_lc_rs::default_provider(),
   65         -
   66         -
            #[cfg(feature = "crypto-ring")]
   67         -
            CryptoMode::Ring => rustls::crypto::ring::default_provider(),
   68         -
   69         -
            #[cfg(feature = "crypto-aws-lc-fips")]
   70         -
            CryptoMode::AwsLcFips => {
   71         -
                let provider = rustls::crypto::default_fips_provider();
   72         -
                assert!(
   73         -
                    provider.fips(),
   74         -
                    "FIPS was requested but the provider did not support FIPS"
   75         -
                );
   76         -
                provider
   77         -
            }
   78         -
        }
   79         -
    }
   80         -
}
   81         -
   82         -
/// A bridge that allows our `ResolveDns` trait to work with Hyper's `Resolver` interface (based on tower)
   83         -
#[derive(Clone)]
   84         -
struct HyperUtilResolver<R> {
   85         -
    resolver: R,
   86         -
}
   87         -
   88         -
impl<R: ResolveDns + Clone + 'static> tower::Service<Name> for HyperUtilResolver<R> {
   89         -
    type Response = vec::IntoIter<SocketAddr>;
   90         -
    type Error = Box<dyn Error + Send + Sync>;
   91         -
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
   92         -
   93         -
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
   94         -
        Poll::Ready(Ok(()))
   95         -
    }
   96         -
   97         -
    fn call(&mut self, req: Name) -> Self::Future {
   98         -
        let resolver = self.resolver.clone();
   99         -
        Box::pin(async move {
  100         -
            let dns_entries = resolver.resolve_dns(req.as_str()).await?;
  101         -
            Ok(dns_entries
  102         -
                .into_iter()
  103         -
                .map(|ip_addr| SocketAddr::new(ip_addr, 0))
  104         -
                .collect::<Vec<_>>()
  105         -
                .into_iter())
  106         -
        })
  107         -
    }
  108         -
}
  109         -
  110         -
#[allow(unused_imports)]
  111         -
mod cached_connectors {
  112         -
    use client::connect::HttpConnector;
  113         -
    use hyper_util::client::legacy as client;
  114         -
    use hyper_util::client::legacy::connect::dns::GaiResolver;
  115         -
  116         -
    use crate::hyper_1_0::build_connector::make_tls;
  117         -
    use crate::hyper_1_0::{CryptoMode, Inner};
  118         -
  119         -
    #[cfg(feature = "crypto-ring")]
  120         -
    pub(crate) static HTTPS_NATIVE_ROOTS_RING: once_cell::sync::Lazy<
  121         -
        hyper_rustls::HttpsConnector<HttpConnector>,
  122         -
    > = once_cell::sync::Lazy::new(|| make_tls(GaiResolver::new(), CryptoMode::Ring.provider()));
  123         -
  124         -
    #[cfg(feature = "crypto-aws-lc")]
  125         -
    pub(crate) static HTTPS_NATIVE_ROOTS_AWS_LC: once_cell::sync::Lazy<
  126         -
        hyper_rustls::HttpsConnector<HttpConnector>,
  127         -
    > = once_cell::sync::Lazy::new(|| make_tls(GaiResolver::new(), CryptoMode::AwsLc.provider()));
  128         -
  129         -
    #[cfg(feature = "crypto-aws-lc-fips")]
  130         -
    pub(crate) static HTTPS_NATIVE_ROOTS_AWS_LC_FIPS: once_cell::sync::Lazy<
  131         -
        hyper_rustls::HttpsConnector<HttpConnector>,
  132         -
    > = once_cell::sync::Lazy::new(|| {
  133         -
        make_tls(GaiResolver::new(), CryptoMode::AwsLcFips.provider())
  134         -
    });
  135         -
  136         -
    pub(super) fn cached_https(mode: Inner) -> hyper_rustls::HttpsConnector<HttpConnector> {
  137         -
        match mode {
  138         -
            #[cfg(feature = "crypto-ring")]
  139         -
            Inner::Standard(CryptoMode::Ring) => HTTPS_NATIVE_ROOTS_RING.clone(),
  140         -
            #[cfg(feature = "crypto-aws-lc")]
  141         -
            Inner::Standard(CryptoMode::AwsLc) => HTTPS_NATIVE_ROOTS_AWS_LC.clone(),
  142         -
            #[cfg(feature = "crypto-aws-lc-fips")]
  143         -
            Inner::Standard(CryptoMode::AwsLcFips) => HTTPS_NATIVE_ROOTS_AWS_LC_FIPS.clone(),
  144         -
            #[allow(unreachable_patterns)]
  145         -
            Inner::Standard(_) => unreachable!("unexpected mode"),
  146         -
            Inner::Custom(provider) => make_tls(GaiResolver::new(), provider),
  147         -
        }
  148         -
    }
  149         -
}
  150         -
  151         -
mod build_connector {
  152         -
    use crate::hyper_1_0::{HyperUtilResolver, Inner};
  153         -
    use aws_smithy_runtime_api::client::dns::ResolveDns;
  154         -
    use client::connect::HttpConnector;
  155         -
    use hyper_util::client::legacy as client;
  156         -
    use rustls::crypto::CryptoProvider;
  157         -
    use std::sync::Arc;
  158         -
  159         -
    fn restrict_ciphers(base: CryptoProvider) -> CryptoProvider {
  160         -
        let suites = &[
  161         -
            rustls::CipherSuite::TLS13_AES_256_GCM_SHA384,
  162         -
            rustls::CipherSuite::TLS13_AES_128_GCM_SHA256,
  163         -
            // TLS1.2 suites
  164         -
            rustls::CipherSuite::TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
  165         -
            rustls::CipherSuite::TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
  166         -
            rustls::CipherSuite::TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
  167         -
            rustls::CipherSuite::TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
  168         -
            rustls::CipherSuite::TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256,
  169         -
        ];
  170         -
        let supported_suites = suites
  171         -
            .iter()
  172         -
            .flat_map(|suite| {
  173         -
                base.cipher_suites
  174         -
                    .iter()
  175         -
                    .find(|s| &s.suite() == suite)
  176         -
                    .cloned()
  177         -
            })
  178         -
            .collect::<Vec<_>>();
  179         -
        CryptoProvider {
  180         -
            cipher_suites: supported_suites,
  181         -
            ..base
  182         -
        }
  183         -
    }
  184         -
  185         -
    pub(crate) fn make_tls<R>(
  186         -
        resolver: R,
  187         -
        crypto_provider: CryptoProvider,
  188         -
    ) -> hyper_rustls::HttpsConnector<HttpConnector<R>> {
  189         -
        use hyper_rustls::ConfigBuilderExt;
  190         -
        let mut base_connector = HttpConnector::new_with_resolver(resolver);
  191         -
        base_connector.enforce_http(false);
  192         -
        hyper_rustls::HttpsConnectorBuilder::new()
  193         -
               .with_tls_config(
  194         -
                rustls::ClientConfig::builder_with_provider(Arc::new(restrict_ciphers(crypto_provider)))
  195         -
                    .with_safe_default_protocol_versions()
  196         -
                    .expect("Error with the TLS configuration. Please file a bug report under https://github.com/smithy-lang/smithy-rs/issues.")
  197         -
                    .with_native_roots().expect("error with TLS configuration.")
  198         -
                    .with_no_client_auth()
  199         -
            )
  200         -
            .https_or_http()
  201         -
            .enable_http1()
  202         -
            .enable_http2()
  203         -
            .wrap_connector(base_connector)
  204         -
    }
  205         -
  206         -
    pub(super) fn https_with_resolver<R: ResolveDns>(
  207         -
        crypto_provider: Inner,
  208         -
        resolver: R,
  209         -
    ) -> hyper_rustls::HttpsConnector<HttpConnector<HyperUtilResolver<R>>> {
  210         -
        make_tls(HyperUtilResolver { resolver }, crypto_provider.provider())
  211         -
    }
  212         -
}
  213         -
  214         -
/// [`HttpConnector`] that uses [`hyper`] to make HTTP requests.
  215         -
///
  216         -
/// This connector also implements socket connect and read timeouts.
  217         -
///
  218         -
/// This shouldn't be used directly in most cases.
  219         -
/// See the docs on [`HyperClientBuilder`] for examples of how
  220         -
/// to customize the Hyper client.
  221         -
#[derive(Debug)]
  222         -
pub struct HyperConnector {
  223         -
    adapter: Box<dyn HttpConnector>,
  224         -
}
  225         -
  226         -
impl HyperConnector {
  227         -
    /// Builder for a Hyper connector.
  228         -
    pub fn builder() -> HyperConnectorBuilder {
  229         -
        Default::default()
  230         -
    }
  231         -
}
  232         -
  233         -
impl HttpConnector for HyperConnector {
  234         -
    fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
  235         -
        self.adapter.call(request)
  236         -
    }
  237         -
}
  238         -
  239         -
/// Builder for [`HyperConnector`].
  240         -
#[derive(Default, Debug)]
  241         -
pub struct HyperConnectorBuilder<Crypto = CryptoUnset> {
  242         -
    connector_settings: Option<HttpConnectorSettings>,
  243         -
    sleep_impl: Option<SharedAsyncSleep>,
  244         -
    client_builder: Option<hyper_util::client::legacy::Builder>,
  245         -
    #[allow(unused)]
  246         -
    crypto: Crypto,
  247         -
}
  248         -
  249         -
#[derive(Default)]
  250         -
#[non_exhaustive]
  251         -
pub struct CryptoUnset {}
  252         -
  253         -
pub struct CryptoProviderSelected {
  254         -
    crypto_provider: Inner,
  255         -
}
  256         -
  257         -
#[derive(Clone)]
  258         -
enum Inner {
  259         -
    Standard(CryptoMode),
  260         -
    #[allow(dead_code)]
  261         -
    Custom(CryptoProvider),
  262         -
}
  263         -
  264         -
impl Inner {
  265         -
    fn provider(&self) -> CryptoProvider {
  266         -
        match self {
  267         -
            Inner::Standard(mode) => mode.provider(),
  268         -
            Inner::Custom(provider) => provider.clone(),
  269         -
        }
  270         -
    }
  271         -
}
  272         -
  273         -
#[cfg(any(feature = "crypto-aws-lc", feature = "crypto-ring"))]
  274         -
impl HyperConnectorBuilder<CryptoProviderSelected> {
  275         -
    pub fn build_from_resolver<R: ResolveDns + Clone + 'static>(
  276         -
        self,
  277         -
        resolver: R,
  278         -
    ) -> HyperConnector {
  279         -
        let connector =
  280         -
            build_connector::https_with_resolver(self.crypto.crypto_provider.clone(), resolver);
  281         -
        self.build(connector)
  282         -
    }
  283         -
}
  284         -
  285         -
impl<Any> HyperConnectorBuilder<Any> {
  286         -
    /// Create a [`HyperConnector`] from this builder and a given connector.
  287         -
    pub(crate) fn build<C>(self, tcp_connector: C) -> HyperConnector
  288         -
    where
  289         -
        C: Send + Sync + 'static,
  290         -
        C: Clone,
  291         -
        C: tower::Service<Uri>,
  292         -
        C::Response: Read + Write + Connection + Send + Sync + Unpin,
  293         -
        C: Connect,
  294         -
        C::Future: Unpin + Send + 'static,
  295         -
        C::Error: Into<BoxError>,
  296         -
    {
  297         -
        let client_builder =
  298         -
            self.client_builder
  299         -
                .unwrap_or(hyper_util::client::legacy::Builder::new(
  300         -
                    TokioExecutor::new(),
  301         -
                ));
  302         -
        let sleep_impl = self.sleep_impl.or_else(default_async_sleep);
  303         -
        let (connect_timeout, read_timeout) = self
  304         -
            .connector_settings
  305         -
            .map(|c| (c.connect_timeout(), c.read_timeout()))
  306         -
            .unwrap_or((None, None));
  307         -
  308         -
        let connector = match connect_timeout {
  309         -
            Some(duration) => timeout_middleware::ConnectTimeout::new(
  310         -
                tcp_connector,
  311         -
                sleep_impl
  312         -
                    .clone()
  313         -
                    .expect("a sleep impl must be provided in order to have a connect timeout"),
  314         -
                duration,
  315         -
            ),
  316         -
            None => timeout_middleware::ConnectTimeout::no_timeout(tcp_connector),
  317         -
        };
  318         -
        let base = client_builder.build(connector);
  319         -
        let read_timeout = match read_timeout {
  320         -
            Some(duration) => timeout_middleware::HttpReadTimeout::new(
  321         -
                base,
  322         -
                sleep_impl.expect("a sleep impl must be provided in order to have a read timeout"),
  323         -
                duration,
  324         -
            ),
  325         -
            None => timeout_middleware::HttpReadTimeout::no_timeout(base),
  326         -
        };
  327         -
        HyperConnector {
  328         -
            adapter: Box::new(Adapter {
  329         -
                client: read_timeout,
  330         -
            }),
  331         -
        }
  332         -
    }
  333         -
  334         -
    /// Set the async sleep implementation used for timeouts
  335         -
    ///
  336         -
    /// Calling this is only necessary for testing or to use something other than
  337         -
    /// [`default_async_sleep`].
  338         -
    pub fn sleep_impl(mut self, sleep_impl: impl AsyncSleep + 'static) -> Self {
  339         -
        self.sleep_impl = Some(sleep_impl.into_shared());
  340         -
        self
  341         -
    }
  342         -
  343         -
    /// Set the async sleep implementation used for timeouts
  344         -
    ///
  345         -
    /// Calling this is only necessary for testing or to use something other than
  346         -
    /// [`default_async_sleep`].
  347         -
    pub fn set_sleep_impl(&mut self, sleep_impl: Option<SharedAsyncSleep>) -> &mut Self {
  348         -
        self.sleep_impl = sleep_impl;
  349         -
        self
  350         -
    }
  351         -
  352         -
    /// Configure the HTTP settings for the `HyperAdapter`
  353         -
    pub fn connector_settings(mut self, connector_settings: HttpConnectorSettings) -> Self {
  354         -
        self.connector_settings = Some(connector_settings);
  355         -
        self
  356         -
    }
  357         -
  358         -
    /// Configure the HTTP settings for the `HyperAdapter`
  359         -
    pub fn set_connector_settings(
  360         -
        &mut self,
  361         -
        connector_settings: Option<HttpConnectorSettings>,
  362         -
    ) -> &mut Self {
  363         -
        self.connector_settings = connector_settings;
  364         -
        self
  365         -
    }
  366         -
  367         -
    /// Override the Hyper client [`Builder`](hyper_util::client::legacy::Builder) used to construct this client.
  368         -
    ///
  369         -
    /// This enables changing settings like forcing HTTP2 and modifying other default client behavior.
  370         -
    pub(crate) fn hyper_builder(
  371         -
        mut self,
  372         -
        hyper_builder: hyper_util::client::legacy::Builder,
  373         -
    ) -> Self {
  374         -
        self.set_hyper_builder(Some(hyper_builder));
  375         -
        self
  376         -
    }
  377         -
  378         -
    /// Override the Hyper client [`Builder`](hyper_util::client::legacy::Builder) used to construct this client.
  379         -
    ///
  380         -
    /// This enables changing settings like forcing HTTP2 and modifying other default client behavior.
  381         -
    pub(crate) fn set_hyper_builder(
  382         -
        &mut self,
  383         -
        hyper_builder: Option<hyper_util::client::legacy::Builder>,
  384         -
    ) -> &mut Self {
  385         -
        self.client_builder = hyper_builder;
  386         -
        self
  387         -
    }
  388         -
}
  389         -
  390         -
/// Adapter to use a Hyper 1.0-based Client as an `HttpConnector`
  391         -
///
  392         -
/// This adapter also enables TCP `CONNECT` and HTTP `READ` timeouts via [`HyperConnector::builder`].
  393         -
struct Adapter<C> {
  394         -
    client: timeout_middleware::HttpReadTimeout<
  395         -
        hyper_util::client::legacy::Client<timeout_middleware::ConnectTimeout<C>, SdkBody>,
  396         -
    >,
  397         -
}
  398         -
  399         -
impl<C> fmt::Debug for Adapter<C> {
  400         -
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  401         -
        f.debug_struct("Adapter")
  402         -
            .field("client", &"** hyper client **")
  403         -
            .finish()
  404         -
    }
  405         -
}
  406         -
  407         -
/// Extract a smithy connection from a hyper CaptureConnection
  408         -
fn extract_smithy_connection(capture_conn: &CaptureConnection) -> Option<ConnectionMetadata> {
  409         -
    let capture_conn = capture_conn.clone();
  410         -
    if let Some(conn) = capture_conn.clone().connection_metadata().as_ref() {
  411         -
        let mut extensions = Extensions::new();
  412         -
        conn.get_extras(&mut extensions);
  413         -
        let http_info = extensions.get::<HttpInfo>();
  414         -
        let mut builder = ConnectionMetadata::builder()
  415         -
            .proxied(conn.is_proxied())
  416         -
            .poison_fn(move || match capture_conn.connection_metadata().as_ref() {
  417         -
                Some(conn) => conn.poison(),
  418         -
                None => tracing::trace!("no connection existed to poison"),
  419         -
            });
  420         -
  421         -
        builder
  422         -
            .set_local_addr(http_info.map(|info| info.local_addr()))
  423         -
            .set_remote_addr(http_info.map(|info| info.remote_addr()));
  424         -
  425         -
        let smithy_connection = builder.build();
  426         -
  427         -
        Some(smithy_connection)
  428         -
    } else {
  429         -
        None
  430         -
    }
  431         -
}
  432         -
  433         -
impl<C> HttpConnector for Adapter<C>
  434         -
where
  435         -
    C: Clone + Send + Sync + 'static,
  436         -
    C: tower::Service<Uri>,
  437         -
    C::Response: Connection + Read + Write + Unpin + 'static,
  438         -
    timeout_middleware::ConnectTimeout<C>: Connect,
  439         -
    C::Future: Unpin + Send + 'static,
  440         -
    C::Error: Into<BoxError>,
  441         -
{
  442         -
    fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
  443         -
        let mut request = match request.try_into_http1x() {
  444         -
            Ok(request) => request,
  445         -
            Err(err) => {
  446         -
                return HttpConnectorFuture::ready(Err(ConnectorError::user(err.into())));
  447         -
            }
  448         -
        };
  449         -
        let capture_connection = capture_connection(&mut request);
  450         -
        if let Some(capture_smithy_connection) =
  451         -
            request.extensions().get::<CaptureSmithyConnection>()
  452         -
        {
  453         -
            capture_smithy_connection
  454         -
                .set_connection_retriever(move || extract_smithy_connection(&capture_connection));
  455         -
        }
  456         -
        let mut client = self.client.clone();
  457         -
        use tower::Service;
  458         -
        let fut = client.call(request);
  459         -
        HttpConnectorFuture::new(async move {
  460         -
            let response = fut
  461         -
                .await
  462         -
                .map_err(downcast_error)?
  463         -
                .map(SdkBody::from_body_1_x);
  464         -
            match HttpResponse::try_from(response) {
  465         -
                Ok(response) => Ok(response),
  466         -
                Err(err) => Err(ConnectorError::other(err.into(), None)),
  467         -
            }
  468         -
        })
  469         -
    }
  470         -
}
  471         -
  472         -
/// Downcast errors coming out of hyper into an appropriate `ConnectorError`
  473         -
fn downcast_error(err: BoxError) -> ConnectorError {
  474         -
    // is a `TimedOutError` (from aws_smithy_async::timeout) in the chain? if it is, this is a timeout
  475         -
    if find_source::<TimedOutError>(err.as_ref()).is_some() {
  476         -
        return ConnectorError::timeout(err);
  477         -
    }
  478         -
    // is the top of chain error actually already a `ConnectorError`? return that directly
  479         -
    let err = match err.downcast::<ConnectorError>() {
  480         -
        Ok(connector_error) => return *connector_error,
  481         -
        Err(box_error) => box_error,
  482         -
    };
  483         -
    // generally, the top of chain will probably be a hyper error. Go through a set of hyper specific
  484         -
    // error classifications
  485         -
    let err = match find_source::<hyper::Error>(err.as_ref()) {
  486         -
        Some(hyper_error) => return to_connector_error(hyper_error)(err),
  487         -
        None => err,
  488         -
    };
  489         -
  490         -
    // otherwise, we have no idea!
  491         -
    ConnectorError::other(err, None)
  492         -
}
  493         -
  494         -
/// Convert a [`hyper::Error`] into a [`ConnectorError`]
  495         -
fn to_connector_error(err: &hyper::Error) -> fn(BoxError) -> ConnectorError {
  496         -
    if err.is_timeout() || find_source::<timeout_middleware::HttpTimeoutError>(err).is_some() {
  497         -
        return ConnectorError::timeout;
  498         -
    }
  499         -
    if err.is_user() {
  500         -
        return ConnectorError::user;
  501         -
    }
  502         -
    if err.is_closed() || err.is_canceled() || find_source::<std::io::Error>(err).is_some() {
  503         -
        return ConnectorError::io;
  504         -
    }
  505         -
    // We sometimes receive this from S3: hyper::Error(IncompleteMessage)
  506         -
    if err.is_incomplete_message() {
  507         -
        return |err: BoxError| ConnectorError::other(err, Some(ErrorKind::TransientError));
  508         -
    }
  509         -
  510         -
    if let Some(h2_err) = find_source::<h2::Error>(err) {
  511         -
        if h2_err.is_go_away()
  512         -
            || (h2_err.is_reset() && h2_err.reason() == Some(Reason::REFUSED_STREAM))
  513         -
        {
  514         -
            return ConnectorError::io;
  515         -
        }
  516         -
    }
  517         -
  518         -
    tracing::warn!(err = %DisplayErrorContext(&err), "unrecognized error from Hyper. If this error should be retried, please file an issue.");
  519         -
    |err: BoxError| ConnectorError::other(err, None)
  520         -
}
  521         -
  522         -
fn find_source<'a, E: Error + 'static>(err: &'a (dyn Error + 'static)) -> Option<&'a E> {
  523         -
    let mut next = Some(err);
  524         -
    while let Some(err) = next {
  525         -
        if let Some(matching_err) = err.downcast_ref::<E>() {
  526         -
            return Some(matching_err);
  527         -
        }
  528         -
        next = err.source();
  529         -
    }
  530         -
    None
  531         -
}
  532         -
  533         -
// TODO(https://github.com/awslabs/aws-sdk-rust/issues/1090): CacheKey must also include ptr equality to any
  534         -
// runtime components that are used—sleep_impl as a base (unless we prohibit overriding sleep impl)
  535         -
// If we decide to put a DnsResolver in RuntimeComponents, then we'll need to handle that as well.
  536         -
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
  537         -
struct CacheKey {
  538         -
    connect_timeout: Option<Duration>,
  539         -
    read_timeout: Option<Duration>,
  540         -
}
  541         -
  542         -
impl From<&HttpConnectorSettings> for CacheKey {
  543         -
    fn from(value: &HttpConnectorSettings) -> Self {
  544         -
        Self {
  545         -
            connect_timeout: value.connect_timeout(),
  546         -
            read_timeout: value.read_timeout(),
  547         -
        }
  548         -
    }
  549         -
}
  550         -
  551         -
struct HyperClient<F> {
  552         -
    connector_cache: RwLock<HashMap<CacheKey, SharedHttpConnector>>,
  553         -
    client_builder: hyper_util::client::legacy::Builder,
  554         -
    tcp_connector_fn: F,
  555         -
}
  556         -
  557         -
impl<F> fmt::Debug for HyperClient<F> {
  558         -
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  559         -
        f.debug_struct("HyperClient")
  560         -
            .field("connector_cache", &self.connector_cache)
  561         -
            .field("client_builder", &self.client_builder)
  562         -
            .finish()
  563         -
    }
  564         -
}
  565         -
  566         -
impl<C, F> HttpClient for HyperClient<F>
  567         -
where
  568         -
    F: Fn() -> C + Send + Sync,
  569         -
    C: Clone + Send + Sync + 'static,
  570         -
    C: tower::Service<Uri>,
  571         -
    C::Response: Connection + Read + Write + Send + Sync + Unpin + 'static,
  572         -
    C::Future: Unpin + Send + 'static,
  573         -
    C::Error: Into<BoxError>,
  574         -
{
  575         -
    fn http_connector(
  576         -
        &self,
  577         -
        settings: &HttpConnectorSettings,
  578         -
        components: &RuntimeComponents,
  579         -
    ) -> SharedHttpConnector {
  580         -
        let key = CacheKey::from(settings);
  581         -
        let mut connector = self.connector_cache.read().unwrap().get(&key).cloned();
  582         -
        if connector.is_none() {
  583         -
            let mut cache = self.connector_cache.write().unwrap();
  584         -
            // Short-circuit if another thread already wrote a connector to the cache for this key
  585         -
            if !cache.contains_key(&key) {
  586         -
                let mut builder = HyperConnector::builder()
  587         -
                    .hyper_builder(self.client_builder.clone())
  588         -
                    .connector_settings(settings.clone());
  589         -
                builder.set_sleep_impl(components.sleep_impl());
  590         -
  591         -
                let start = components.time_source().map(|ts| ts.now());
  592         -
                let tcp_connector = (self.tcp_connector_fn)();
  593         -
                let end = components.time_source().map(|ts| ts.now());
  594         -
                if let (Some(start), Some(end)) = (start, end) {
  595         -
                    if let Ok(elapsed) = end.duration_since(start) {
  596         -
                        tracing::debug!("new TCP connector created in {:?}", elapsed);
  597         -
                    }
  598         -
                }
  599         -
                let connector = SharedHttpConnector::new(builder.build(tcp_connector));
  600         -
                cache.insert(key.clone(), connector);
  601         -
            }
  602         -
            connector = cache.get(&key).cloned();
  603         -
        }
  604         -
  605         -
        connector.expect("cache populated above")
  606         -
    }
  607         -
  608         -
    fn validate_base_client_config(
  609         -
        &self,
  610         -
        _: &RuntimeComponentsBuilder,
  611         -
        _: &ConfigBag,
  612         -
    ) -> Result<(), BoxError> {
  613         -
        // Initialize the TCP connector at this point so that native certs load
  614         -
        // at client initialization time instead of upon first request. We do it
  615         -
        // here rather than at construction so that it won't run if this is not
  616         -
        // the selected HTTP client for the base config (for example, if this was
  617         -
        // the default HTTP client, and it was overridden by a later plugin).
  618         -
        let _ = (self.tcp_connector_fn)();
  619         -
        Ok(())
  620         -
    }
  621         -
  622         -
    fn connector_metadata(&self) -> Option<ConnectorMetadata> {
  623         -
        Some(ConnectorMetadata::new("hyper", Some(Cow::Borrowed("1.x"))))
  624         -
    }
  625         -
}
  626         -
  627         -
/// Builder for a hyper-backed [`HttpClient`] implementation.
  628         -
///
  629         -
/// This builder can be used to customize the underlying TCP connector used, as well as
  630         -
/// hyper client configuration.
  631         -
///
  632         -
/// # Examples
  633         -
///
  634         -
/// Construct a Hyper client with the RusTLS TLS implementation.
  635         -
/// This can be useful when you want to share a Hyper connector between multiple
  636         -
/// generated Smithy clients.
  637         -
#[derive(Clone, Default, Debug)]
  638         -
pub struct HyperClientBuilder<Crypto = CryptoUnset> {
  639         -
    client_builder: Option<hyper_util::client::legacy::Builder>,
  640         -
    crypto_provider: Crypto,
  641         -
}
  642         -
  643         -
impl HyperClientBuilder<CryptoProviderSelected> {
  644         -
    /// Create a hyper client using RusTLS for TLS
  645         -
    ///
  646         -
    /// The trusted certificates will be loaded later when this becomes the selected
  647         -
    /// HTTP client for a Smithy client.
  648         -
    pub fn build_https(self) -> SharedHttpClient {
  649         -
        let crypto = self.crypto_provider.crypto_provider;
  650         -
        build_with_fn(self.client_builder, move || {
  651         -
            cached_connectors::cached_https(crypto.clone())
  652         -
        })
  653         -
    }
  654         -
  655         -
    /// Create a hyper client using a custom DNS resolver
  656         -
    pub fn build_with_resolver(
  657         -
        self,
  658         -
        resolver: impl ResolveDns + Clone + 'static,
  659         -
    ) -> SharedHttpClient {
  660         -
        build_with_fn(self.client_builder, move || {
  661         -
            build_connector::https_with_resolver(
  662         -
                self.crypto_provider.crypto_provider.clone(),
  663         -
                resolver.clone(),
  664         -
            )
  665         -
        })
  666         -
    }
  667         -
}
  668         -
  669         -
impl HyperClientBuilder<CryptoUnset> {
  670         -
    /// Creates a new builder.
  671         -
    pub fn new() -> Self {
  672         -
        Self::default()
  673         -
    }
  674         -
  675         -
    pub fn crypto_mode(self, provider: CryptoMode) -> HyperClientBuilder<CryptoProviderSelected> {
  676         -
        HyperClientBuilder {
  677         -
            client_builder: self.client_builder,
  678         -
            crypto_provider: CryptoProviderSelected {
  679         -
                crypto_provider: Inner::Standard(provider),
  680         -
            },
  681         -
        }
  682         -
    }
  683         -
  684         -
    /// This interface will be broken in the future
  685         -
    ///
  686         -
    /// This exposes `CryptoProvider` from `rustls` directly and this API has no stability guarantee.
  687         -
    #[cfg(crypto_unstable)]
  688         -
    pub fn crypto_provider_unstable(
  689         -
        self,
  690         -
        provider: CryptoProvider,
  691         -
    ) -> HyperClientBuilder<CryptoProviderSelected> {
  692         -
        HyperClientBuilder {
  693         -
            client_builder: self.client_builder,
  694         -
            crypto_provider: CryptoProviderSelected {
  695         -
                crypto_provider: Inner::Custom(provider),
  696         -
            },
  697         -
        }
  698         -
    }
  699         -
}
  700         -
  701         -
fn build_with_fn<C, F>(
  702         -
    client_builder: Option<hyper_util::client::legacy::Builder>,
  703         -
    tcp_connector_fn: F,
  704         -
) -> SharedHttpClient
  705         -
where
  706         -
    F: Fn() -> C + Send + Sync + 'static,
  707         -
    C: Clone + Send + Sync + 'static,
  708         -
    C: tower::Service<Uri>,
  709         -
    C::Response: Connection + Read + Write + Send + Sync + Unpin + 'static,
  710         -
    C::Future: Unpin + Send + 'static,
  711         -
    C::Error: Into<BoxError>,
  712         -
    C: Connect,
  713         -
{
  714         -
    SharedHttpClient::new(HyperClient {
  715         -
        connector_cache: RwLock::new(HashMap::new()),
  716         -
        client_builder: client_builder
  717         -
            .unwrap_or_else(|| hyper_util::client::legacy::Builder::new(TokioExecutor::new())),
  718         -
        tcp_connector_fn,
  719         -
    })
  720         -
}
  721         -
  722         -
mod timeout_middleware {
  723         -
    use std::error::Error;
  724         -
    use std::fmt::Formatter;
  725         -
    use std::future::Future;
  726         -
    use std::pin::Pin;
  727         -
    use std::task::{Context, Poll};
  728         -
    use std::time::Duration;
  729         -
  730         -
    use http::Uri;
  731         -
    use pin_project_lite::pin_project;
  732         -
  733         -
    use aws_smithy_async::future::timeout::{TimedOutError, Timeout};
  734         -
    use aws_smithy_async::rt::sleep::Sleep;
  735         -
    use aws_smithy_async::rt::sleep::{AsyncSleep, SharedAsyncSleep};
  736         -
    use aws_smithy_runtime_api::box_error::BoxError;
  737         -
  738         -
    #[derive(Debug)]
  739         -
    pub(crate) struct HttpTimeoutError {
  740         -
        kind: &'static str,
  741         -
        duration: Duration,
  742         -
    }
  743         -
  744         -
    impl std::fmt::Display for HttpTimeoutError {
  745         -
        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
  746         -
            write!(
  747         -
                f,
  748         -
                "{} timeout occurred after {:?}",
  749         -
                self.kind, self.duration
  750         -
            )
  751         -
        }
  752         -
    }
  753         -
  754         -
    impl Error for HttpTimeoutError {
  755         -
        // We implement the `source` function as returning a `TimedOutError` because when `downcast_error`
  756         -
        // or `find_source` is called with an `HttpTimeoutError` (or another error wrapping an `HttpTimeoutError`)
  757         -
        // this method will be checked to determine if it's a timeout-related error.
  758         -
        fn source(&self) -> Option<&(dyn Error + 'static)> {
  759         -
            Some(&TimedOutError)
  760         -
        }
  761         -
    }
  762         -
  763         -
    /// Timeout wrapper that will timeout on the initial TCP connection
  764         -
    ///
  765         -
    /// # Stability
  766         -
    /// This interface is unstable.
  767         -
    #[derive(Clone, Debug)]
  768         -
    pub(super) struct ConnectTimeout<I> {
  769         -
        inner: I,
  770         -
        timeout: Option<(SharedAsyncSleep, Duration)>,
  771         -
    }
  772         -
  773         -
    impl<I> ConnectTimeout<I> {
  774         -
        /// Create a new `ConnectTimeout` around `inner`.
  775         -
        ///
  776         -
        /// Typically, `I` will implement [`hyper_util::client::legacy::connect::Connect`].
  777         -
        pub(crate) fn new(inner: I, sleep: SharedAsyncSleep, timeout: Duration) -> Self {
  778         -
            Self {
  779         -
                inner,
  780         -
                timeout: Some((sleep, timeout)),
  781         -
            }
  782         -
        }
  783         -
  784         -
        pub(crate) fn no_timeout(inner: I) -> Self {
  785         -
            Self {
  786         -
                inner,
  787         -
                timeout: None,
  788         -
            }
  789         -
        }
  790         -
    }
  791         -
  792         -
    #[derive(Clone, Debug)]
  793         -
    pub(crate) struct HttpReadTimeout<I> {
  794         -
        inner: I,
  795         -
        timeout: Option<(SharedAsyncSleep, Duration)>,
  796         -
    }
  797         -
  798         -
    impl<I> HttpReadTimeout<I> {
  799         -
        /// Create a new `HttpReadTimeout` around `inner`.
  800         -
        ///
  801         -
        /// Typically, `I` will implement [`tower::Service<http::Request<SdkBody>>`].
  802         -
        pub(crate) fn new(inner: I, sleep: SharedAsyncSleep, timeout: Duration) -> Self {
  803         -
            Self {
  804         -
                inner,
  805         -
                timeout: Some((sleep, timeout)),
  806         -
            }
  807         -
        }
  808         -
  809         -
        pub(crate) fn no_timeout(inner: I) -> Self {
  810         -
            Self {
  811         -
                inner,
  812         -
                timeout: None,
  813         -
            }
  814         -
        }
  815         -
    }
  816         -
  817         -
    pin_project! {
  818         -
        /// Timeout future for Tower services
  819         -
        ///
  820         -
        /// Timeout future to handle timing out, mapping errors, and the possibility of not timing out
  821         -
        /// without incurring an additional allocation for each timeout layer.
  822         -
        #[project = MaybeTimeoutFutureProj]
  823         -
        pub enum MaybeTimeoutFuture<F> {
  824         -
            Timeout {
  825         -
                #[pin]
  826         -
                timeout: Timeout<F, Sleep>,
  827         -
                error_type: &'static str,
  828         -
                duration: Duration,
  829         -
            },
  830         -
            NoTimeout {
  831         -
                #[pin]
  832         -
                future: F
  833         -
            }
  834         -
        }
  835         -
    }
  836         -
  837         -
    impl<F, T, E> Future for MaybeTimeoutFuture<F>
  838         -
    where
  839         -
        F: Future<Output = Result<T, E>>,
  840         -
        E: Into<BoxError>,
  841         -
    {
  842         -
        type Output = Result<T, BoxError>;
  843         -
  844         -
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
  845         -
            let (timeout_future, kind, &mut duration) = match self.project() {
  846         -
                MaybeTimeoutFutureProj::NoTimeout { future } => {
  847         -
                    return future.poll(cx).map_err(|err| err.into());
  848         -
                }
  849         -
                MaybeTimeoutFutureProj::Timeout {
  850         -
                    timeout,
  851         -
                    error_type,
  852         -
                    duration,
  853         -
                } => (timeout, error_type, duration),
  854         -
            };
  855         -
            match timeout_future.poll(cx) {
  856         -
                Poll::Ready(Ok(response)) => Poll::Ready(response.map_err(|err| err.into())),
  857         -
                Poll::Ready(Err(_timeout)) => {
  858         -
                    Poll::Ready(Err(HttpTimeoutError { kind, duration }.into()))
  859         -
                }
  860         -
                Poll::Pending => Poll::Pending,
  861         -
            }
  862         -
        }
  863         -
    }
  864         -
  865         -
    impl<I> tower::Service<Uri> for ConnectTimeout<I>
  866         -
    where
  867         -
        I: tower::Service<Uri>,
  868         -
        I::Error: Into<BoxError>,
  869         -
    {
  870         -
        type Response = I::Response;
  871         -
        type Error = BoxError;
  872         -
        type Future = MaybeTimeoutFuture<I::Future>;
  873         -
  874         -
        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
  875         -
            self.inner.poll_ready(cx).map_err(|err| err.into())
  876         -
        }
  877         -
  878         -
        fn call(&mut self, req: Uri) -> Self::Future {
  879         -
            match &self.timeout {
  880         -
                Some((sleep, duration)) => {
  881         -
                    let sleep = sleep.sleep(*duration);
  882         -
                    MaybeTimeoutFuture::Timeout {
  883         -
                        timeout: Timeout::new(self.inner.call(req), sleep),
  884         -
                        error_type: "HTTP connect",
  885         -
                        duration: *duration,
  886         -
                    }
  887         -
                }
  888         -
                None => MaybeTimeoutFuture::NoTimeout {
  889         -
                    future: self.inner.call(req),
  890         -
                },
  891         -
            }
  892         -
        }
  893         -
    }
  894         -
  895         -
    impl<I, B> tower::Service<http::Request<B>> for HttpReadTimeout<I>
  896         -
    where
  897         -
        I: tower::Service<http::Request<B>>,
  898         -
        I::Error: Send + Sync + Error + 'static,
  899         -
    {
  900         -
        type Response = I::Response;
  901         -
        type Error = BoxError;
  902         -
        type Future = MaybeTimeoutFuture<I::Future>;
  903         -
  904         -
        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
  905         -
            self.inner.poll_ready(cx).map_err(|err| err.into())
  906         -
        }
  907         -
  908         -
        fn call(&mut self, req: http::Request<B>) -> Self::Future {
  909         -
            match &self.timeout {
  910         -
                Some((sleep, duration)) => {
  911         -
                    let sleep = sleep.sleep(*duration);
  912         -
                    MaybeTimeoutFuture::Timeout {
  913         -
                        timeout: Timeout::new(self.inner.call(req), sleep),
  914         -
                        error_type: "HTTP read",
  915         -
                        duration: *duration,
  916         -
                    }
  917         -
                }
  918         -
                None => MaybeTimeoutFuture::NoTimeout {
  919         -
                    future: self.inner.call(req),
  920         -
                },
  921         -
            }
  922         -
        }
  923         -
    }
  924         -
  925         -
    #[cfg(test)]
  926         -
    pub(crate) mod test {
  927         -
        use std::time::Duration;
  928         -
  929         -
        use hyper::rt::ReadBufCursor;
  930         -
        use hyper_util::client::legacy::connect::Connected;
  931         -
        use hyper_util::rt::TokioIo;
  932         -
        use tokio::net::TcpStream;
  933         -
  934         -
        use aws_smithy_async::assert_elapsed;
  935         -
        use aws_smithy_async::future::never::Never;
  936         -
        use aws_smithy_async::rt::sleep::{SharedAsyncSleep, TokioSleep};
  937         -
        use aws_smithy_types::error::display::DisplayErrorContext;
  938         -
  939         -
        use super::super::*;
  940         -
  941         -
        #[allow(unused)]
  942         -
        fn connect_timeout_is_correct<T: Send + Sync + Clone + 'static>() {
  943         -
            is_send_sync::<super::ConnectTimeout<T>>();
  944         -
        }
  945         -
  946         -
        #[allow(unused)]
  947         -
        fn is_send_sync<T: Send + Sync>() {}
  948         -
  949         -
        /// A service that will never return whatever it is you want
  950         -
        ///
  951         -
        /// Returned futures will return Pending forever
  952         -
        #[non_exhaustive]
  953         -
        #[derive(Clone, Default, Debug)]
  954         -
        pub(crate) struct NeverConnects;
  955         -
        impl tower::Service<Uri> for NeverConnects {
  956         -
            type Response = TokioIo<TcpStream>;
  957         -
            type Error = ConnectorError;
  958         -
            type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
  959         -
  960         -
            fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
  961         -
                Poll::Ready(Ok(()))
  962         -
            }
  963         -
  964         -
            fn call(&mut self, _uri: Uri) -> Self::Future {
  965         -
                Box::pin(async move {
  966         -
                    Never::new().await;
  967         -
                    unreachable!()
  968         -
                })
  969         -
            }
  970         -
        }
  971         -
  972         -
        /// A service that will connect but never send any data
  973         -
        #[derive(Clone, Debug, Default)]
  974         -
        struct NeverReplies;
  975         -
        impl tower::Service<Uri> for NeverReplies {
  976         -
            type Response = EmptyStream;
  977         -
            type Error = BoxError;
  978         -
            type Future = std::future::Ready<Result<Self::Response, Self::Error>>;
  979         -
  980         -
            fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
  981         -
                Poll::Ready(Ok(()))
  982         -
            }
  983         -
  984         -
            fn call(&mut self, _req: Uri) -> Self::Future {
  985         -
                std::future::ready(Ok(EmptyStream))
  986         -
            }
  987         -
        }
  988         -
  989         -
        /// A stream that will never return or accept any data
  990         -
        #[non_exhaustive]
  991         -
        #[derive(Debug, Default)]
  992         -
        struct EmptyStream;
  993         -
        impl Read for EmptyStream {
  994         -
            fn poll_read(
  995         -
                self: Pin<&mut Self>,
  996         -
                _cx: &mut Context<'_>,
  997         -
                _buf: ReadBufCursor<'_>,
  998         -
            ) -> Poll<Result<(), std::io::Error>> {
  999         -
                Poll::Pending
 1000         -
            }
 1001         -
        }
 1002         -
        impl Write for EmptyStream {
 1003         -
            fn poll_write(
 1004         -
                self: Pin<&mut Self>,
 1005         -
                _cx: &mut Context<'_>,
 1006         -
                _buf: &[u8],
 1007         -
            ) -> Poll<Result<usize, std::io::Error>> {
 1008         -
                Poll::Pending
 1009         -
            }
 1010         -
 1011         -
            fn poll_flush(
 1012         -
                self: Pin<&mut Self>,
 1013         -
                _cx: &mut Context<'_>,
 1014         -
            ) -> Poll<Result<(), std::io::Error>> {
 1015         -
                Poll::Pending
 1016         -
            }
 1017         -
 1018         -
            fn poll_shutdown(
 1019         -
                self: Pin<&mut Self>,
 1020         -
                _cx: &mut Context<'_>,
 1021         -
            ) -> Poll<Result<(), std::io::Error>> {
 1022         -
                Poll::Pending
 1023         -
            }
 1024         -
        }
 1025         -
        impl Connection for EmptyStream {
 1026         -
            fn connected(&self) -> Connected {
 1027         -
                Connected::new()
 1028         -
            }
 1029         -
        }
 1030         -
 1031         -
        #[tokio::test]
 1032         -
        async fn http_connect_timeout_works() {
 1033         -
            let tcp_connector = NeverConnects::default();
 1034         -
            let connector_settings = HttpConnectorSettings::builder()
 1035         -
                .connect_timeout(Duration::from_secs(1))
 1036         -
                .build();
 1037         -
            let hyper = HyperConnector::builder()
 1038         -
                .connector_settings(connector_settings)
 1039         -
                .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
 1040         -
                .build(tcp_connector)
 1041         -
                .adapter;
 1042         -
            let now = tokio::time::Instant::now();
 1043         -
            tokio::time::pause();
 1044         -
            let resp = hyper
 1045         -
                .call(HttpRequest::get("https://static-uri.com").unwrap())
 1046         -
                .await
 1047         -
                .unwrap_err();
 1048         -
            assert!(
 1049         -
                resp.is_timeout(),
 1050         -
                "expected resp.is_timeout() to be true but it was false, resp == {:?}",
 1051         -
                resp
 1052         -
            );
 1053         -
            let message = DisplayErrorContext(&resp).to_string();
 1054         -
            let expected =
 1055         -
                "timeout: client error (Connect): HTTP connect timeout occurred after 1s";
 1056         -
            assert!(
 1057         -
                message.contains(expected),
 1058         -
                "expected '{message}' to contain '{expected}'"
 1059         -
            );
 1060         -
            assert_elapsed!(now, Duration::from_secs(1));
 1061         -
        }
 1062         -
 1063         -
        #[tokio::test]
 1064         -
        async fn http_read_timeout_works() {
 1065         -
            let tcp_connector = NeverReplies;
 1066         -
            let connector_settings = HttpConnectorSettings::builder()
 1067         -
                .connect_timeout(Duration::from_secs(1))
 1068         -
                .read_timeout(Duration::from_secs(2))
 1069         -
                .build();
 1070         -
            let hyper = HyperConnector::builder()
 1071         -
                .connector_settings(connector_settings)
 1072         -
                .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
 1073         -
                .build(tcp_connector)
 1074         -
                .adapter;
 1075         -
            let now = tokio::time::Instant::now();
 1076         -
            tokio::time::pause();
 1077         -
            let err = hyper
 1078         -
                .call(HttpRequest::get("https://fake-uri.com").unwrap())
 1079         -
                .await
 1080         -
                .unwrap_err();
 1081         -
            assert!(
 1082         -
                err.is_timeout(),
 1083         -
                "expected err.is_timeout() to be true but it was false, err == {err:?}",
 1084         -
            );
 1085         -
            let message = format!("{}", DisplayErrorContext(&err));
 1086         -
            let expected = "timeout: HTTP read timeout occurred after 2s";
 1087         -
            assert!(
 1088         -
                message.contains(expected),
 1089         -
                "expected '{message}' to contain '{expected}'"
 1090         -
            );
 1091         -
            assert_elapsed!(now, Duration::from_secs(2));
 1092         -
        }
 1093         -
    }
 1094         -
}
 1095         -
 1096         -
#[cfg(test)]
 1097         -
mod test {
 1098         -
    use std::io::{Error, ErrorKind};
 1099         -
    use std::pin::Pin;
 1100         -
    use std::sync::atomic::{AtomicU32, Ordering};
 1101         -
    use std::sync::Arc;
 1102         -
    use std::task::{Context, Poll};
 1103         -
 1104         -
    use http::Uri;
 1105         -
    use hyper::rt::ReadBufCursor;
 1106         -
    use hyper_util::client::legacy::connect::Connected;
 1107         -
 1108         -
    use aws_smithy_async::time::SystemTimeSource;
 1109         -
    use aws_smithy_runtime_api::client::runtime_components::RuntimeComponentsBuilder;
 1110         -
 1111         -
    use crate::hyper_1_0::timeout_middleware::test::NeverConnects;
 1112         -
 1113         -
    use super::*;
 1114         -
 1115         -
    #[tokio::test]
 1116         -
    async fn connector_selection() {
 1117         -
        // Create a client that increments a count every time it creates a new HyperConnector
 1118         -
        let creation_count = Arc::new(AtomicU32::new(0));
 1119         -
        let http_client = build_with_fn(None, {
 1120         -
            let count = creation_count.clone();
 1121         -
            move || {
 1122         -
                count.fetch_add(1, Ordering::Relaxed);
 1123         -
                NeverConnects
 1124         -
            }
 1125         -
        });
 1126         -
 1127         -
        // This configuration should result in 4 separate connectors with different timeout settings
 1128         -
        let settings = [
 1129         -
            HttpConnectorSettings::builder()
 1130         -
                .connect_timeout(Duration::from_secs(3))
 1131         -
                .build(),
 1132         -
            HttpConnectorSettings::builder()
 1133         -
                .read_timeout(Duration::from_secs(3))
 1134         -
                .build(),
 1135         -
            HttpConnectorSettings::builder()
 1136         -
                .connect_timeout(Duration::from_secs(3))
 1137         -
                .read_timeout(Duration::from_secs(3))
 1138         -
                .build(),
 1139         -
            HttpConnectorSettings::builder()
 1140         -
                .connect_timeout(Duration::from_secs(5))
 1141         -
                .read_timeout(Duration::from_secs(3))
 1142         -
                .build(),
 1143         -
        ];
 1144         -
 1145         -
        // Kick off thousands of parallel tasks that will try to create a connector
 1146         -
        let components = RuntimeComponentsBuilder::for_tests()
 1147         -
            .with_time_source(Some(SystemTimeSource::new()))
 1148         -
            .build()
 1149         -
            .unwrap();
 1150         -
        let mut handles = Vec::new();
 1151         -
        for setting in &settings {
 1152         -
            for _ in 0..1000 {
 1153         -
                let client = http_client.clone();
 1154         -
                handles.push(tokio::spawn({
 1155         -
                    let setting = setting.clone();
 1156         -
                    let components = components.clone();
 1157         -
                    async move {
 1158         -
                        let _ = client.http_connector(&setting, &components);
 1159         -
                    }
 1160         -
                }));
 1161         -
            }
 1162         -
        }
 1163         -
        for handle in handles {
 1164         -
            handle.await.unwrap();
 1165         -
        }
 1166         -
 1167         -
        // Verify only 4 connectors were created amidst the chaos
 1168         -
        assert_eq!(4, creation_count.load(Ordering::Relaxed));
 1169         -
    }
 1170         -
 1171         -
    #[tokio::test]
 1172         -
    async fn hyper_io_error() {
 1173         -
        let connector = TestConnection {
 1174         -
            inner: HangupStream,
 1175         -
        };
 1176         -
        let adapter = HyperConnector::builder().build(connector).adapter;
 1177         -
        let err = adapter
 1178         -
            .call(HttpRequest::get("https://socket-hangup.com").unwrap())
 1179         -
            .await
 1180         -
            .expect_err("socket hangup");
 1181         -
        assert!(err.is_io(), "unexpected error type: {:?}", err);
 1182         -
    }
 1183         -
 1184         -
    // ---- machinery to make a Hyper connector that responds with an IO Error
 1185         -
    #[derive(Clone)]
 1186         -
    struct HangupStream;
 1187         -
 1188         -
    impl Connection for HangupStream {
 1189         -
        fn connected(&self) -> Connected {
 1190         -
            Connected::new()
 1191         -
        }
 1192         -
    }
 1193         -
 1194         -
    impl Read for HangupStream {
 1195         -
        fn poll_read(
 1196         -
            self: Pin<&mut Self>,
 1197         -
            _cx: &mut Context<'_>,
 1198         -
            _buf: ReadBufCursor<'_>,
 1199         -
        ) -> Poll<std::io::Result<()>> {
 1200         -
            Poll::Ready(Err(Error::new(
 1201         -
                ErrorKind::ConnectionReset,
 1202         -
                "connection reset",
 1203         -
            )))
 1204         -
        }
 1205         -
    }
 1206         -
 1207         -
    impl Write for HangupStream {
 1208         -
        fn poll_write(
 1209         -
            self: Pin<&mut Self>,
 1210         -
            _cx: &mut Context<'_>,
 1211         -
            _buf: &[u8],
 1212         -
        ) -> Poll<Result<usize, Error>> {
 1213         -
            Poll::Pending
 1214         -
        }
 1215         -
 1216         -
        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
 1217         -
            Poll::Pending
 1218         -
        }
 1219         -
 1220         -
        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
 1221         -
            Poll::Pending
 1222         -
        }
 1223         -
    }
 1224         -
 1225         -
    #[derive(Clone)]
 1226         -
    struct TestConnection<T> {
 1227         -
        inner: T,
 1228         -
    }
 1229         -
 1230         -
    impl<T> tower::Service<Uri> for TestConnection<T>
 1231         -
    where
 1232         -
        T: Clone + Connection,
 1233         -
    {
 1234         -
        type Response = T;
 1235         -
        type Error = BoxError;
 1236         -
        type Future = std::future::Ready<Result<Self::Response, Self::Error>>;
 1237         -
 1238         -
        fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
 1239         -
            Poll::Ready(Ok(()))
 1240         -
        }
 1241         -
 1242         -
        fn call(&mut self, _req: Uri) -> Self::Future {
 1243         -
            std::future::ready(Ok(self.inner.clone()))
 1244         -
        }
 1245         -
    }
 1246         -
}