aws_smithy_http_client/
client.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6mod dns;
7/// Proxy configuration
8pub mod proxy;
9mod timeout;
10/// TLS connector(s)
11pub mod tls;
12
13pub(crate) mod connect;
14
15use crate::cfg::cfg_tls;
16use crate::tls::TlsContext;
17use aws_smithy_async::future::timeout::TimedOutError;
18use aws_smithy_async::rt::sleep::{default_async_sleep, AsyncSleep, SharedAsyncSleep};
19use aws_smithy_runtime_api::box_error::BoxError;
20use aws_smithy_runtime_api::client::connection::CaptureSmithyConnection;
21use aws_smithy_runtime_api::client::connection::ConnectionMetadata;
22use aws_smithy_runtime_api::client::connector_metadata::ConnectorMetadata;
23use aws_smithy_runtime_api::client::http::{
24    HttpClient, HttpConnector, HttpConnectorFuture, HttpConnectorSettings, SharedHttpClient,
25    SharedHttpConnector,
26};
27use aws_smithy_runtime_api::client::orchestrator::{HttpRequest, HttpResponse};
28use aws_smithy_runtime_api::client::result::ConnectorError;
29use aws_smithy_runtime_api::client::runtime_components::{
30    RuntimeComponents, RuntimeComponentsBuilder,
31};
32use aws_smithy_runtime_api::shared::IntoShared;
33use aws_smithy_types::body::SdkBody;
34use aws_smithy_types::config_bag::ConfigBag;
35use aws_smithy_types::error::display::DisplayErrorContext;
36use aws_smithy_types::retry::ErrorKind;
37use client::connect::Connection;
38use h2::Reason;
39use http_1x::{Extensions, Uri};
40use hyper::rt::{Read, Write};
41use hyper_util::client::legacy as client;
42use hyper_util::client::legacy::connect::dns::GaiResolver;
43use hyper_util::client::legacy::connect::{
44    capture_connection, CaptureConnection, Connect, HttpConnector as HyperHttpConnector, HttpInfo,
45};
46use hyper_util::client::proxy::matcher::Matcher;
47use hyper_util::rt::{TokioExecutor, TokioTimer};
48use std::borrow::Cow;
49use std::collections::HashMap;
50use std::error::Error;
51use std::fmt;
52use std::sync::RwLock;
53use std::time::Duration;
54
55/// Given `HttpConnectorSettings` and an `SharedAsyncSleep`, create a `SharedHttpConnector` from defaults depending on what cargo features are activated.
56pub fn default_connector(
57    settings: &HttpConnectorSettings,
58    sleep: Option<SharedAsyncSleep>,
59) -> Option<SharedHttpConnector> {
60    #[cfg(feature = "rustls-aws-lc")]
61    {
62        tracing::trace!(settings = ?settings, sleep = ?sleep, "creating a new default connector");
63        let mut conn_builder = Connector::builder().connector_settings(settings.clone());
64
65        if let Some(sleep) = sleep {
66            conn_builder = conn_builder.sleep_impl(sleep);
67        }
68
69        let conn = conn_builder
70            .tls_provider(tls::Provider::Rustls(
71                tls::rustls_provider::CryptoMode::AwsLc,
72            ))
73            .build();
74        Some(SharedHttpConnector::new(conn))
75    }
76    #[cfg(not(feature = "rustls-aws-lc"))]
77    {
78        tracing::trace!(settings = ?settings, sleep = ?sleep, "no default connector available");
79        None
80    }
81}
82
83/// [`HttpConnector`] used to make HTTP requests.
84///
85/// This connector also implements socket connect and read timeouts.
86///
87/// This shouldn't be used directly in most cases.
88/// See the docs on [`Builder`] for examples of how to customize the HTTP client.
89#[derive(Debug)]
90pub struct Connector {
91    adapter: Box<dyn HttpConnector>,
92}
93
94impl Connector {
95    /// Builder for an HTTP connector.
96    pub fn builder() -> ConnectorBuilder {
97        ConnectorBuilder {
98            enable_tcp_nodelay: true,
99            ..Default::default()
100        }
101    }
102}
103
104impl HttpConnector for Connector {
105    fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
106        self.adapter.call(request)
107    }
108}
109
110/// Builder for [`Connector`].
111#[derive(Default, Debug, Clone)]
112pub struct ConnectorBuilder<Tls = TlsUnset> {
113    connector_settings: Option<HttpConnectorSettings>,
114    sleep_impl: Option<SharedAsyncSleep>,
115    client_builder: Option<hyper_util::client::legacy::Builder>,
116    pool_idle_timeout: Option<Option<Duration>>,
117    enable_tcp_nodelay: bool,
118    interface: Option<String>,
119    proxy_config: Option<proxy::ProxyConfig>,
120    #[allow(unused)]
121    tls: Tls,
122}
123
124/// Initial builder state, `TlsProvider` choice required
125#[derive(Default, Debug, Clone)]
126#[non_exhaustive]
127pub struct TlsUnset {}
128
129/// TLS implementation selected
130#[derive(Debug, Clone)]
131pub struct TlsProviderSelected {
132    #[allow(unused)]
133    provider: tls::Provider,
134    #[allow(unused)]
135    context: TlsContext,
136}
137
138impl ConnectorBuilder<TlsUnset> {
139    /// Set the TLS implementation to use for this connector
140    pub fn tls_provider(self, provider: tls::Provider) -> ConnectorBuilder<TlsProviderSelected> {
141        ConnectorBuilder {
142            connector_settings: self.connector_settings,
143            sleep_impl: self.sleep_impl,
144            client_builder: self.client_builder,
145            enable_tcp_nodelay: self.enable_tcp_nodelay,
146            interface: self.interface,
147            proxy_config: self.proxy_config,
148            pool_idle_timeout: self.pool_idle_timeout,
149            tls: TlsProviderSelected {
150                provider,
151                context: TlsContext::default(),
152            },
153        }
154    }
155
156    /// Build an HTTP connector sans TLS
157    #[doc(hidden)]
158    pub fn build_http(self) -> Connector {
159        if let Some(ref proxy_config) = self.proxy_config {
160            if proxy_config.requires_tls() {
161                tracing::warn!(
162                    "HTTPS proxy configured but no TLS provider set. \
163                     Connections to HTTPS proxy servers will fail. \
164                     Consider configuring a TLS provider to enable TLS support."
165                );
166            }
167        }
168
169        let base = self.base_connector();
170
171        // Wrap with HTTP proxy support if proxy is configured
172        let proxy_config = self
173            .proxy_config
174            .clone()
175            .unwrap_or_else(proxy::ProxyConfig::disabled);
176
177        if !proxy_config.is_disabled() {
178            let http_proxy_connector = connect::HttpProxyConnector::new(base, proxy_config);
179            self.wrap_connector(http_proxy_connector)
180        } else {
181            self.wrap_connector(base)
182        }
183    }
184}
185
186impl<Any> ConnectorBuilder<Any> {
187    /// Create a [`Connector`] from this builder and a given connector.
188    pub(crate) fn wrap_connector<C>(self, tcp_connector: C) -> Connector
189    where
190        C: Send + Sync + 'static,
191        C: Clone,
192        C: tower::Service<Uri>,
193        C::Response: Read + Write + Connection + Send + Sync + Unpin,
194        C: Connect,
195        C::Future: Unpin + Send + 'static,
196        C::Error: Into<BoxError>,
197    {
198        let client_builder = self
199            .client_builder
200            .unwrap_or_else(|| new_tokio_hyper_builder(self.pool_idle_timeout));
201        let sleep_impl = self.sleep_impl.or_else(default_async_sleep);
202        let (connect_timeout, read_timeout) = self
203            .connector_settings
204            .map(|c| (c.connect_timeout(), c.read_timeout()))
205            .unwrap_or((None, None));
206
207        let connector = match connect_timeout {
208            Some(duration) => timeout::ConnectTimeout::new(
209                tcp_connector,
210                sleep_impl
211                    .clone()
212                    .expect("a sleep impl must be provided in order to have a connect timeout"),
213                duration,
214            ),
215            None => timeout::ConnectTimeout::no_timeout(tcp_connector),
216        };
217        let base = client_builder.build(connector);
218        let read_timeout = match read_timeout {
219            Some(duration) => timeout::HttpReadTimeout::new(
220                base,
221                sleep_impl.expect("a sleep impl must be provided in order to have a read timeout"),
222                duration,
223            ),
224            None => timeout::HttpReadTimeout::no_timeout(base),
225        };
226
227        let proxy_matcher = self
228            .proxy_config
229            .as_ref()
230            .map(|config| config.clone().into_hyper_util_matcher());
231
232        Connector {
233            adapter: Box::new(Adapter {
234                client: read_timeout,
235                proxy_matcher,
236            }),
237        }
238    }
239
240    /// Get the base TCP connector by mapping our config to the underlying `HttpConnector` from hyper
241    /// (which is a base TCP connector with no TLS or any wrapping)
242    fn base_connector(&self) -> HyperHttpConnector {
243        self.base_connector_with_resolver(GaiResolver::new())
244    }
245
246    /// Get the base TCP connector by mapping our config to the underlying `HttpConnector` from hyper
247    /// using the given resolver `R`
248    fn base_connector_with_resolver<R>(&self, resolver: R) -> HyperHttpConnector<R> {
249        let mut conn = HyperHttpConnector::new_with_resolver(resolver);
250        conn.set_nodelay(self.enable_tcp_nodelay);
251        #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
252        if let Some(interface) = &self.interface {
253            conn.set_interface(interface);
254        }
255        conn
256    }
257
258    /// Set the async sleep implementation used for timeouts
259    ///
260    /// Calling this is only necessary for testing or to use something other than
261    /// [`default_async_sleep`].
262    pub fn sleep_impl(mut self, sleep_impl: impl AsyncSleep + 'static) -> Self {
263        self.sleep_impl = Some(sleep_impl.into_shared());
264        self
265    }
266
267    /// Set the async sleep implementation used for timeouts
268    ///
269    /// Calling this is only necessary for testing or to use something other than
270    /// [`default_async_sleep`].
271    pub fn set_sleep_impl(&mut self, sleep_impl: Option<SharedAsyncSleep>) -> &mut Self {
272        self.sleep_impl = sleep_impl;
273        self
274    }
275
276    /// Configure the HTTP settings for the `HyperAdapter`
277    pub fn connector_settings(mut self, connector_settings: HttpConnectorSettings) -> Self {
278        self.connector_settings = Some(connector_settings);
279        self
280    }
281
282    /// Configure the HTTP settings for the `HyperAdapter`
283    pub fn set_connector_settings(
284        &mut self,
285        connector_settings: Option<HttpConnectorSettings>,
286    ) -> &mut Self {
287        self.connector_settings = connector_settings;
288        self
289    }
290
291    /// Configure `SO_NODELAY` for all sockets to the supplied value `nodelay`
292    pub fn enable_tcp_nodelay(mut self, nodelay: bool) -> Self {
293        self.enable_tcp_nodelay = nodelay;
294        self
295    }
296
297    /// Configure `SO_NODELAY` for all sockets to the supplied value `nodelay`
298    pub fn set_enable_tcp_nodelay(&mut self, nodelay: bool) -> &mut Self {
299        self.enable_tcp_nodelay = nodelay;
300        self
301    }
302
303    /// Sets the value for the `SO_BINDTODEVICE` option on this socket.
304    ///
305    /// If a socket is bound to an interface, only packets received from that particular
306    /// interface are processed by the socket. Note that this only works for some socket
307    /// types (e.g. `AF_INET` sockets).
308    ///
309    /// On Linux it can be used to specify a [VRF], but the binary needs to either have
310    /// `CAP_NET_RAW` capability set or be run as root.
311    ///
312    /// This function is only available on Android, Fuchsia, and Linux.
313    ///
314    /// [VRF]: https://www.kernel.org/doc/Documentation/networking/vrf.txt
315    #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
316    pub fn set_interface<S: Into<String>>(&mut self, interface: S) -> &mut Self {
317        self.interface = Some(interface.into());
318        self
319    }
320
321    /// Configure proxy settings for this connector
322    ///
323    /// This method allows you to set explicit proxy configuration for the HTTP client.
324    /// The proxy configuration will be used to determine whether requests should be
325    /// routed through a proxy server or connect directly.
326    ///
327    /// # Examples
328    ///
329    /// ```rust
330    /// # #[cfg(feature = "rustls-aws-lc")]
331    /// # {
332    /// use aws_smithy_http_client::{Connector, proxy::ProxyConfig, tls};
333    ///
334    /// let proxy_config = ProxyConfig::http("http://proxy.example.com:8080")?;
335    /// let connector = Connector::builder()
336    ///     .proxy_config(proxy_config)
337    ///     .tls_provider(tls::Provider::Rustls(tls::rustls_provider::CryptoMode::AwsLc))
338    ///     .build();
339    /// # }
340    /// # Ok::<(), Box<dyn std::error::Error>>(())
341    /// ```
342    pub fn proxy_config(mut self, config: proxy::ProxyConfig) -> Self {
343        self.proxy_config = Some(config);
344        self
345    }
346
347    /// Configure proxy settings for this connector
348    ///
349    /// This is the mutable version of [`proxy_config`](Self::proxy_config).
350    pub fn set_proxy_config(&mut self, config: Option<proxy::ProxyConfig>) -> &mut Self {
351        self.proxy_config = config;
352        self
353    }
354
355    /// Set an optional timeout for idle sockets being kept-alive.
356    ///
357    /// Pass `None` to disable timeout.
358    ///
359    /// Defaults to Hyper's default timeout, which is currently 90 seconds - see
360    /// [hyper_util::client::legacy::Builder::pool_idle_timeout],
361    /// but unlike that function, there is no need to call `pool_timer` yourself.
362    ///
363    /// # Examples
364    ///
365    /// ```rust
366    /// # #[cfg(feature = "rustls-aws-lc")]
367    /// # {
368    /// use aws_smithy_http_client::{Connector, tls};
369    /// use std::time::Duration;
370    ///
371    /// let connector = Connector::builder()
372    ///     .pool_idle_timeout(Duration::from_secs(30))
373    ///     .tls_provider(tls::Provider::Rustls(tls::rustls_provider::CryptoMode::AwsLc))
374    ///     .build();
375    /// # }
376    /// # Ok::<(), Box<dyn std::error::Error>>(())
377    /// ```
378    pub fn pool_idle_timeout<D>(mut self, val: D) -> Self
379    where
380        D: Into<Option<Duration>>,
381    {
382        self.pool_idle_timeout = Some(val.into());
383        self
384    }
385
386    /// Set an optional timeout for idle sockets being kept-alive.
387    ///
388    /// Pass `None` to use Hyper's default timeout, `Some(None)` to disable timeouts.
389    ///
390    /// This is the mutable version of [`pool_idle_timeout`](Self::pool_idle_timeout).
391    ///
392    /// # Examples
393    ///
394    /// ```rust
395    /// # #[cfg(feature = "rustls-aws-lc")]
396    /// # {
397    /// use aws_smithy_http_client::{Connector, tls};
398    /// use std::time::Duration;
399    ///
400    /// let mut connector = Connector::builder();
401    /// connector
402    ///     .set_pool_idle_timeout(Some(Some(Duration::from_secs(30))));
403    /// connector
404    ///     .tls_provider(tls::Provider::Rustls(tls::rustls_provider::CryptoMode::AwsLc))
405    ///     .build();
406    /// # }
407    /// # Ok::<(), Box<dyn std::error::Error>>(())
408    /// ```
409    pub fn set_pool_idle_timeout(&mut self, val: Option<Option<Duration>>) -> &mut Self {
410        self.pool_idle_timeout = val;
411        self
412    }
413
414    /// Override the Hyper client [`Builder`](hyper_util::client::legacy::Builder) used to construct this client.
415    ///
416    /// This enables changing settings like forcing HTTP2 and modifying other default client behavior.
417    pub(crate) fn hyper_builder(
418        mut self,
419        hyper_builder: hyper_util::client::legacy::Builder,
420    ) -> Self {
421        self.set_hyper_builder(Some(hyper_builder));
422        self
423    }
424
425    /// Override the Hyper client [`Builder`](hyper_util::client::legacy::Builder) used to construct this client.
426    ///
427    /// This enables changing settings like forcing HTTP2 and modifying other default client behavior.
428    pub(crate) fn set_hyper_builder(
429        &mut self,
430        hyper_builder: Option<hyper_util::client::legacy::Builder>,
431    ) -> &mut Self {
432        self.client_builder = hyper_builder;
433        self
434    }
435}
436
437/// Adapter to use a Hyper 1.0-based Client as an `HttpConnector`
438///
439/// This adapter also enables TCP `CONNECT` and HTTP `READ` timeouts via [`Connector::builder`].
440struct Adapter<C> {
441    client: timeout::HttpReadTimeout<
442        hyper_util::client::legacy::Client<timeout::ConnectTimeout<C>, SdkBody>,
443    >,
444    proxy_matcher: Option<Matcher>,
445}
446
447impl<C> fmt::Debug for Adapter<C> {
448    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
449        f.debug_struct("Adapter")
450            .field("client", &"** hyper client **")
451            .field("proxy_matcher", &self.proxy_matcher.is_some())
452            .finish()
453    }
454}
455
456/// Extract a smithy connection from a hyper CaptureConnection
457fn extract_smithy_connection(capture_conn: &CaptureConnection) -> Option<ConnectionMetadata> {
458    let capture_conn = capture_conn.clone();
459    if let Some(conn) = capture_conn.clone().connection_metadata().as_ref() {
460        let mut extensions = Extensions::new();
461        conn.get_extras(&mut extensions);
462        let http_info = extensions.get::<HttpInfo>();
463        let mut builder = ConnectionMetadata::builder()
464            .proxied(conn.is_proxied())
465            .poison_fn(move || match capture_conn.connection_metadata().as_ref() {
466                Some(conn) => conn.poison(),
467                None => tracing::trace!("no connection existed to poison"),
468            });
469
470        builder
471            .set_local_addr(http_info.map(|info| info.local_addr()))
472            .set_remote_addr(http_info.map(|info| info.remote_addr()));
473
474        let smithy_connection = builder.build();
475
476        Some(smithy_connection)
477    } else {
478        None
479    }
480}
481
482fn new_tokio_hyper_builder(
483    pool_idle_timeout: Option<Option<Duration>>,
484) -> hyper_util::client::legacy::Builder {
485    let mut builder = hyper_util::client::legacy::Builder::new(TokioExecutor::new());
486    // Explicitly setting the pool_timer is required for connection timeouts to work.
487    builder.pool_timer(TokioTimer::new());
488
489    if let Some(pool_idle_timeout) = pool_idle_timeout {
490        builder.pool_idle_timeout(pool_idle_timeout);
491    }
492
493    builder
494}
495
496impl<C> Adapter<C> {
497    /// Add proxy authentication header to the request if needed
498    fn add_proxy_auth_header(&self, request: &mut http_1x::Request<SdkBody>) {
499        // Only add auth for HTTP requests (not HTTPS which uses CONNECT tunneling)
500        if request.uri().scheme() != Some(&http_1x::uri::Scheme::HTTP) {
501            return;
502        }
503
504        // Don't override existing proxy authorization header
505        if request
506            .headers()
507            .contains_key(http_1x::header::PROXY_AUTHORIZATION)
508        {
509            return;
510        }
511
512        if let Some(ref matcher) = self.proxy_matcher {
513            if let Some(intercept) = matcher.intercept(request.uri()) {
514                // Add basic auth header if available
515                if let Some(auth_header) = intercept.basic_auth() {
516                    request
517                        .headers_mut()
518                        .insert(http_1x::header::PROXY_AUTHORIZATION, auth_header.clone());
519                    tracing::debug!("added proxy authentication header for {}", request.uri());
520                }
521            }
522        }
523    }
524}
525
526impl<C> HttpConnector for Adapter<C>
527where
528    C: Clone + Send + Sync + 'static,
529    C: tower::Service<Uri>,
530    C::Response: Connection + Read + Write + Unpin + 'static,
531    timeout::ConnectTimeout<C>: Connect,
532    C::Future: Unpin + Send + 'static,
533    C::Error: Into<BoxError>,
534{
535    fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
536        let mut request = match request.try_into_http1x() {
537            Ok(request) => request,
538            Err(err) => {
539                return HttpConnectorFuture::ready(Err(ConnectorError::user(err.into())));
540            }
541        };
542
543        self.add_proxy_auth_header(&mut request);
544
545        let capture_connection = capture_connection(&mut request);
546        if let Some(capture_smithy_connection) =
547            request.extensions().get::<CaptureSmithyConnection>()
548        {
549            capture_smithy_connection
550                .set_connection_retriever(move || extract_smithy_connection(&capture_connection));
551        }
552        let mut client = self.client.clone();
553        use tower::Service;
554        let fut = client.call(request);
555        HttpConnectorFuture::new(async move {
556            let response = fut
557                .await
558                .map_err(downcast_error)?
559                .map(SdkBody::from_body_1_x);
560            match HttpResponse::try_from(response) {
561                Ok(response) => Ok(response),
562                Err(err) => Err(ConnectorError::other(err.into(), None)),
563            }
564        })
565    }
566}
567
568/// Downcast errors coming out of hyper into an appropriate `ConnectorError`
569fn downcast_error(err: BoxError) -> ConnectorError {
570    // is a `TimedOutError` (from aws_smithy_async::timeout) in the chain? if it is, this is a timeout
571    if find_source::<TimedOutError>(err.as_ref()).is_some() {
572        return ConnectorError::timeout(err);
573    }
574    // is the top of chain error actually already a `ConnectorError`? return that directly
575    let err = match err.downcast::<ConnectorError>() {
576        Ok(connector_error) => return *connector_error,
577        Err(box_error) => box_error,
578    };
579    // generally, the top of chain will probably be a hyper error. Go through a set of hyper specific
580    // error classifications
581    let err = match find_source::<hyper::Error>(err.as_ref()) {
582        Some(hyper_error) => return to_connector_error(hyper_error)(err),
583        None => match find_source::<hyper_util::client::legacy::Error>(err.as_ref()) {
584            Some(hyper_util_err) => {
585                if hyper_util_err.is_connect()
586                    || find_source::<std::io::Error>(hyper_util_err).is_some()
587                {
588                    return ConnectorError::io(err);
589                }
590                err
591            }
592            None => err,
593        },
594    };
595
596    // otherwise, we have no idea!
597    ConnectorError::other(err, None)
598}
599
600/// Convert a [`hyper::Error`] into a [`ConnectorError`]
601fn to_connector_error(err: &hyper::Error) -> fn(BoxError) -> ConnectorError {
602    if err.is_timeout() || find_source::<timeout::HttpTimeoutError>(err).is_some() {
603        return ConnectorError::timeout;
604    }
605    if err.is_user() {
606        return ConnectorError::user;
607    }
608    if err.is_closed() || err.is_canceled() || find_source::<std::io::Error>(err).is_some() {
609        return ConnectorError::io;
610    }
611    // We sometimes receive this from S3: hyper::Error(IncompleteMessage)
612    if err.is_incomplete_message() {
613        return |err: BoxError| ConnectorError::other(err, Some(ErrorKind::TransientError));
614    }
615
616    if let Some(h2_err) = find_source::<h2::Error>(err) {
617        if h2_err.is_go_away()
618            || (h2_err.is_reset() && h2_err.reason() == Some(Reason::REFUSED_STREAM))
619        {
620            return ConnectorError::io;
621        }
622    }
623
624    tracing::warn!(err = %DisplayErrorContext(&err), "unrecognized error from Hyper. If this error should be retried, please file an issue.");
625    |err: BoxError| ConnectorError::other(err, None)
626}
627
628fn find_source<'a, E: Error + 'static>(err: &'a (dyn Error + 'static)) -> Option<&'a E> {
629    let mut next = Some(err);
630    while let Some(err) = next {
631        if let Some(matching_err) = err.downcast_ref::<E>() {
632            return Some(matching_err);
633        }
634        next = err.source();
635    }
636    None
637}
638
639// TODO(https://github.com/awslabs/aws-sdk-rust/issues/1090): CacheKey must also include ptr equality to any
640// runtime components that are used—sleep_impl as a base (unless we prohibit overriding sleep impl)
641// If we decide to put a DnsResolver in RuntimeComponents, then we'll need to handle that as well.
642#[derive(Clone, Debug, Eq, PartialEq, Hash)]
643struct CacheKey {
644    connect_timeout: Option<Duration>,
645    read_timeout: Option<Duration>,
646}
647
648impl From<&HttpConnectorSettings> for CacheKey {
649    fn from(value: &HttpConnectorSettings) -> Self {
650        Self {
651            connect_timeout: value.connect_timeout(),
652            read_timeout: value.read_timeout(),
653        }
654    }
655}
656
657struct HyperClient<F> {
658    connector_cache: RwLock<HashMap<CacheKey, SharedHttpConnector>>,
659    client_builder: hyper_util::client::legacy::Builder,
660    connector_fn: F,
661}
662
663impl<F> fmt::Debug for HyperClient<F> {
664    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
665        f.debug_struct("HyperClient")
666            .field("connector_cache", &self.connector_cache)
667            .field("client_builder", &self.client_builder)
668            .finish()
669    }
670}
671
672impl<F> HttpClient for HyperClient<F>
673where
674    F: Fn(
675            hyper_util::client::legacy::Builder,
676            Option<&HttpConnectorSettings>,
677            Option<&RuntimeComponents>,
678        ) -> Connector
679        + Send
680        + Sync
681        + 'static,
682{
683    fn http_connector(
684        &self,
685        settings: &HttpConnectorSettings,
686        components: &RuntimeComponents,
687    ) -> SharedHttpConnector {
688        let key = CacheKey::from(settings);
689        let mut connector = self.connector_cache.read().unwrap().get(&key).cloned();
690        if connector.is_none() {
691            let mut cache = self.connector_cache.write().unwrap();
692            // Short-circuit if another thread already wrote a connector to the cache for this key
693            if !cache.contains_key(&key) {
694                let start = components.time_source().map(|ts| ts.now());
695                let connector = (self.connector_fn)(
696                    self.client_builder.clone(),
697                    Some(settings),
698                    Some(components),
699                );
700                let end = components.time_source().map(|ts| ts.now());
701                if let (Some(start), Some(end)) = (start, end) {
702                    if let Ok(elapsed) = end.duration_since(start) {
703                        tracing::debug!("new connector created in {:?}", elapsed);
704                    }
705                }
706                let connector = SharedHttpConnector::new(connector);
707                cache.insert(key.clone(), connector);
708            }
709            connector = cache.get(&key).cloned();
710        }
711
712        connector.expect("cache populated above")
713    }
714
715    fn validate_base_client_config(
716        &self,
717        _: &RuntimeComponentsBuilder,
718        _: &ConfigBag,
719    ) -> Result<(), BoxError> {
720        // Initialize the TCP connector at this point so that native certs load
721        // at client initialization time instead of upon first request. We do it
722        // here rather than at construction so that it won't run if this is not
723        // the selected HTTP client for the base config (for example, if this was
724        // the default HTTP client, and it was overridden by a later plugin).
725        let _ = (self.connector_fn)(self.client_builder.clone(), None, None);
726        Ok(())
727    }
728
729    fn connector_metadata(&self) -> Option<ConnectorMetadata> {
730        Some(ConnectorMetadata::new("hyper", Some(Cow::Borrowed("1.x"))))
731    }
732}
733
734/// Builder for a hyper-backed [`HttpClient`] implementation.
735///
736/// This builder can be used to customize the underlying TCP connector used, as well as
737/// hyper client configuration.
738///
739/// # Examples
740///
741/// Construct a Hyper client with the RusTLS TLS implementation.
742/// This can be useful when you want to share a Hyper connector between multiple
743/// generated Smithy clients.
744#[derive(Clone, Default, Debug)]
745pub struct Builder<Tls = TlsUnset> {
746    client_builder: Option<hyper_util::client::legacy::Builder>,
747    pool_idle_timeout: Option<Option<Duration>>,
748    #[allow(unused)]
749    tls_provider: Tls,
750}
751
752cfg_tls! {
753    use aws_smithy_runtime_api::client::dns::ResolveDns;
754
755    impl ConnectorBuilder<TlsProviderSelected> {
756        /// Build a [`Connector`] that will use the default DNS resolver implementation.
757        pub fn build(self) -> Connector {
758            let http_connector = self.base_connector();
759            self.build_https(http_connector)
760        }
761
762        /// Configure the TLS context
763        pub fn tls_context(mut self, ctx: TlsContext) -> Self {
764            self.tls.context = ctx;
765            self
766        }
767
768        /// Configure the TLS context
769        pub fn set_tls_context(&mut self, ctx: TlsContext) -> &mut Self {
770            self.tls.context = ctx;
771            self
772        }
773
774        /// Build a [`Connector`] that will use the given DNS resolver implementation.
775        pub fn build_with_resolver<R: ResolveDns + Clone + 'static>(self, resolver: R) -> Connector {
776            use crate::client::dns::HyperUtilResolver;
777            let http_connector = self.base_connector_with_resolver(HyperUtilResolver { resolver });
778            self.build_https(http_connector)
779        }
780
781        fn build_https<R>(self, http_connector: HyperHttpConnector<R>) -> Connector
782        where
783            R: Clone + Send + Sync + 'static,
784            R: tower::Service<hyper_util::client::legacy::connect::dns::Name>,
785            R::Response: Iterator<Item = std::net::SocketAddr>,
786            R::Future: Send,
787            R::Error: Into<Box<dyn Error + Send + Sync>>,
788        {
789            match &self.tls.provider {
790                // TODO(hyper1) - fix cfg_rustls! to allow matching on patterns so we can re-use it and not duplicate these cfg matches everywhere
791                #[cfg(any(
792                    feature = "rustls-aws-lc",
793                    feature = "rustls-aws-lc-fips",
794                    feature = "rustls-ring"
795                ))]
796                tls::Provider::Rustls(crypto_mode) => {
797                    let proxy_config = self.proxy_config.clone()
798                        .unwrap_or_else(proxy::ProxyConfig::disabled);
799
800                    let https_connector = tls::rustls_provider::build_connector::wrap_connector(
801                        http_connector,
802                        crypto_mode.clone(),
803                        &self.tls.context,
804                        proxy_config,
805                    );
806                    self.wrap_connector(https_connector)
807                },
808                #[cfg(feature = "s2n-tls")]
809                tls::Provider::S2nTls  => {
810                    let proxy_config = self.proxy_config.clone()
811                        .unwrap_or_else(proxy::ProxyConfig::disabled);
812
813                    let https_connector = tls::s2n_tls_provider::build_connector::wrap_connector(
814                        http_connector,
815                        &self.tls.context,
816                        proxy_config,
817                    );
818                    self.wrap_connector(https_connector)
819                }
820            }
821        }
822    }
823
824    impl Builder<TlsProviderSelected> {
825        /// Create an HTTPS client with the selected TLS provider.
826        ///
827        /// The trusted certificates will be loaded later when this becomes the selected
828        /// HTTP client for a Smithy client.
829        pub fn build_https(self) -> SharedHttpClient {
830            build_with_conn_fn(
831                self.client_builder,
832                self.pool_idle_timeout,
833                move |client_builder, settings, runtime_components| {
834                    let builder = new_conn_builder(client_builder, settings, runtime_components)
835                        .tls_provider(self.tls_provider.provider.clone())
836                        .tls_context(self.tls_provider.context.clone());
837                    builder.build()
838                },
839            )
840        }
841
842        /// Create an HTTPS client using a custom DNS resolver
843        pub fn build_with_resolver(
844            self,
845            resolver: impl ResolveDns + Clone + 'static,
846        ) -> SharedHttpClient {
847            build_with_conn_fn(
848                self.client_builder,
849                self.pool_idle_timeout,
850                move |client_builder, settings, runtime_components| {
851                    let builder = new_conn_builder(client_builder, settings, runtime_components)
852                        .tls_provider(self.tls_provider.provider.clone())
853                        .tls_context(self.tls_provider.context.clone());
854                    builder.build_with_resolver(resolver.clone())
855                },
856            )
857        }
858
859        /// Configure the TLS context
860        pub fn tls_context(mut self, ctx: TlsContext) -> Self {
861            self.tls_provider.context = ctx;
862            self
863        }
864    }
865}
866
867impl<Any> Builder<Any> {
868    /// Set an optional timeout for idle sockets being kept-alive.
869    ///
870    /// Pass `None` to disable timeout.
871    ///
872    /// Defaults to Hyper's default timeout, which is currently 90 seconds - see
873    /// [hyper_util::client::legacy::Builder::pool_idle_timeout],
874    /// but unlike that function, there is no need to call `pool_timer` yourself.
875    ///
876    /// # Examples
877    ///
878    /// ```rust
879    /// # #[cfg(feature = "rustls-aws-lc")]
880    /// # {
881    /// use aws_smithy_http_client::{Builder, tls};
882    /// use std::time::Duration;
883    ///
884    /// let client = Builder::new()
885    ///     .pool_idle_timeout(Duration::from_secs(30))
886    ///     .tls_provider(tls::Provider::Rustls(tls::rustls_provider::CryptoMode::AwsLc))
887    ///     .build_https();
888    /// # }
889    /// # Ok::<(), Box<dyn std::error::Error>>(())
890    /// ```
891    pub fn pool_idle_timeout<D>(mut self, val: D) -> Self
892    where
893        D: Into<Option<Duration>>,
894    {
895        self.pool_idle_timeout = Some(val.into());
896        self
897    }
898
899    /// Set an optional timeout for idle sockets being kept-alive.
900    ///
901    /// Pass `None` to use Hyper's default timeout, `Some(None)` to disable timeouts.
902    ///
903    /// This is the mutable version of [`pool_idle_timeout`](Self::pool_idle_timeout).
904    ///
905    /// # Examples
906    ///
907    /// ```rust
908    /// # #[cfg(feature = "rustls-aws-lc")]
909    /// # {
910    /// use std::time::Duration;
911    /// use aws_smithy_http_client::{Builder, tls};
912    ///
913    /// let mut client = Builder::new();
914    /// client.set_pool_idle_timeout(Some(Some(Duration::from_secs(30))));
915    /// client
916    ///     .tls_provider(tls::Provider::Rustls(tls::rustls_provider::CryptoMode::AwsLc))
917    ///     .build_https();
918    /// # }
919    /// # Ok::<(), Box<dyn std::error::Error>>(())
920    /// ```
921    pub fn set_pool_idle_timeout(&mut self, val: Option<Option<Duration>>) -> &mut Self {
922        self.pool_idle_timeout = val;
923        self
924    }
925}
926
927impl Builder<TlsUnset> {
928    /// Creates a new builder.
929    pub fn new() -> Self {
930        Self::default()
931    }
932
933    /// Returns a [`SharedHttpClient`] that calls the given `connector` function to select an HTTP(S) connector.
934    #[doc(hidden)]
935    pub fn build_with_connector_fn<F>(self, connector_fn: F) -> SharedHttpClient
936    where
937        F: Fn(Option<&HttpConnectorSettings>, Option<&RuntimeComponents>) -> Connector
938            + Send
939            + Sync
940            + 'static,
941    {
942        build_with_conn_fn(
943            self.client_builder,
944            self.pool_idle_timeout,
945            move |_builder, settings, runtime_components| {
946                connector_fn(settings, runtime_components)
947            },
948        )
949    }
950
951    /// Build a new HTTP client without TLS enabled
952    #[doc(hidden)]
953    pub fn build_http(self) -> SharedHttpClient {
954        build_with_conn_fn(
955            self.client_builder,
956            self.pool_idle_timeout,
957            move |client_builder, settings, runtime_components| {
958                let builder = new_conn_builder(client_builder, settings, runtime_components);
959                builder.build_http()
960            },
961        )
962    }
963
964    /// Set the TLS implementation to use
965    pub fn tls_provider(self, provider: tls::Provider) -> Builder<TlsProviderSelected> {
966        Builder {
967            client_builder: self.client_builder,
968            pool_idle_timeout: self.pool_idle_timeout,
969            tls_provider: TlsProviderSelected {
970                provider,
971                context: TlsContext::default(),
972            },
973        }
974    }
975}
976
977pub(crate) fn build_with_conn_fn<F>(
978    client_builder: Option<hyper_util::client::legacy::Builder>,
979    pool_idle_timeout: Option<Option<Duration>>,
980    connector_fn: F,
981) -> SharedHttpClient
982where
983    F: Fn(
984            hyper_util::client::legacy::Builder,
985            Option<&HttpConnectorSettings>,
986            Option<&RuntimeComponents>,
987        ) -> Connector
988        + Send
989        + Sync
990        + 'static,
991{
992    let client_builder =
993        client_builder.unwrap_or_else(|| new_tokio_hyper_builder(pool_idle_timeout));
994    SharedHttpClient::new(HyperClient {
995        connector_cache: RwLock::new(HashMap::new()),
996        client_builder,
997        connector_fn,
998    })
999}
1000
1001#[allow(dead_code)]
1002pub(crate) fn build_with_tcp_conn_fn<C, F>(
1003    client_builder: Option<hyper_util::client::legacy::Builder>,
1004    pool_idle_timeout: Option<Option<Duration>>,
1005    tcp_connector_fn: F,
1006) -> SharedHttpClient
1007where
1008    F: Fn() -> C + Send + Sync + 'static,
1009    C: Clone + Send + Sync + 'static,
1010    C: tower::Service<Uri>,
1011    C::Response: Connection + Read + Write + Send + Sync + Unpin + 'static,
1012    C::Future: Unpin + Send + 'static,
1013    C::Error: Into<BoxError>,
1014    C: Connect,
1015{
1016    build_with_conn_fn(
1017        client_builder,
1018        pool_idle_timeout,
1019        move |client_builder, settings, runtime_components| {
1020            let builder = new_conn_builder(client_builder, settings, runtime_components);
1021            builder.wrap_connector(tcp_connector_fn())
1022        },
1023    )
1024}
1025
1026fn new_conn_builder(
1027    client_builder: hyper_util::client::legacy::Builder,
1028    settings: Option<&HttpConnectorSettings>,
1029    runtime_components: Option<&RuntimeComponents>,
1030) -> ConnectorBuilder {
1031    let mut builder = Connector::builder().hyper_builder(client_builder);
1032    builder.set_connector_settings(settings.cloned());
1033    if let Some(components) = runtime_components {
1034        builder.set_sleep_impl(components.sleep_impl());
1035    }
1036    builder
1037}
1038
1039#[cfg(test)]
1040mod test {
1041    use std::io::{Error, ErrorKind};
1042    use std::pin::Pin;
1043    use std::sync::atomic::{AtomicU32, Ordering};
1044    use std::sync::Arc;
1045    use std::task::{Context, Poll};
1046
1047    use crate::client::timeout::test::NeverConnects;
1048    use aws_smithy_async::assert_elapsed;
1049    use aws_smithy_async::rt::sleep::TokioSleep;
1050    use aws_smithy_async::time::SystemTimeSource;
1051    use aws_smithy_runtime_api::client::runtime_components::RuntimeComponentsBuilder;
1052    use http_1x::Uri;
1053    use hyper::rt::ReadBufCursor;
1054    use hyper_util::client::legacy::connect::Connected;
1055
1056    use super::*;
1057
1058    #[tokio::test]
1059    async fn connector_selection() {
1060        // Create a client that increments a count every time it creates a new Connector
1061        let creation_count = Arc::new(AtomicU32::new(0));
1062        let http_client = build_with_tcp_conn_fn(None, None, {
1063            let count = creation_count.clone();
1064            move || {
1065                count.fetch_add(1, Ordering::Relaxed);
1066                NeverConnects
1067            }
1068        });
1069
1070        // This configuration should result in 4 separate connectors with different timeout settings
1071        let settings = [
1072            HttpConnectorSettings::builder()
1073                .connect_timeout(Duration::from_secs(3))
1074                .build(),
1075            HttpConnectorSettings::builder()
1076                .read_timeout(Duration::from_secs(3))
1077                .build(),
1078            HttpConnectorSettings::builder()
1079                .connect_timeout(Duration::from_secs(3))
1080                .read_timeout(Duration::from_secs(3))
1081                .build(),
1082            HttpConnectorSettings::builder()
1083                .connect_timeout(Duration::from_secs(5))
1084                .read_timeout(Duration::from_secs(3))
1085                .build(),
1086        ];
1087
1088        // Kick off thousands of parallel tasks that will try to create a connector
1089        let components = RuntimeComponentsBuilder::for_tests()
1090            .with_time_source(Some(SystemTimeSource::new()))
1091            .build()
1092            .unwrap();
1093        let mut handles = Vec::new();
1094        for setting in &settings {
1095            for _ in 0..1000 {
1096                let client = http_client.clone();
1097                handles.push(tokio::spawn({
1098                    let setting = setting.clone();
1099                    let components = components.clone();
1100                    async move {
1101                        let _ = client.http_connector(&setting, &components);
1102                    }
1103                }));
1104            }
1105        }
1106        for handle in handles {
1107            handle.await.unwrap();
1108        }
1109
1110        // Verify only 4 connectors were created amidst the chaos
1111        assert_eq!(4, creation_count.load(Ordering::Relaxed));
1112    }
1113
1114    #[tokio::test]
1115    async fn hyper_io_error() {
1116        let connector = TestConnection {
1117            inner: HangupStream,
1118        };
1119        let adapter = Connector::builder().wrap_connector(connector).adapter;
1120        let err = adapter
1121            .call(HttpRequest::get("https://socket-hangup.com").unwrap())
1122            .await
1123            .expect_err("socket hangup");
1124        assert!(err.is_io(), "unexpected error type: {:?}", err);
1125    }
1126
1127    // ---- machinery to make a Hyper connector that responds with an IO Error
1128    #[derive(Clone)]
1129    struct HangupStream;
1130
1131    impl Connection for HangupStream {
1132        fn connected(&self) -> Connected {
1133            Connected::new()
1134        }
1135    }
1136
1137    impl Read for HangupStream {
1138        fn poll_read(
1139            self: Pin<&mut Self>,
1140            _cx: &mut Context<'_>,
1141            _buf: ReadBufCursor<'_>,
1142        ) -> Poll<std::io::Result<()>> {
1143            Poll::Ready(Err(Error::new(
1144                ErrorKind::ConnectionReset,
1145                "connection reset",
1146            )))
1147        }
1148    }
1149
1150    impl Write for HangupStream {
1151        fn poll_write(
1152            self: Pin<&mut Self>,
1153            _cx: &mut Context<'_>,
1154            _buf: &[u8],
1155        ) -> Poll<Result<usize, Error>> {
1156            Poll::Pending
1157        }
1158
1159        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
1160            Poll::Pending
1161        }
1162
1163        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
1164            Poll::Pending
1165        }
1166    }
1167
1168    #[derive(Clone)]
1169    struct TestConnection<T> {
1170        inner: T,
1171    }
1172
1173    impl<T> tower::Service<Uri> for TestConnection<T>
1174    where
1175        T: Clone + Connection,
1176    {
1177        type Response = T;
1178        type Error = BoxError;
1179        type Future = std::future::Ready<Result<Self::Response, Self::Error>>;
1180
1181        fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1182            Poll::Ready(Ok(()))
1183        }
1184
1185        fn call(&mut self, _req: Uri) -> Self::Future {
1186            std::future::ready(Ok(self.inner.clone()))
1187        }
1188    }
1189
1190    #[tokio::test]
1191    async fn http_connect_timeout_works() {
1192        let tcp_connector = NeverConnects::default();
1193        let connector_settings = HttpConnectorSettings::builder()
1194            .connect_timeout(Duration::from_secs(1))
1195            .build();
1196        let hyper = Connector::builder()
1197            .connector_settings(connector_settings)
1198            .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
1199            .wrap_connector(tcp_connector)
1200            .adapter;
1201        let now = tokio::time::Instant::now();
1202        tokio::time::pause();
1203        let resp = hyper
1204            .call(HttpRequest::get("https://static-uri.com").unwrap())
1205            .await
1206            .unwrap_err();
1207        assert!(
1208            resp.is_timeout(),
1209            "expected resp.is_timeout() to be true but it was false, resp == {:?}",
1210            resp
1211        );
1212        let message = DisplayErrorContext(&resp).to_string();
1213        let expected = "timeout: client error (Connect): HTTP connect timeout occurred after 1s";
1214        assert!(
1215            message.contains(expected),
1216            "expected '{message}' to contain '{expected}'"
1217        );
1218        assert_elapsed!(now, Duration::from_secs(1));
1219    }
1220
1221    #[tokio::test]
1222    async fn http_read_timeout_works() {
1223        let tcp_connector = crate::client::timeout::test::NeverReplies;
1224        let connector_settings = HttpConnectorSettings::builder()
1225            .connect_timeout(Duration::from_secs(1))
1226            .read_timeout(Duration::from_secs(2))
1227            .build();
1228        let hyper = Connector::builder()
1229            .connector_settings(connector_settings)
1230            .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
1231            .wrap_connector(tcp_connector)
1232            .adapter;
1233        let now = tokio::time::Instant::now();
1234        tokio::time::pause();
1235        let err = hyper
1236            .call(HttpRequest::get("https://fake-uri.com").unwrap())
1237            .await
1238            .unwrap_err();
1239        assert!(
1240            err.is_timeout(),
1241            "expected err.is_timeout() to be true but it was false, err == {err:?}",
1242        );
1243        let message = format!("{}", DisplayErrorContext(&err));
1244        let expected = "timeout: HTTP read timeout occurred after 2s";
1245        assert!(
1246            message.contains(expected),
1247            "expected '{message}' to contain '{expected}'"
1248        );
1249        assert_elapsed!(now, Duration::from_secs(2));
1250    }
1251
1252    #[cfg(not(windows))]
1253    #[tokio::test]
1254    async fn connection_refused_works() {
1255        use crate::client::dns::HyperUtilResolver;
1256        use aws_smithy_runtime_api::client::dns::{DnsFuture, ResolveDns};
1257        use std::net::{IpAddr, Ipv4Addr};
1258
1259        #[derive(Debug, Clone, Default)]
1260        struct TestResolver;
1261        impl ResolveDns for TestResolver {
1262            fn resolve_dns<'a>(&'a self, _name: &'a str) -> DnsFuture<'a> {
1263                let localhost_v4 = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
1264                DnsFuture::ready(Ok(vec![localhost_v4]))
1265            }
1266        }
1267
1268        let connector_settings = HttpConnectorSettings::builder()
1269            .connect_timeout(Duration::from_secs(20))
1270            .build();
1271
1272        let resolver = HyperUtilResolver {
1273            resolver: TestResolver,
1274        };
1275        let connector = Connector::builder().base_connector_with_resolver(resolver);
1276
1277        let hyper = Connector::builder()
1278            .connector_settings(connector_settings)
1279            .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
1280            .wrap_connector(connector)
1281            .adapter;
1282
1283        let resp = hyper
1284            .call(HttpRequest::get("http://static-uri:50227.com").unwrap())
1285            .await
1286            .unwrap_err();
1287        assert!(
1288            resp.is_io(),
1289            "expected resp.is_io() to be true but it was false, resp == {:?}",
1290            resp
1291        );
1292        let message = DisplayErrorContext(&resp).to_string();
1293        let expected = "Connection refused";
1294        assert!(
1295            message.contains(expected),
1296            "expected '{message}' to contain '{expected}'"
1297        );
1298    }
1299
1300    #[cfg(feature = "s2n-tls")]
1301    #[tokio::test]
1302    async fn s2n_tls_provider() {
1303        // Create an HttpConnector with the s2n-tls provider.
1304        let client = Builder::new()
1305            .tls_provider(tls::Provider::S2nTls)
1306            .build_https();
1307        let connector_settings = HttpConnectorSettings::builder().build();
1308
1309        // HyperClient::http_connector invokes TimeSource::now to determine how long it takes to
1310        // create new HttpConnectors. As such, a real time source must be provided.
1311        let runtime_components = RuntimeComponentsBuilder::for_tests()
1312            .with_time_source(Some(SystemTimeSource::new()))
1313            .build()
1314            .unwrap();
1315
1316        let connector = client.http_connector(&connector_settings, &runtime_components);
1317
1318        // Ensure that s2n-tls is used as the underlying TLS provider when selected.
1319        //
1320        // s2n-tls-hyper will error when given an invalid scheme. Ensure that this error is produced
1321        // from s2n-tls-hyper, and not another TLS provider.
1322        let error = connector
1323            .call(HttpRequest::get("notascheme://amazon.com").unwrap())
1324            .await
1325            .unwrap_err();
1326        let error = error.into_source();
1327        let s2n_error = error
1328            .source()
1329            .unwrap()
1330            .downcast_ref::<s2n_tls_hyper::error::Error>()
1331            .unwrap();
1332        assert!(matches!(
1333            s2n_error,
1334            s2n_tls_hyper::error::Error::InvalidScheme
1335        ));
1336    }
1337}