aws_smithy_http_client/
client.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6mod dns;
7/// Proxy configuration
8pub mod proxy;
9mod timeout;
10/// TLS connector(s)
11pub mod tls;
12
13pub(crate) mod connect;
14
15use crate::cfg::cfg_tls;
16use crate::tls::TlsContext;
17use aws_smithy_async::future::timeout::TimedOutError;
18use aws_smithy_async::rt::sleep::{default_async_sleep, AsyncSleep, SharedAsyncSleep};
19use aws_smithy_runtime_api::box_error::BoxError;
20use aws_smithy_runtime_api::client::connection::CaptureSmithyConnection;
21use aws_smithy_runtime_api::client::connection::ConnectionMetadata;
22use aws_smithy_runtime_api::client::connector_metadata::ConnectorMetadata;
23use aws_smithy_runtime_api::client::http::{
24    HttpClient, HttpConnector, HttpConnectorFuture, HttpConnectorSettings, SharedHttpClient,
25    SharedHttpConnector,
26};
27use aws_smithy_runtime_api::client::orchestrator::{HttpRequest, HttpResponse};
28use aws_smithy_runtime_api::client::result::ConnectorError;
29use aws_smithy_runtime_api::client::runtime_components::{
30    RuntimeComponents, RuntimeComponentsBuilder,
31};
32use aws_smithy_runtime_api::shared::IntoShared;
33use aws_smithy_types::body::SdkBody;
34use aws_smithy_types::config_bag::ConfigBag;
35use aws_smithy_types::error::display::DisplayErrorContext;
36use aws_smithy_types::retry::ErrorKind;
37use client::connect::Connection;
38use h2::Reason;
39use http_1x::{Extensions, Uri};
40use hyper::rt::{Read, Write};
41use hyper_util::client::legacy as client;
42use hyper_util::client::legacy::connect::dns::GaiResolver;
43use hyper_util::client::legacy::connect::{
44    capture_connection, CaptureConnection, Connect, HttpConnector as HyperHttpConnector, HttpInfo,
45};
46use hyper_util::client::proxy::matcher::Matcher;
47use hyper_util::rt::{TokioExecutor, TokioTimer};
48use std::borrow::Cow;
49use std::collections::HashMap;
50use std::error::Error;
51use std::fmt;
52use std::sync::RwLock;
53use std::time::Duration;
54
55/// Given `HttpConnectorSettings` and an `SharedAsyncSleep`, create a `SharedHttpConnector` from defaults depending on what cargo features are activated.
56pub fn default_connector(
57    settings: &HttpConnectorSettings,
58    sleep: Option<SharedAsyncSleep>,
59) -> Option<SharedHttpConnector> {
60    #[cfg(feature = "rustls-aws-lc")]
61    {
62        tracing::trace!(settings = ?settings, sleep = ?sleep, "creating a new default connector");
63        let mut conn_builder = Connector::builder().connector_settings(settings.clone());
64
65        if let Some(sleep) = sleep {
66            conn_builder = conn_builder.sleep_impl(sleep);
67        }
68
69        let conn = conn_builder
70            .tls_provider(tls::Provider::Rustls(
71                tls::rustls_provider::CryptoMode::AwsLc,
72            ))
73            .build();
74        Some(SharedHttpConnector::new(conn))
75    }
76    #[cfg(not(feature = "rustls-aws-lc"))]
77    {
78        tracing::trace!(settings = ?settings, sleep = ?sleep, "no default connector available");
79        None
80    }
81}
82
83/// [`HttpConnector`] used to make HTTP requests.
84///
85/// This connector also implements socket connect and read timeouts.
86///
87/// This shouldn't be used directly in most cases.
88/// See the docs on [`Builder`] for examples of how to customize the HTTP client.
89#[derive(Debug)]
90pub struct Connector {
91    adapter: Box<dyn HttpConnector>,
92}
93
94impl Connector {
95    /// Builder for an HTTP connector.
96    pub fn builder() -> ConnectorBuilder {
97        ConnectorBuilder {
98            enable_tcp_nodelay: true,
99            ..Default::default()
100        }
101    }
102}
103
104impl HttpConnector for Connector {
105    fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
106        self.adapter.call(request)
107    }
108}
109
110/// Builder for [`Connector`].
111#[derive(Default, Debug, Clone)]
112pub struct ConnectorBuilder<Tls = TlsUnset> {
113    connector_settings: Option<HttpConnectorSettings>,
114    sleep_impl: Option<SharedAsyncSleep>,
115    client_builder: Option<hyper_util::client::legacy::Builder>,
116    enable_tcp_nodelay: bool,
117    interface: Option<String>,
118    proxy_config: Option<proxy::ProxyConfig>,
119    #[allow(unused)]
120    tls: Tls,
121}
122
123/// Initial builder state, `TlsProvider` choice required
124#[derive(Default, Debug, Clone)]
125#[non_exhaustive]
126pub struct TlsUnset {}
127
128/// TLS implementation selected
129#[derive(Debug, Clone)]
130pub struct TlsProviderSelected {
131    #[allow(unused)]
132    provider: tls::Provider,
133    #[allow(unused)]
134    context: TlsContext,
135}
136
137impl ConnectorBuilder<TlsUnset> {
138    /// Set the TLS implementation to use for this connector
139    pub fn tls_provider(self, provider: tls::Provider) -> ConnectorBuilder<TlsProviderSelected> {
140        ConnectorBuilder {
141            connector_settings: self.connector_settings,
142            sleep_impl: self.sleep_impl,
143            client_builder: self.client_builder,
144            enable_tcp_nodelay: self.enable_tcp_nodelay,
145            interface: self.interface,
146            proxy_config: self.proxy_config,
147            tls: TlsProviderSelected {
148                provider,
149                context: TlsContext::default(),
150            },
151        }
152    }
153
154    /// Build an HTTP connector sans TLS
155    #[doc(hidden)]
156    pub fn build_http(self) -> Connector {
157        if let Some(ref proxy_config) = self.proxy_config {
158            if proxy_config.requires_tls() {
159                tracing::warn!(
160                    "HTTPS proxy configured but no TLS provider set. \
161                     Connections to HTTPS proxy servers will fail. \
162                     Consider configuring a TLS provider to enable TLS support."
163                );
164            }
165        }
166
167        let base = self.base_connector();
168
169        // Wrap with HTTP proxy support if proxy is configured
170        let proxy_config = self
171            .proxy_config
172            .clone()
173            .unwrap_or_else(proxy::ProxyConfig::disabled);
174
175        if !proxy_config.is_disabled() {
176            let http_proxy_connector = connect::HttpProxyConnector::new(base, proxy_config);
177            self.wrap_connector(http_proxy_connector)
178        } else {
179            self.wrap_connector(base)
180        }
181    }
182}
183
184impl<Any> ConnectorBuilder<Any> {
185    /// Create a [`Connector`] from this builder and a given connector.
186    pub(crate) fn wrap_connector<C>(self, tcp_connector: C) -> Connector
187    where
188        C: Send + Sync + 'static,
189        C: Clone,
190        C: tower::Service<Uri>,
191        C::Response: Read + Write + Connection + Send + Sync + Unpin,
192        C: Connect,
193        C::Future: Unpin + Send + 'static,
194        C::Error: Into<BoxError>,
195    {
196        let client_builder = self.client_builder.unwrap_or_else(|| {
197            let mut builder = hyper_util::client::legacy::Builder::new(TokioExecutor::new());
198            // Explicitly setting the pool_timer is required for connection timeouts to work.
199            builder.pool_timer(TokioTimer::new());
200
201            builder
202        });
203        let sleep_impl = self.sleep_impl.or_else(default_async_sleep);
204        let (connect_timeout, read_timeout) = self
205            .connector_settings
206            .map(|c| (c.connect_timeout(), c.read_timeout()))
207            .unwrap_or((None, None));
208
209        let connector = match connect_timeout {
210            Some(duration) => timeout::ConnectTimeout::new(
211                tcp_connector,
212                sleep_impl
213                    .clone()
214                    .expect("a sleep impl must be provided in order to have a connect timeout"),
215                duration,
216            ),
217            None => timeout::ConnectTimeout::no_timeout(tcp_connector),
218        };
219        let base = client_builder.build(connector);
220        let read_timeout = match read_timeout {
221            Some(duration) => timeout::HttpReadTimeout::new(
222                base,
223                sleep_impl.expect("a sleep impl must be provided in order to have a read timeout"),
224                duration,
225            ),
226            None => timeout::HttpReadTimeout::no_timeout(base),
227        };
228
229        let proxy_matcher = self
230            .proxy_config
231            .as_ref()
232            .map(|config| config.clone().into_hyper_util_matcher());
233
234        Connector {
235            adapter: Box::new(Adapter {
236                client: read_timeout,
237                proxy_matcher,
238            }),
239        }
240    }
241
242    /// Get the base TCP connector by mapping our config to the underlying `HttpConnector` from hyper
243    /// (which is a base TCP connector with no TLS or any wrapping)
244    fn base_connector(&self) -> HyperHttpConnector {
245        self.base_connector_with_resolver(GaiResolver::new())
246    }
247
248    /// Get the base TCP connector by mapping our config to the underlying `HttpConnector` from hyper
249    /// using the given resolver `R`
250    fn base_connector_with_resolver<R>(&self, resolver: R) -> HyperHttpConnector<R> {
251        let mut conn = HyperHttpConnector::new_with_resolver(resolver);
252        conn.set_nodelay(self.enable_tcp_nodelay);
253        #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
254        if let Some(interface) = &self.interface {
255            conn.set_interface(interface);
256        }
257        conn
258    }
259
260    /// Set the async sleep implementation used for timeouts
261    ///
262    /// Calling this is only necessary for testing or to use something other than
263    /// [`default_async_sleep`].
264    pub fn sleep_impl(mut self, sleep_impl: impl AsyncSleep + 'static) -> Self {
265        self.sleep_impl = Some(sleep_impl.into_shared());
266        self
267    }
268
269    /// Set the async sleep implementation used for timeouts
270    ///
271    /// Calling this is only necessary for testing or to use something other than
272    /// [`default_async_sleep`].
273    pub fn set_sleep_impl(&mut self, sleep_impl: Option<SharedAsyncSleep>) -> &mut Self {
274        self.sleep_impl = sleep_impl;
275        self
276    }
277
278    /// Configure the HTTP settings for the `HyperAdapter`
279    pub fn connector_settings(mut self, connector_settings: HttpConnectorSettings) -> Self {
280        self.connector_settings = Some(connector_settings);
281        self
282    }
283
284    /// Configure the HTTP settings for the `HyperAdapter`
285    pub fn set_connector_settings(
286        &mut self,
287        connector_settings: Option<HttpConnectorSettings>,
288    ) -> &mut Self {
289        self.connector_settings = connector_settings;
290        self
291    }
292
293    /// Configure `SO_NODELAY` for all sockets to the supplied value `nodelay`
294    pub fn enable_tcp_nodelay(mut self, nodelay: bool) -> Self {
295        self.enable_tcp_nodelay = nodelay;
296        self
297    }
298
299    /// Configure `SO_NODELAY` for all sockets to the supplied value `nodelay`
300    pub fn set_enable_tcp_nodelay(&mut self, nodelay: bool) -> &mut Self {
301        self.enable_tcp_nodelay = nodelay;
302        self
303    }
304
305    /// Sets the value for the `SO_BINDTODEVICE` option on this socket.
306    ///
307    /// If a socket is bound to an interface, only packets received from that particular
308    /// interface are processed by the socket. Note that this only works for some socket
309    /// types (e.g. `AF_INET` sockets).
310    ///
311    /// On Linux it can be used to specify a [VRF], but the binary needs to either have
312    /// `CAP_NET_RAW` capability set or be run as root.
313    ///
314    /// This function is only available on Android, Fuchsia, and Linux.
315    ///
316    /// [VRF]: https://www.kernel.org/doc/Documentation/networking/vrf.txt
317    #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
318    pub fn set_interface<S: Into<String>>(&mut self, interface: S) -> &mut Self {
319        self.interface = Some(interface.into());
320        self
321    }
322
323    /// Configure proxy settings for this connector
324    ///
325    /// This method allows you to set explicit proxy configuration for the HTTP client.
326    /// The proxy configuration will be used to determine whether requests should be
327    /// routed through a proxy server or connect directly.
328    ///
329    /// # Examples
330    ///
331    /// ```rust
332    /// # #[cfg(feature = "rustls-aws-lc")]
333    /// # {
334    /// use aws_smithy_http_client::{Connector, proxy::ProxyConfig, tls};
335    ///
336    /// let proxy_config = ProxyConfig::http("http://proxy.example.com:8080")?;
337    /// let connector = Connector::builder()
338    ///     .proxy_config(proxy_config)
339    ///     .tls_provider(tls::Provider::Rustls(tls::rustls_provider::CryptoMode::AwsLc))
340    ///     .build();
341    /// # }
342    /// # Ok::<(), Box<dyn std::error::Error>>(())
343    /// ```
344    pub fn proxy_config(mut self, config: proxy::ProxyConfig) -> Self {
345        self.proxy_config = Some(config);
346        self
347    }
348
349    /// Configure proxy settings for this connector
350    ///
351    /// This is the mutable version of [`proxy_config`](Self::proxy_config).
352    pub fn set_proxy_config(&mut self, config: Option<proxy::ProxyConfig>) -> &mut Self {
353        self.proxy_config = config;
354        self
355    }
356
357    /// Override the Hyper client [`Builder`](hyper_util::client::legacy::Builder) used to construct this client.
358    ///
359    /// This enables changing settings like forcing HTTP2 and modifying other default client behavior.
360    pub(crate) fn hyper_builder(
361        mut self,
362        hyper_builder: hyper_util::client::legacy::Builder,
363    ) -> Self {
364        self.set_hyper_builder(Some(hyper_builder));
365        self
366    }
367
368    /// Override the Hyper client [`Builder`](hyper_util::client::legacy::Builder) used to construct this client.
369    ///
370    /// This enables changing settings like forcing HTTP2 and modifying other default client behavior.
371    pub(crate) fn set_hyper_builder(
372        &mut self,
373        hyper_builder: Option<hyper_util::client::legacy::Builder>,
374    ) -> &mut Self {
375        self.client_builder = hyper_builder;
376        self
377    }
378}
379
380/// Adapter to use a Hyper 1.0-based Client as an `HttpConnector`
381///
382/// This adapter also enables TCP `CONNECT` and HTTP `READ` timeouts via [`Connector::builder`].
383struct Adapter<C> {
384    client: timeout::HttpReadTimeout<
385        hyper_util::client::legacy::Client<timeout::ConnectTimeout<C>, SdkBody>,
386    >,
387    proxy_matcher: Option<Matcher>,
388}
389
390impl<C> fmt::Debug for Adapter<C> {
391    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
392        f.debug_struct("Adapter")
393            .field("client", &"** hyper client **")
394            .field("proxy_matcher", &self.proxy_matcher.is_some())
395            .finish()
396    }
397}
398
399/// Extract a smithy connection from a hyper CaptureConnection
400fn extract_smithy_connection(capture_conn: &CaptureConnection) -> Option<ConnectionMetadata> {
401    let capture_conn = capture_conn.clone();
402    if let Some(conn) = capture_conn.clone().connection_metadata().as_ref() {
403        let mut extensions = Extensions::new();
404        conn.get_extras(&mut extensions);
405        let http_info = extensions.get::<HttpInfo>();
406        let mut builder = ConnectionMetadata::builder()
407            .proxied(conn.is_proxied())
408            .poison_fn(move || match capture_conn.connection_metadata().as_ref() {
409                Some(conn) => conn.poison(),
410                None => tracing::trace!("no connection existed to poison"),
411            });
412
413        builder
414            .set_local_addr(http_info.map(|info| info.local_addr()))
415            .set_remote_addr(http_info.map(|info| info.remote_addr()));
416
417        let smithy_connection = builder.build();
418
419        Some(smithy_connection)
420    } else {
421        None
422    }
423}
424
425impl<C> Adapter<C> {
426    /// Add proxy authentication header to the request if needed
427    fn add_proxy_auth_header(&self, request: &mut http_1x::Request<SdkBody>) {
428        // Only add auth for HTTP requests (not HTTPS which uses CONNECT tunneling)
429        if request.uri().scheme() != Some(&http_1x::uri::Scheme::HTTP) {
430            return;
431        }
432
433        // Don't override existing proxy authorization header
434        if request
435            .headers()
436            .contains_key(http_1x::header::PROXY_AUTHORIZATION)
437        {
438            return;
439        }
440
441        if let Some(ref matcher) = self.proxy_matcher {
442            if let Some(intercept) = matcher.intercept(request.uri()) {
443                // Add basic auth header if available
444                if let Some(auth_header) = intercept.basic_auth() {
445                    request
446                        .headers_mut()
447                        .insert(http_1x::header::PROXY_AUTHORIZATION, auth_header.clone());
448                    tracing::debug!("added proxy authentication header for {}", request.uri());
449                }
450            }
451        }
452    }
453}
454
455impl<C> HttpConnector for Adapter<C>
456where
457    C: Clone + Send + Sync + 'static,
458    C: tower::Service<Uri>,
459    C::Response: Connection + Read + Write + Unpin + 'static,
460    timeout::ConnectTimeout<C>: Connect,
461    C::Future: Unpin + Send + 'static,
462    C::Error: Into<BoxError>,
463{
464    fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
465        let mut request = match request.try_into_http1x() {
466            Ok(request) => request,
467            Err(err) => {
468                return HttpConnectorFuture::ready(Err(ConnectorError::user(err.into())));
469            }
470        };
471
472        self.add_proxy_auth_header(&mut request);
473
474        let capture_connection = capture_connection(&mut request);
475        if let Some(capture_smithy_connection) =
476            request.extensions().get::<CaptureSmithyConnection>()
477        {
478            capture_smithy_connection
479                .set_connection_retriever(move || extract_smithy_connection(&capture_connection));
480        }
481        let mut client = self.client.clone();
482        use tower::Service;
483        let fut = client.call(request);
484        HttpConnectorFuture::new(async move {
485            let response = fut
486                .await
487                .map_err(downcast_error)?
488                .map(SdkBody::from_body_1_x);
489            match HttpResponse::try_from(response) {
490                Ok(response) => Ok(response),
491                Err(err) => Err(ConnectorError::other(err.into(), None)),
492            }
493        })
494    }
495}
496
497/// Downcast errors coming out of hyper into an appropriate `ConnectorError`
498fn downcast_error(err: BoxError) -> ConnectorError {
499    // is a `TimedOutError` (from aws_smithy_async::timeout) in the chain? if it is, this is a timeout
500    if find_source::<TimedOutError>(err.as_ref()).is_some() {
501        return ConnectorError::timeout(err);
502    }
503    // is the top of chain error actually already a `ConnectorError`? return that directly
504    let err = match err.downcast::<ConnectorError>() {
505        Ok(connector_error) => return *connector_error,
506        Err(box_error) => box_error,
507    };
508    // generally, the top of chain will probably be a hyper error. Go through a set of hyper specific
509    // error classifications
510    let err = match find_source::<hyper::Error>(err.as_ref()) {
511        Some(hyper_error) => return to_connector_error(hyper_error)(err),
512        None => match find_source::<hyper_util::client::legacy::Error>(err.as_ref()) {
513            Some(hyper_util_err) => {
514                if hyper_util_err.is_connect()
515                    || find_source::<std::io::Error>(hyper_util_err).is_some()
516                {
517                    return ConnectorError::io(err);
518                }
519                err
520            }
521            None => err,
522        },
523    };
524
525    // otherwise, we have no idea!
526    ConnectorError::other(err, None)
527}
528
529/// Convert a [`hyper::Error`] into a [`ConnectorError`]
530fn to_connector_error(err: &hyper::Error) -> fn(BoxError) -> ConnectorError {
531    if err.is_timeout() || find_source::<timeout::HttpTimeoutError>(err).is_some() {
532        return ConnectorError::timeout;
533    }
534    if err.is_user() {
535        return ConnectorError::user;
536    }
537    if err.is_closed() || err.is_canceled() || find_source::<std::io::Error>(err).is_some() {
538        return ConnectorError::io;
539    }
540    // We sometimes receive this from S3: hyper::Error(IncompleteMessage)
541    if err.is_incomplete_message() {
542        return |err: BoxError| ConnectorError::other(err, Some(ErrorKind::TransientError));
543    }
544
545    if let Some(h2_err) = find_source::<h2::Error>(err) {
546        if h2_err.is_go_away()
547            || (h2_err.is_reset() && h2_err.reason() == Some(Reason::REFUSED_STREAM))
548        {
549            return ConnectorError::io;
550        }
551    }
552
553    tracing::warn!(err = %DisplayErrorContext(&err), "unrecognized error from Hyper. If this error should be retried, please file an issue.");
554    |err: BoxError| ConnectorError::other(err, None)
555}
556
557fn find_source<'a, E: Error + 'static>(err: &'a (dyn Error + 'static)) -> Option<&'a E> {
558    let mut next = Some(err);
559    while let Some(err) = next {
560        if let Some(matching_err) = err.downcast_ref::<E>() {
561            return Some(matching_err);
562        }
563        next = err.source();
564    }
565    None
566}
567
568// TODO(https://github.com/awslabs/aws-sdk-rust/issues/1090): CacheKey must also include ptr equality to any
569// runtime components that are used—sleep_impl as a base (unless we prohibit overriding sleep impl)
570// If we decide to put a DnsResolver in RuntimeComponents, then we'll need to handle that as well.
571#[derive(Clone, Debug, Eq, PartialEq, Hash)]
572struct CacheKey {
573    connect_timeout: Option<Duration>,
574    read_timeout: Option<Duration>,
575}
576
577impl From<&HttpConnectorSettings> for CacheKey {
578    fn from(value: &HttpConnectorSettings) -> Self {
579        Self {
580            connect_timeout: value.connect_timeout(),
581            read_timeout: value.read_timeout(),
582        }
583    }
584}
585
586struct HyperClient<F> {
587    connector_cache: RwLock<HashMap<CacheKey, SharedHttpConnector>>,
588    client_builder: hyper_util::client::legacy::Builder,
589    connector_fn: F,
590}
591
592impl<F> fmt::Debug for HyperClient<F> {
593    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
594        f.debug_struct("HyperClient")
595            .field("connector_cache", &self.connector_cache)
596            .field("client_builder", &self.client_builder)
597            .finish()
598    }
599}
600
601impl<F> HttpClient for HyperClient<F>
602where
603    F: Fn(
604            hyper_util::client::legacy::Builder,
605            Option<&HttpConnectorSettings>,
606            Option<&RuntimeComponents>,
607        ) -> Connector
608        + Send
609        + Sync
610        + 'static,
611{
612    fn http_connector(
613        &self,
614        settings: &HttpConnectorSettings,
615        components: &RuntimeComponents,
616    ) -> SharedHttpConnector {
617        let key = CacheKey::from(settings);
618        let mut connector = self.connector_cache.read().unwrap().get(&key).cloned();
619        if connector.is_none() {
620            let mut cache = self.connector_cache.write().unwrap();
621            // Short-circuit if another thread already wrote a connector to the cache for this key
622            if !cache.contains_key(&key) {
623                let start = components.time_source().map(|ts| ts.now());
624                let connector = (self.connector_fn)(
625                    self.client_builder.clone(),
626                    Some(settings),
627                    Some(components),
628                );
629                let end = components.time_source().map(|ts| ts.now());
630                if let (Some(start), Some(end)) = (start, end) {
631                    if let Ok(elapsed) = end.duration_since(start) {
632                        tracing::debug!("new connector created in {:?}", elapsed);
633                    }
634                }
635                let connector = SharedHttpConnector::new(connector);
636                cache.insert(key.clone(), connector);
637            }
638            connector = cache.get(&key).cloned();
639        }
640
641        connector.expect("cache populated above")
642    }
643
644    fn validate_base_client_config(
645        &self,
646        _: &RuntimeComponentsBuilder,
647        _: &ConfigBag,
648    ) -> Result<(), BoxError> {
649        // Initialize the TCP connector at this point so that native certs load
650        // at client initialization time instead of upon first request. We do it
651        // here rather than at construction so that it won't run if this is not
652        // the selected HTTP client for the base config (for example, if this was
653        // the default HTTP client, and it was overridden by a later plugin).
654        let _ = (self.connector_fn)(self.client_builder.clone(), None, None);
655        Ok(())
656    }
657
658    fn connector_metadata(&self) -> Option<ConnectorMetadata> {
659        Some(ConnectorMetadata::new("hyper", Some(Cow::Borrowed("1.x"))))
660    }
661}
662
663/// Builder for a hyper-backed [`HttpClient`] implementation.
664///
665/// This builder can be used to customize the underlying TCP connector used, as well as
666/// hyper client configuration.
667///
668/// # Examples
669///
670/// Construct a Hyper client with the RusTLS TLS implementation.
671/// This can be useful when you want to share a Hyper connector between multiple
672/// generated Smithy clients.
673#[derive(Clone, Default, Debug)]
674pub struct Builder<Tls = TlsUnset> {
675    client_builder: Option<hyper_util::client::legacy::Builder>,
676    #[allow(unused)]
677    tls_provider: Tls,
678}
679
680cfg_tls! {
681    use aws_smithy_runtime_api::client::dns::ResolveDns;
682
683    impl ConnectorBuilder<TlsProviderSelected> {
684        /// Build a [`Connector`] that will use the default DNS resolver implementation.
685        pub fn build(self) -> Connector {
686            let http_connector = self.base_connector();
687            self.build_https(http_connector)
688        }
689
690        /// Configure the TLS context
691        pub fn tls_context(mut self, ctx: TlsContext) -> Self {
692            self.tls.context = ctx;
693            self
694        }
695
696        /// Configure the TLS context
697        pub fn set_tls_context(&mut self, ctx: TlsContext) -> &mut Self {
698            self.tls.context = ctx;
699            self
700        }
701
702        /// Build a [`Connector`] that will use the given DNS resolver implementation.
703        pub fn build_with_resolver<R: ResolveDns + Clone + 'static>(self, resolver: R) -> Connector {
704            use crate::client::dns::HyperUtilResolver;
705            let http_connector = self.base_connector_with_resolver(HyperUtilResolver { resolver });
706            self.build_https(http_connector)
707        }
708
709        fn build_https<R>(self, http_connector: HyperHttpConnector<R>) -> Connector
710        where
711            R: Clone + Send + Sync + 'static,
712            R: tower::Service<hyper_util::client::legacy::connect::dns::Name>,
713            R::Response: Iterator<Item = std::net::SocketAddr>,
714            R::Future: Send,
715            R::Error: Into<Box<dyn Error + Send + Sync>>,
716        {
717            match &self.tls.provider {
718                // TODO(hyper1) - fix cfg_rustls! to allow matching on patterns so we can re-use it and not duplicate these cfg matches everywhere
719                #[cfg(any(
720                    feature = "rustls-aws-lc",
721                    feature = "rustls-aws-lc-fips",
722                    feature = "rustls-ring"
723                ))]
724                tls::Provider::Rustls(crypto_mode) => {
725                    let proxy_config = self.proxy_config.clone()
726                        .unwrap_or_else(proxy::ProxyConfig::disabled);
727
728                    let https_connector = tls::rustls_provider::build_connector::wrap_connector(
729                        http_connector,
730                        crypto_mode.clone(),
731                        &self.tls.context,
732                        proxy_config,
733                    );
734                    self.wrap_connector(https_connector)
735                },
736                #[cfg(feature = "s2n-tls")]
737                tls::Provider::S2nTls  => {
738                    let proxy_config = self.proxy_config.clone()
739                        .unwrap_or_else(proxy::ProxyConfig::disabled);
740
741                    let https_connector = tls::s2n_tls_provider::build_connector::wrap_connector(
742                        http_connector,
743                        &self.tls.context,
744                        proxy_config,
745                    );
746                    self.wrap_connector(https_connector)
747                }
748            }
749        }
750    }
751
752    impl Builder<TlsProviderSelected> {
753        /// Create an HTTPS client with the selected TLS provider.
754        ///
755        /// The trusted certificates will be loaded later when this becomes the selected
756        /// HTTP client for a Smithy client.
757        pub fn build_https(self) -> SharedHttpClient {
758            build_with_conn_fn(
759                self.client_builder,
760                move |client_builder, settings, runtime_components| {
761                    let builder = new_conn_builder(client_builder, settings, runtime_components)
762                        .tls_provider(self.tls_provider.provider.clone())
763                        .tls_context(self.tls_provider.context.clone());
764                    builder.build()
765                },
766            )
767        }
768
769        /// Create an HTTPS client using a custom DNS resolver
770        pub fn build_with_resolver(
771            self,
772            resolver: impl ResolveDns + Clone + 'static,
773        ) -> SharedHttpClient {
774            build_with_conn_fn(
775                self.client_builder,
776                move |client_builder, settings, runtime_components| {
777                    let builder = new_conn_builder(client_builder, settings, runtime_components)
778                        .tls_provider(self.tls_provider.provider.clone())
779                        .tls_context(self.tls_provider.context.clone());
780                    builder.build_with_resolver(resolver.clone())
781                },
782            )
783        }
784
785        /// Configure the TLS context
786        pub fn tls_context(mut self, ctx: TlsContext) -> Self {
787            self.tls_provider.context = ctx;
788            self
789        }
790    }
791}
792
793impl Builder<TlsUnset> {
794    /// Creates a new builder.
795    pub fn new() -> Self {
796        Self::default()
797    }
798
799    /// Returns a [`SharedHttpClient`] that calls the given `connector` function to select an HTTP(S) connector.
800    #[doc(hidden)]
801    pub fn build_with_connector_fn<F>(self, connector_fn: F) -> SharedHttpClient
802    where
803        F: Fn(Option<&HttpConnectorSettings>, Option<&RuntimeComponents>) -> Connector
804            + Send
805            + Sync
806            + 'static,
807    {
808        build_with_conn_fn(
809            self.client_builder,
810            move |_builder, settings, runtime_components| {
811                connector_fn(settings, runtime_components)
812            },
813        )
814    }
815
816    /// Build a new HTTP client without TLS enabled
817    #[doc(hidden)]
818    pub fn build_http(self) -> SharedHttpClient {
819        build_with_conn_fn(
820            self.client_builder,
821            move |client_builder, settings, runtime_components| {
822                let builder = new_conn_builder(client_builder, settings, runtime_components);
823                builder.build_http()
824            },
825        )
826    }
827
828    /// Set the TLS implementation to use
829    pub fn tls_provider(self, provider: tls::Provider) -> Builder<TlsProviderSelected> {
830        Builder {
831            client_builder: self.client_builder,
832            tls_provider: TlsProviderSelected {
833                provider,
834                context: TlsContext::default(),
835            },
836        }
837    }
838}
839
840pub(crate) fn build_with_conn_fn<F>(
841    client_builder: Option<hyper_util::client::legacy::Builder>,
842    connector_fn: F,
843) -> SharedHttpClient
844where
845    F: Fn(
846            hyper_util::client::legacy::Builder,
847            Option<&HttpConnectorSettings>,
848            Option<&RuntimeComponents>,
849        ) -> Connector
850        + Send
851        + Sync
852        + 'static,
853{
854    SharedHttpClient::new(HyperClient {
855        connector_cache: RwLock::new(HashMap::new()),
856        client_builder: client_builder
857            .unwrap_or_else(|| hyper_util::client::legacy::Builder::new(TokioExecutor::new())),
858        connector_fn,
859    })
860}
861
862#[allow(dead_code)]
863pub(crate) fn build_with_tcp_conn_fn<C, F>(
864    client_builder: Option<hyper_util::client::legacy::Builder>,
865    tcp_connector_fn: F,
866) -> SharedHttpClient
867where
868    F: Fn() -> C + Send + Sync + 'static,
869    C: Clone + Send + Sync + 'static,
870    C: tower::Service<Uri>,
871    C::Response: Connection + Read + Write + Send + Sync + Unpin + 'static,
872    C::Future: Unpin + Send + 'static,
873    C::Error: Into<BoxError>,
874    C: Connect,
875{
876    build_with_conn_fn(
877        client_builder,
878        move |client_builder, settings, runtime_components| {
879            let builder = new_conn_builder(client_builder, settings, runtime_components);
880            builder.wrap_connector(tcp_connector_fn())
881        },
882    )
883}
884
885fn new_conn_builder(
886    client_builder: hyper_util::client::legacy::Builder,
887    settings: Option<&HttpConnectorSettings>,
888    runtime_components: Option<&RuntimeComponents>,
889) -> ConnectorBuilder {
890    let mut builder = Connector::builder().hyper_builder(client_builder);
891    builder.set_connector_settings(settings.cloned());
892    if let Some(components) = runtime_components {
893        builder.set_sleep_impl(components.sleep_impl());
894    }
895    builder
896}
897
898#[cfg(test)]
899mod test {
900    use std::io::{Error, ErrorKind};
901    use std::pin::Pin;
902    use std::sync::atomic::{AtomicU32, Ordering};
903    use std::sync::Arc;
904    use std::task::{Context, Poll};
905
906    use crate::client::timeout::test::NeverConnects;
907    use aws_smithy_async::assert_elapsed;
908    use aws_smithy_async::rt::sleep::TokioSleep;
909    use aws_smithy_async::time::SystemTimeSource;
910    use aws_smithy_runtime_api::client::runtime_components::RuntimeComponentsBuilder;
911    use http_1x::Uri;
912    use hyper::rt::ReadBufCursor;
913    use hyper_util::client::legacy::connect::Connected;
914
915    use super::*;
916
917    #[tokio::test]
918    async fn connector_selection() {
919        // Create a client that increments a count every time it creates a new Connector
920        let creation_count = Arc::new(AtomicU32::new(0));
921        let http_client = build_with_tcp_conn_fn(None, {
922            let count = creation_count.clone();
923            move || {
924                count.fetch_add(1, Ordering::Relaxed);
925                NeverConnects
926            }
927        });
928
929        // This configuration should result in 4 separate connectors with different timeout settings
930        let settings = [
931            HttpConnectorSettings::builder()
932                .connect_timeout(Duration::from_secs(3))
933                .build(),
934            HttpConnectorSettings::builder()
935                .read_timeout(Duration::from_secs(3))
936                .build(),
937            HttpConnectorSettings::builder()
938                .connect_timeout(Duration::from_secs(3))
939                .read_timeout(Duration::from_secs(3))
940                .build(),
941            HttpConnectorSettings::builder()
942                .connect_timeout(Duration::from_secs(5))
943                .read_timeout(Duration::from_secs(3))
944                .build(),
945        ];
946
947        // Kick off thousands of parallel tasks that will try to create a connector
948        let components = RuntimeComponentsBuilder::for_tests()
949            .with_time_source(Some(SystemTimeSource::new()))
950            .build()
951            .unwrap();
952        let mut handles = Vec::new();
953        for setting in &settings {
954            for _ in 0..1000 {
955                let client = http_client.clone();
956                handles.push(tokio::spawn({
957                    let setting = setting.clone();
958                    let components = components.clone();
959                    async move {
960                        let _ = client.http_connector(&setting, &components);
961                    }
962                }));
963            }
964        }
965        for handle in handles {
966            handle.await.unwrap();
967        }
968
969        // Verify only 4 connectors were created amidst the chaos
970        assert_eq!(4, creation_count.load(Ordering::Relaxed));
971    }
972
973    #[tokio::test]
974    async fn hyper_io_error() {
975        let connector = TestConnection {
976            inner: HangupStream,
977        };
978        let adapter = Connector::builder().wrap_connector(connector).adapter;
979        let err = adapter
980            .call(HttpRequest::get("https://socket-hangup.com").unwrap())
981            .await
982            .expect_err("socket hangup");
983        assert!(err.is_io(), "unexpected error type: {:?}", err);
984    }
985
986    // ---- machinery to make a Hyper connector that responds with an IO Error
987    #[derive(Clone)]
988    struct HangupStream;
989
990    impl Connection for HangupStream {
991        fn connected(&self) -> Connected {
992            Connected::new()
993        }
994    }
995
996    impl Read for HangupStream {
997        fn poll_read(
998            self: Pin<&mut Self>,
999            _cx: &mut Context<'_>,
1000            _buf: ReadBufCursor<'_>,
1001        ) -> Poll<std::io::Result<()>> {
1002            Poll::Ready(Err(Error::new(
1003                ErrorKind::ConnectionReset,
1004                "connection reset",
1005            )))
1006        }
1007    }
1008
1009    impl Write for HangupStream {
1010        fn poll_write(
1011            self: Pin<&mut Self>,
1012            _cx: &mut Context<'_>,
1013            _buf: &[u8],
1014        ) -> Poll<Result<usize, Error>> {
1015            Poll::Pending
1016        }
1017
1018        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
1019            Poll::Pending
1020        }
1021
1022        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
1023            Poll::Pending
1024        }
1025    }
1026
1027    #[derive(Clone)]
1028    struct TestConnection<T> {
1029        inner: T,
1030    }
1031
1032    impl<T> tower::Service<Uri> for TestConnection<T>
1033    where
1034        T: Clone + Connection,
1035    {
1036        type Response = T;
1037        type Error = BoxError;
1038        type Future = std::future::Ready<Result<Self::Response, Self::Error>>;
1039
1040        fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1041            Poll::Ready(Ok(()))
1042        }
1043
1044        fn call(&mut self, _req: Uri) -> Self::Future {
1045            std::future::ready(Ok(self.inner.clone()))
1046        }
1047    }
1048
1049    #[tokio::test]
1050    async fn http_connect_timeout_works() {
1051        let tcp_connector = NeverConnects::default();
1052        let connector_settings = HttpConnectorSettings::builder()
1053            .connect_timeout(Duration::from_secs(1))
1054            .build();
1055        let hyper = Connector::builder()
1056            .connector_settings(connector_settings)
1057            .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
1058            .wrap_connector(tcp_connector)
1059            .adapter;
1060        let now = tokio::time::Instant::now();
1061        tokio::time::pause();
1062        let resp = hyper
1063            .call(HttpRequest::get("https://static-uri.com").unwrap())
1064            .await
1065            .unwrap_err();
1066        assert!(
1067            resp.is_timeout(),
1068            "expected resp.is_timeout() to be true but it was false, resp == {:?}",
1069            resp
1070        );
1071        let message = DisplayErrorContext(&resp).to_string();
1072        let expected = "timeout: client error (Connect): HTTP connect timeout occurred after 1s";
1073        assert!(
1074            message.contains(expected),
1075            "expected '{message}' to contain '{expected}'"
1076        );
1077        assert_elapsed!(now, Duration::from_secs(1));
1078    }
1079
1080    #[tokio::test]
1081    async fn http_read_timeout_works() {
1082        let tcp_connector = crate::client::timeout::test::NeverReplies;
1083        let connector_settings = HttpConnectorSettings::builder()
1084            .connect_timeout(Duration::from_secs(1))
1085            .read_timeout(Duration::from_secs(2))
1086            .build();
1087        let hyper = Connector::builder()
1088            .connector_settings(connector_settings)
1089            .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
1090            .wrap_connector(tcp_connector)
1091            .adapter;
1092        let now = tokio::time::Instant::now();
1093        tokio::time::pause();
1094        let err = hyper
1095            .call(HttpRequest::get("https://fake-uri.com").unwrap())
1096            .await
1097            .unwrap_err();
1098        assert!(
1099            err.is_timeout(),
1100            "expected err.is_timeout() to be true but it was false, err == {err:?}",
1101        );
1102        let message = format!("{}", DisplayErrorContext(&err));
1103        let expected = "timeout: HTTP read timeout occurred after 2s";
1104        assert!(
1105            message.contains(expected),
1106            "expected '{message}' to contain '{expected}'"
1107        );
1108        assert_elapsed!(now, Duration::from_secs(2));
1109    }
1110
1111    #[cfg(not(windows))]
1112    #[tokio::test]
1113    async fn connection_refused_works() {
1114        use crate::client::dns::HyperUtilResolver;
1115        use aws_smithy_runtime_api::client::dns::{DnsFuture, ResolveDns};
1116        use std::net::{IpAddr, Ipv4Addr};
1117
1118        #[derive(Debug, Clone, Default)]
1119        struct TestResolver;
1120        impl ResolveDns for TestResolver {
1121            fn resolve_dns<'a>(&'a self, _name: &'a str) -> DnsFuture<'a> {
1122                let localhost_v4 = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
1123                DnsFuture::ready(Ok(vec![localhost_v4]))
1124            }
1125        }
1126
1127        let connector_settings = HttpConnectorSettings::builder()
1128            .connect_timeout(Duration::from_secs(20))
1129            .build();
1130
1131        let resolver = HyperUtilResolver {
1132            resolver: TestResolver,
1133        };
1134        let connector = Connector::builder().base_connector_with_resolver(resolver);
1135
1136        let hyper = Connector::builder()
1137            .connector_settings(connector_settings)
1138            .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
1139            .wrap_connector(connector)
1140            .adapter;
1141
1142        let resp = hyper
1143            .call(HttpRequest::get("http://static-uri:50227.com").unwrap())
1144            .await
1145            .unwrap_err();
1146        assert!(
1147            resp.is_io(),
1148            "expected resp.is_io() to be true but it was false, resp == {:?}",
1149            resp
1150        );
1151        let message = DisplayErrorContext(&resp).to_string();
1152        let expected = "Connection refused";
1153        assert!(
1154            message.contains(expected),
1155            "expected '{message}' to contain '{expected}'"
1156        );
1157    }
1158
1159    #[cfg(feature = "s2n-tls")]
1160    #[tokio::test]
1161    async fn s2n_tls_provider() {
1162        // Create an HttpConnector with the s2n-tls provider.
1163        let client = Builder::new()
1164            .tls_provider(tls::Provider::S2nTls)
1165            .build_https();
1166        let connector_settings = HttpConnectorSettings::builder().build();
1167
1168        // HyperClient::http_connector invokes TimeSource::now to determine how long it takes to
1169        // create new HttpConnectors. As such, a real time source must be provided.
1170        let runtime_components = RuntimeComponentsBuilder::for_tests()
1171            .with_time_source(Some(SystemTimeSource::new()))
1172            .build()
1173            .unwrap();
1174
1175        let connector = client.http_connector(&connector_settings, &runtime_components);
1176
1177        // Ensure that s2n-tls is used as the underlying TLS provider when selected.
1178        //
1179        // s2n-tls-hyper will error when given an invalid scheme. Ensure that this error is produced
1180        // from s2n-tls-hyper, and not another TLS provider.
1181        let error = connector
1182            .call(HttpRequest::get("notascheme://amazon.com").unwrap())
1183            .await
1184            .unwrap_err();
1185        let error = error.into_source();
1186        let s2n_error = error
1187            .source()
1188            .unwrap()
1189            .downcast_ref::<s2n_tls_hyper::error::Error>()
1190            .unwrap();
1191        assert!(matches!(
1192            s2n_error,
1193            s2n_tls_hyper::error::Error::InvalidScheme
1194        ));
1195    }
1196}