aws_smithy_http_client/
client.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6mod dns;
7/// Proxy configuration
8pub mod proxy;
9mod timeout;
10/// TLS connector(s)
11pub mod tls;
12
13pub(crate) mod connect;
14
15use crate::cfg::cfg_tls;
16use crate::tls::TlsContext;
17use aws_smithy_async::future::timeout::TimedOutError;
18use aws_smithy_async::rt::sleep::{default_async_sleep, AsyncSleep, SharedAsyncSleep};
19use aws_smithy_runtime_api::box_error::BoxError;
20use aws_smithy_runtime_api::client::connection::CaptureSmithyConnection;
21use aws_smithy_runtime_api::client::connection::ConnectionMetadata;
22use aws_smithy_runtime_api::client::connector_metadata::ConnectorMetadata;
23use aws_smithy_runtime_api::client::http::{
24    HttpClient, HttpConnector, HttpConnectorFuture, HttpConnectorSettings, SharedHttpClient,
25    SharedHttpConnector,
26};
27use aws_smithy_runtime_api::client::orchestrator::{HttpRequest, HttpResponse};
28use aws_smithy_runtime_api::client::result::ConnectorError;
29use aws_smithy_runtime_api::client::runtime_components::{
30    RuntimeComponents, RuntimeComponentsBuilder,
31};
32use aws_smithy_runtime_api::shared::IntoShared;
33use aws_smithy_types::body::SdkBody;
34use aws_smithy_types::config_bag::ConfigBag;
35use aws_smithy_types::error::display::DisplayErrorContext;
36use aws_smithy_types::retry::ErrorKind;
37use client::connect::Connection;
38use h2::Reason;
39use http_1x::{Extensions, Uri};
40use hyper::rt::{Read, Write};
41use hyper_util::client::legacy as client;
42use hyper_util::client::legacy::connect::dns::GaiResolver;
43use hyper_util::client::legacy::connect::{
44    capture_connection, CaptureConnection, Connect, HttpConnector as HyperHttpConnector, HttpInfo,
45};
46use hyper_util::client::proxy::matcher::Matcher;
47use hyper_util::rt::TokioExecutor;
48use std::borrow::Cow;
49use std::collections::HashMap;
50use std::error::Error;
51use std::fmt;
52use std::sync::RwLock;
53use std::time::Duration;
54
55/// Given `HttpConnectorSettings` and an `SharedAsyncSleep`, create a `SharedHttpConnector` from defaults depending on what cargo features are activated.
56pub fn default_connector(
57    settings: &HttpConnectorSettings,
58    sleep: Option<SharedAsyncSleep>,
59) -> Option<SharedHttpConnector> {
60    #[cfg(feature = "rustls-aws-lc")]
61    {
62        tracing::trace!(settings = ?settings, sleep = ?sleep, "creating a new default connector");
63        let mut conn_builder = Connector::builder().connector_settings(settings.clone());
64
65        if let Some(sleep) = sleep {
66            conn_builder = conn_builder.sleep_impl(sleep);
67        }
68
69        let conn = conn_builder
70            .tls_provider(tls::Provider::Rustls(
71                tls::rustls_provider::CryptoMode::AwsLc,
72            ))
73            .build();
74        Some(SharedHttpConnector::new(conn))
75    }
76    #[cfg(not(feature = "rustls-aws-lc"))]
77    {
78        tracing::trace!(settings = ?settings, sleep = ?sleep, "no default connector available");
79        None
80    }
81}
82
/// [`HttpConnector`] used to make HTTP requests.
///
/// This connector also implements socket connect and read timeouts.
///
/// This shouldn't be used directly in most cases.
/// See the docs on [`Builder`] for examples of how to customize the HTTP client.
#[derive(Debug)]
pub struct Connector {
    // Type-erased connector that every `call` is delegated to.
    adapter: Box<dyn HttpConnector>,
}
93
94impl Connector {
95    /// Builder for an HTTP connector.
96    pub fn builder() -> ConnectorBuilder {
97        ConnectorBuilder {
98            enable_tcp_nodelay: true,
99            ..Default::default()
100        }
101    }
102}
103
104impl HttpConnector for Connector {
105    fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
106        self.adapter.call(request)
107    }
108}
109
110/// Builder for [`Connector`].
111#[derive(Default, Debug, Clone)]
112pub struct ConnectorBuilder<Tls = TlsUnset> {
113    connector_settings: Option<HttpConnectorSettings>,
114    sleep_impl: Option<SharedAsyncSleep>,
115    client_builder: Option<hyper_util::client::legacy::Builder>,
116    enable_tcp_nodelay: bool,
117    interface: Option<String>,
118    proxy_config: Option<proxy::ProxyConfig>,
119    #[allow(unused)]
120    tls: Tls,
121}
122
/// Marker for the initial builder state, in which a `TlsProvider` still has to be chosen.
#[derive(Clone, Debug, Default)]
#[non_exhaustive]
pub struct TlsUnset {}
127
128/// TLS implementation selected
129#[derive(Debug, Clone)]
130pub struct TlsProviderSelected {
131    #[allow(unused)]
132    provider: tls::Provider,
133    #[allow(unused)]
134    context: TlsContext,
135}
136
137impl ConnectorBuilder<TlsUnset> {
138    /// Set the TLS implementation to use for this connector
139    pub fn tls_provider(self, provider: tls::Provider) -> ConnectorBuilder<TlsProviderSelected> {
140        ConnectorBuilder {
141            connector_settings: self.connector_settings,
142            sleep_impl: self.sleep_impl,
143            client_builder: self.client_builder,
144            enable_tcp_nodelay: self.enable_tcp_nodelay,
145            interface: self.interface,
146            proxy_config: self.proxy_config,
147            tls: TlsProviderSelected {
148                provider,
149                context: TlsContext::default(),
150            },
151        }
152    }
153
154    /// Build an HTTP connector sans TLS
155    #[doc(hidden)]
156    pub fn build_http(self) -> Connector {
157        if let Some(ref proxy_config) = self.proxy_config {
158            if proxy_config.requires_tls() {
159                tracing::warn!(
160                    "HTTPS proxy configured but no TLS provider set. \
161                     Connections to HTTPS proxy servers will fail. \
162                     Consider configuring a TLS provider to enable TLS support."
163                );
164            }
165        }
166
167        let base = self.base_connector();
168
169        // Wrap with HTTP proxy support if proxy is configured
170        let proxy_config = self
171            .proxy_config
172            .clone()
173            .unwrap_or_else(proxy::ProxyConfig::disabled);
174
175        if !proxy_config.is_disabled() {
176            let http_proxy_connector = connect::HttpProxyConnector::new(base, proxy_config);
177            self.wrap_connector(http_proxy_connector)
178        } else {
179            self.wrap_connector(base)
180        }
181    }
182}
183
184impl<Any> ConnectorBuilder<Any> {
185    /// Create a [`Connector`] from this builder and a given connector.
186    pub(crate) fn wrap_connector<C>(self, tcp_connector: C) -> Connector
187    where
188        C: Send + Sync + 'static,
189        C: Clone,
190        C: tower::Service<Uri>,
191        C::Response: Read + Write + Connection + Send + Sync + Unpin,
192        C: Connect,
193        C::Future: Unpin + Send + 'static,
194        C::Error: Into<BoxError>,
195    {
196        let client_builder =
197            self.client_builder
198                .unwrap_or(hyper_util::client::legacy::Builder::new(
199                    TokioExecutor::new(),
200                ));
201        let sleep_impl = self.sleep_impl.or_else(default_async_sleep);
202        let (connect_timeout, read_timeout) = self
203            .connector_settings
204            .map(|c| (c.connect_timeout(), c.read_timeout()))
205            .unwrap_or((None, None));
206
207        let connector = match connect_timeout {
208            Some(duration) => timeout::ConnectTimeout::new(
209                tcp_connector,
210                sleep_impl
211                    .clone()
212                    .expect("a sleep impl must be provided in order to have a connect timeout"),
213                duration,
214            ),
215            None => timeout::ConnectTimeout::no_timeout(tcp_connector),
216        };
217        let base = client_builder.build(connector);
218        let read_timeout = match read_timeout {
219            Some(duration) => timeout::HttpReadTimeout::new(
220                base,
221                sleep_impl.expect("a sleep impl must be provided in order to have a read timeout"),
222                duration,
223            ),
224            None => timeout::HttpReadTimeout::no_timeout(base),
225        };
226
227        let proxy_matcher = self
228            .proxy_config
229            .as_ref()
230            .map(|config| config.clone().into_hyper_util_matcher());
231
232        Connector {
233            adapter: Box::new(Adapter {
234                client: read_timeout,
235                proxy_matcher,
236            }),
237        }
238    }
239
240    /// Get the base TCP connector by mapping our config to the underlying `HttpConnector` from hyper
241    /// (which is a base TCP connector with no TLS or any wrapping)
242    fn base_connector(&self) -> HyperHttpConnector {
243        self.base_connector_with_resolver(GaiResolver::new())
244    }
245
246    /// Get the base TCP connector by mapping our config to the underlying `HttpConnector` from hyper
247    /// using the given resolver `R`
248    fn base_connector_with_resolver<R>(&self, resolver: R) -> HyperHttpConnector<R> {
249        let mut conn = HyperHttpConnector::new_with_resolver(resolver);
250        conn.set_nodelay(self.enable_tcp_nodelay);
251        #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
252        if let Some(interface) = &self.interface {
253            conn.set_interface(interface);
254        }
255        conn
256    }
257
258    /// Set the async sleep implementation used for timeouts
259    ///
260    /// Calling this is only necessary for testing or to use something other than
261    /// [`default_async_sleep`].
262    pub fn sleep_impl(mut self, sleep_impl: impl AsyncSleep + 'static) -> Self {
263        self.sleep_impl = Some(sleep_impl.into_shared());
264        self
265    }
266
267    /// Set the async sleep implementation used for timeouts
268    ///
269    /// Calling this is only necessary for testing or to use something other than
270    /// [`default_async_sleep`].
271    pub fn set_sleep_impl(&mut self, sleep_impl: Option<SharedAsyncSleep>) -> &mut Self {
272        self.sleep_impl = sleep_impl;
273        self
274    }
275
276    /// Configure the HTTP settings for the `HyperAdapter`
277    pub fn connector_settings(mut self, connector_settings: HttpConnectorSettings) -> Self {
278        self.connector_settings = Some(connector_settings);
279        self
280    }
281
282    /// Configure the HTTP settings for the `HyperAdapter`
283    pub fn set_connector_settings(
284        &mut self,
285        connector_settings: Option<HttpConnectorSettings>,
286    ) -> &mut Self {
287        self.connector_settings = connector_settings;
288        self
289    }
290
291    /// Configure `SO_NODELAY` for all sockets to the supplied value `nodelay`
292    pub fn enable_tcp_nodelay(mut self, nodelay: bool) -> Self {
293        self.enable_tcp_nodelay = nodelay;
294        self
295    }
296
297    /// Configure `SO_NODELAY` for all sockets to the supplied value `nodelay`
298    pub fn set_enable_tcp_nodelay(&mut self, nodelay: bool) -> &mut Self {
299        self.enable_tcp_nodelay = nodelay;
300        self
301    }
302
303    /// Sets the value for the `SO_BINDTODEVICE` option on this socket.
304    ///
305    /// If a socket is bound to an interface, only packets received from that particular
306    /// interface are processed by the socket. Note that this only works for some socket
307    /// types (e.g. `AF_INET` sockets).
308    ///
309    /// On Linux it can be used to specify a [VRF], but the binary needs to either have
310    /// `CAP_NET_RAW` capability set or be run as root.
311    ///
312    /// This function is only available on Android, Fuchsia, and Linux.
313    ///
314    /// [VRF]: https://www.kernel.org/doc/Documentation/networking/vrf.txt
315    #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
316    pub fn set_interface<S: Into<String>>(&mut self, interface: S) -> &mut Self {
317        self.interface = Some(interface.into());
318        self
319    }
320
321    /// Configure proxy settings for this connector
322    ///
323    /// This method allows you to set explicit proxy configuration for the HTTP client.
324    /// The proxy configuration will be used to determine whether requests should be
325    /// routed through a proxy server or connect directly.
326    ///
327    /// # Examples
328    ///
329    /// ```rust
330    /// # #[cfg(feature = "rustls-aws-lc")]
331    /// # {
332    /// use aws_smithy_http_client::{Connector, proxy::ProxyConfig, tls};
333    ///
334    /// let proxy_config = ProxyConfig::http("http://proxy.example.com:8080")?;
335    /// let connector = Connector::builder()
336    ///     .proxy_config(proxy_config)
337    ///     .tls_provider(tls::Provider::Rustls(tls::rustls_provider::CryptoMode::AwsLc))
338    ///     .build();
339    /// # }
340    /// # Ok::<(), Box<dyn std::error::Error>>(())
341    /// ```
342    pub fn proxy_config(mut self, config: proxy::ProxyConfig) -> Self {
343        self.proxy_config = Some(config);
344        self
345    }
346
347    /// Configure proxy settings for this connector
348    ///
349    /// This is the mutable version of [`proxy_config`](Self::proxy_config).
350    pub fn set_proxy_config(&mut self, config: Option<proxy::ProxyConfig>) -> &mut Self {
351        self.proxy_config = config;
352        self
353    }
354
355    /// Override the Hyper client [`Builder`](hyper_util::client::legacy::Builder) used to construct this client.
356    ///
357    /// This enables changing settings like forcing HTTP2 and modifying other default client behavior.
358    pub(crate) fn hyper_builder(
359        mut self,
360        hyper_builder: hyper_util::client::legacy::Builder,
361    ) -> Self {
362        self.set_hyper_builder(Some(hyper_builder));
363        self
364    }
365
366    /// Override the Hyper client [`Builder`](hyper_util::client::legacy::Builder) used to construct this client.
367    ///
368    /// This enables changing settings like forcing HTTP2 and modifying other default client behavior.
369    pub(crate) fn set_hyper_builder(
370        &mut self,
371        hyper_builder: Option<hyper_util::client::legacy::Builder>,
372    ) -> &mut Self {
373        self.client_builder = hyper_builder;
374        self
375    }
376}
377
378/// Adapter to use a Hyper 1.0-based Client as an `HttpConnector`
379///
380/// This adapter also enables TCP `CONNECT` and HTTP `READ` timeouts via [`Connector::builder`].
381struct Adapter<C> {
382    client: timeout::HttpReadTimeout<
383        hyper_util::client::legacy::Client<timeout::ConnectTimeout<C>, SdkBody>,
384    >,
385    proxy_matcher: Option<Matcher>,
386}
387
388impl<C> fmt::Debug for Adapter<C> {
389    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
390        f.debug_struct("Adapter")
391            .field("client", &"** hyper client **")
392            .field("proxy_matcher", &self.proxy_matcher.is_some())
393            .finish()
394    }
395}
396
397/// Extract a smithy connection from a hyper CaptureConnection
398fn extract_smithy_connection(capture_conn: &CaptureConnection) -> Option<ConnectionMetadata> {
399    let capture_conn = capture_conn.clone();
400    if let Some(conn) = capture_conn.clone().connection_metadata().as_ref() {
401        let mut extensions = Extensions::new();
402        conn.get_extras(&mut extensions);
403        let http_info = extensions.get::<HttpInfo>();
404        let mut builder = ConnectionMetadata::builder()
405            .proxied(conn.is_proxied())
406            .poison_fn(move || match capture_conn.connection_metadata().as_ref() {
407                Some(conn) => conn.poison(),
408                None => tracing::trace!("no connection existed to poison"),
409            });
410
411        builder
412            .set_local_addr(http_info.map(|info| info.local_addr()))
413            .set_remote_addr(http_info.map(|info| info.remote_addr()));
414
415        let smithy_connection = builder.build();
416
417        Some(smithy_connection)
418    } else {
419        None
420    }
421}
422
423impl<C> Adapter<C> {
424    /// Add proxy authentication header to the request if needed
425    fn add_proxy_auth_header(&self, request: &mut http_1x::Request<SdkBody>) {
426        // Only add auth for HTTP requests (not HTTPS which uses CONNECT tunneling)
427        if request.uri().scheme() != Some(&http_1x::uri::Scheme::HTTP) {
428            return;
429        }
430
431        // Don't override existing proxy authorization header
432        if request
433            .headers()
434            .contains_key(http_1x::header::PROXY_AUTHORIZATION)
435        {
436            return;
437        }
438
439        if let Some(ref matcher) = self.proxy_matcher {
440            if let Some(intercept) = matcher.intercept(request.uri()) {
441                // Add basic auth header if available
442                if let Some(auth_header) = intercept.basic_auth() {
443                    request
444                        .headers_mut()
445                        .insert(http_1x::header::PROXY_AUTHORIZATION, auth_header.clone());
446                    tracing::debug!("added proxy authentication header for {}", request.uri());
447                }
448            }
449        }
450    }
451}
452
453impl<C> HttpConnector for Adapter<C>
454where
455    C: Clone + Send + Sync + 'static,
456    C: tower::Service<Uri>,
457    C::Response: Connection + Read + Write + Unpin + 'static,
458    timeout::ConnectTimeout<C>: Connect,
459    C::Future: Unpin + Send + 'static,
460    C::Error: Into<BoxError>,
461{
462    fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
463        let mut request = match request.try_into_http1x() {
464            Ok(request) => request,
465            Err(err) => {
466                return HttpConnectorFuture::ready(Err(ConnectorError::user(err.into())));
467            }
468        };
469
470        self.add_proxy_auth_header(&mut request);
471
472        let capture_connection = capture_connection(&mut request);
473        if let Some(capture_smithy_connection) =
474            request.extensions().get::<CaptureSmithyConnection>()
475        {
476            capture_smithy_connection
477                .set_connection_retriever(move || extract_smithy_connection(&capture_connection));
478        }
479        let mut client = self.client.clone();
480        use tower::Service;
481        let fut = client.call(request);
482        HttpConnectorFuture::new(async move {
483            let response = fut
484                .await
485                .map_err(downcast_error)?
486                .map(SdkBody::from_body_1_x);
487            match HttpResponse::try_from(response) {
488                Ok(response) => Ok(response),
489                Err(err) => Err(ConnectorError::other(err.into(), None)),
490            }
491        })
492    }
493}
494
495/// Downcast errors coming out of hyper into an appropriate `ConnectorError`
496fn downcast_error(err: BoxError) -> ConnectorError {
497    // is a `TimedOutError` (from aws_smithy_async::timeout) in the chain? if it is, this is a timeout
498    if find_source::<TimedOutError>(err.as_ref()).is_some() {
499        return ConnectorError::timeout(err);
500    }
501    // is the top of chain error actually already a `ConnectorError`? return that directly
502    let err = match err.downcast::<ConnectorError>() {
503        Ok(connector_error) => return *connector_error,
504        Err(box_error) => box_error,
505    };
506    // generally, the top of chain will probably be a hyper error. Go through a set of hyper specific
507    // error classifications
508    let err = match find_source::<hyper::Error>(err.as_ref()) {
509        Some(hyper_error) => return to_connector_error(hyper_error)(err),
510        None => match find_source::<hyper_util::client::legacy::Error>(err.as_ref()) {
511            Some(hyper_util_err) => {
512                if hyper_util_err.is_connect()
513                    || find_source::<std::io::Error>(hyper_util_err).is_some()
514                {
515                    return ConnectorError::io(err);
516                }
517                err
518            }
519            None => err,
520        },
521    };
522
523    // otherwise, we have no idea!
524    ConnectorError::other(err, None)
525}
526
527/// Convert a [`hyper::Error`] into a [`ConnectorError`]
528fn to_connector_error(err: &hyper::Error) -> fn(BoxError) -> ConnectorError {
529    if err.is_timeout() || find_source::<timeout::HttpTimeoutError>(err).is_some() {
530        return ConnectorError::timeout;
531    }
532    if err.is_user() {
533        return ConnectorError::user;
534    }
535    if err.is_closed() || err.is_canceled() || find_source::<std::io::Error>(err).is_some() {
536        return ConnectorError::io;
537    }
538    // We sometimes receive this from S3: hyper::Error(IncompleteMessage)
539    if err.is_incomplete_message() {
540        return |err: BoxError| ConnectorError::other(err, Some(ErrorKind::TransientError));
541    }
542
543    if let Some(h2_err) = find_source::<h2::Error>(err) {
544        if h2_err.is_go_away()
545            || (h2_err.is_reset() && h2_err.reason() == Some(Reason::REFUSED_STREAM))
546        {
547            return ConnectorError::io;
548        }
549    }
550
551    tracing::warn!(err = %DisplayErrorContext(&err), "unrecognized error from Hyper. If this error should be retried, please file an issue.");
552    |err: BoxError| ConnectorError::other(err, None)
553}
554
/// Returns the first error of type `E` in the chain that starts at `err` and follows
/// `source()`, or `None` when the chain contains no such error.
///
/// `err` itself is checked first.
fn find_source<'a, E: Error + 'static>(err: &'a (dyn Error + 'static)) -> Option<&'a E> {
    std::iter::successors(Some(err), |&current| current.source())
        .find_map(|candidate| candidate.downcast_ref::<E>())
}
565
566// TODO(https://github.com/awslabs/aws-sdk-rust/issues/1090): CacheKey must also include ptr equality to any
567// runtime components that are used—sleep_impl as a base (unless we prohibit overriding sleep impl)
568// If we decide to put a DnsResolver in RuntimeComponents, then we'll need to handle that as well.
/// Key of the connector cache: the timeout settings a connector was built with.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct CacheKey {
    // Connect timeout taken from the connector settings.
    connect_timeout: Option<Duration>,
    // Read timeout taken from the connector settings.
    read_timeout: Option<Duration>,
}
574
575impl From<&HttpConnectorSettings> for CacheKey {
576    fn from(value: &HttpConnectorSettings) -> Self {
577        Self {
578            connect_timeout: value.connect_timeout(),
579            read_timeout: value.read_timeout(),
580        }
581    }
582}
583
584struct HyperClient<F> {
585    connector_cache: RwLock<HashMap<CacheKey, SharedHttpConnector>>,
586    client_builder: hyper_util::client::legacy::Builder,
587    connector_fn: F,
588}
589
590impl<F> fmt::Debug for HyperClient<F> {
591    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
592        f.debug_struct("HyperClient")
593            .field("connector_cache", &self.connector_cache)
594            .field("client_builder", &self.client_builder)
595            .finish()
596    }
597}
598
599impl<F> HttpClient for HyperClient<F>
600where
601    F: Fn(
602            hyper_util::client::legacy::Builder,
603            Option<&HttpConnectorSettings>,
604            Option<&RuntimeComponents>,
605        ) -> Connector
606        + Send
607        + Sync
608        + 'static,
609{
610    fn http_connector(
611        &self,
612        settings: &HttpConnectorSettings,
613        components: &RuntimeComponents,
614    ) -> SharedHttpConnector {
615        let key = CacheKey::from(settings);
616        let mut connector = self.connector_cache.read().unwrap().get(&key).cloned();
617        if connector.is_none() {
618            let mut cache = self.connector_cache.write().unwrap();
619            // Short-circuit if another thread already wrote a connector to the cache for this key
620            if !cache.contains_key(&key) {
621                let start = components.time_source().map(|ts| ts.now());
622                let connector = (self.connector_fn)(
623                    self.client_builder.clone(),
624                    Some(settings),
625                    Some(components),
626                );
627                let end = components.time_source().map(|ts| ts.now());
628                if let (Some(start), Some(end)) = (start, end) {
629                    if let Ok(elapsed) = end.duration_since(start) {
630                        tracing::debug!("new connector created in {:?}", elapsed);
631                    }
632                }
633                let connector = SharedHttpConnector::new(connector);
634                cache.insert(key.clone(), connector);
635            }
636            connector = cache.get(&key).cloned();
637        }
638
639        connector.expect("cache populated above")
640    }
641
642    fn validate_base_client_config(
643        &self,
644        _: &RuntimeComponentsBuilder,
645        _: &ConfigBag,
646    ) -> Result<(), BoxError> {
647        // Initialize the TCP connector at this point so that native certs load
648        // at client initialization time instead of upon first request. We do it
649        // here rather than at construction so that it won't run if this is not
650        // the selected HTTP client for the base config (for example, if this was
651        // the default HTTP client, and it was overridden by a later plugin).
652        let _ = (self.connector_fn)(self.client_builder.clone(), None, None);
653        Ok(())
654    }
655
656    fn connector_metadata(&self) -> Option<ConnectorMetadata> {
657        Some(ConnectorMetadata::new("hyper", Some(Cow::Borrowed("1.x"))))
658    }
659}
660
661/// Builder for a hyper-backed [`HttpClient`] implementation.
662///
663/// This builder can be used to customize the underlying TCP connector used, as well as
664/// hyper client configuration.
665///
666/// # Examples
667///
668/// Construct a Hyper client with the RusTLS TLS implementation.
669/// This can be useful when you want to share a Hyper connector between multiple
670/// generated Smithy clients.
671#[derive(Clone, Default, Debug)]
672pub struct Builder<Tls = TlsUnset> {
673    client_builder: Option<hyper_util::client::legacy::Builder>,
674    #[allow(unused)]
675    tls_provider: Tls,
676}
677
678cfg_tls! {
679    use aws_smithy_runtime_api::client::dns::ResolveDns;
680
681    impl ConnectorBuilder<TlsProviderSelected> {
682        /// Build a [`Connector`] that will use the default DNS resolver implementation.
683        pub fn build(self) -> Connector {
684            let http_connector = self.base_connector();
685            self.build_https(http_connector)
686        }
687
688        /// Configure the TLS context
689        pub fn tls_context(mut self, ctx: TlsContext) -> Self {
690            self.tls.context = ctx;
691            self
692        }
693
694        /// Configure the TLS context
695        pub fn set_tls_context(&mut self, ctx: TlsContext) -> &mut Self {
696            self.tls.context = ctx;
697            self
698        }
699
700        /// Build a [`Connector`] that will use the given DNS resolver implementation.
701        pub fn build_with_resolver<R: ResolveDns + Clone + 'static>(self, resolver: R) -> Connector {
702            use crate::client::dns::HyperUtilResolver;
703            let http_connector = self.base_connector_with_resolver(HyperUtilResolver { resolver });
704            self.build_https(http_connector)
705        }
706
707        fn build_https<R>(self, http_connector: HyperHttpConnector<R>) -> Connector
708        where
709            R: Clone + Send + Sync + 'static,
710            R: tower::Service<hyper_util::client::legacy::connect::dns::Name>,
711            R::Response: Iterator<Item = std::net::SocketAddr>,
712            R::Future: Send,
713            R::Error: Into<Box<dyn Error + Send + Sync>>,
714        {
715            match &self.tls.provider {
716                // TODO(hyper1) - fix cfg_rustls! to allow matching on patterns so we can re-use it and not duplicate these cfg matches everywhere
717                #[cfg(any(
718                    feature = "rustls-aws-lc",
719                    feature = "rustls-aws-lc-fips",
720                    feature = "rustls-ring"
721                ))]
722                tls::Provider::Rustls(crypto_mode) => {
723                    let proxy_config = self.proxy_config.clone()
724                        .unwrap_or_else(proxy::ProxyConfig::disabled);
725
726                    let https_connector = tls::rustls_provider::build_connector::wrap_connector(
727                        http_connector,
728                        crypto_mode.clone(),
729                        &self.tls.context,
730                        proxy_config,
731                    );
732                    self.wrap_connector(https_connector)
733                },
734                #[cfg(feature = "s2n-tls")]
735                tls::Provider::S2nTls  => {
736                    let proxy_config = self.proxy_config.clone()
737                        .unwrap_or_else(proxy::ProxyConfig::disabled);
738
739                    let https_connector = tls::s2n_tls_provider::build_connector::wrap_connector(
740                        http_connector,
741                        &self.tls.context,
742                        proxy_config,
743                    );
744                    self.wrap_connector(https_connector)
745                }
746            }
747        }
748    }
749
750    impl Builder<TlsProviderSelected> {
751        /// Create an HTTPS client with the selected TLS provider.
752        ///
753        /// The trusted certificates will be loaded later when this becomes the selected
754        /// HTTP client for a Smithy client.
755        pub fn build_https(self) -> SharedHttpClient {
756            build_with_conn_fn(
757                self.client_builder,
758                move |client_builder, settings, runtime_components| {
759                    let builder = new_conn_builder(client_builder, settings, runtime_components)
760                        .tls_provider(self.tls_provider.provider.clone())
761                        .tls_context(self.tls_provider.context.clone());
762                    builder.build()
763                },
764            )
765        }
766
767        /// Create an HTTPS client using a custom DNS resolver
768        pub fn build_with_resolver(
769            self,
770            resolver: impl ResolveDns + Clone + 'static,
771        ) -> SharedHttpClient {
772            build_with_conn_fn(
773                self.client_builder,
774                move |client_builder, settings, runtime_components| {
775                    let builder = new_conn_builder(client_builder, settings, runtime_components)
776                        .tls_provider(self.tls_provider.provider.clone())
777                        .tls_context(self.tls_provider.context.clone());
778                    builder.build_with_resolver(resolver.clone())
779                },
780            )
781        }
782
783        /// Configure the TLS context
784        pub fn tls_context(mut self, ctx: TlsContext) -> Self {
785            self.tls_provider.context = ctx;
786            self
787        }
788    }
789}
790
791impl Builder<TlsUnset> {
792    /// Creates a new builder.
793    pub fn new() -> Self {
794        Self::default()
795    }
796
797    /// Returns a [`SharedHttpClient`] that calls the given `connector` function to select an HTTP(S) connector.
798    #[doc(hidden)]
799    pub fn build_with_connector_fn<F>(self, connector_fn: F) -> SharedHttpClient
800    where
801        F: Fn(Option<&HttpConnectorSettings>, Option<&RuntimeComponents>) -> Connector
802            + Send
803            + Sync
804            + 'static,
805    {
806        build_with_conn_fn(
807            self.client_builder,
808            move |_builder, settings, runtime_components| {
809                connector_fn(settings, runtime_components)
810            },
811        )
812    }
813
814    /// Build a new HTTP client without TLS enabled
815    #[doc(hidden)]
816    pub fn build_http(self) -> SharedHttpClient {
817        build_with_conn_fn(
818            self.client_builder,
819            move |client_builder, settings, runtime_components| {
820                let builder = new_conn_builder(client_builder, settings, runtime_components);
821                builder.build_http()
822            },
823        )
824    }
825
826    /// Set the TLS implementation to use
827    pub fn tls_provider(self, provider: tls::Provider) -> Builder<TlsProviderSelected> {
828        Builder {
829            client_builder: self.client_builder,
830            tls_provider: TlsProviderSelected {
831                provider,
832                context: TlsContext::default(),
833            },
834        }
835    }
836}
837
838pub(crate) fn build_with_conn_fn<F>(
839    client_builder: Option<hyper_util::client::legacy::Builder>,
840    connector_fn: F,
841) -> SharedHttpClient
842where
843    F: Fn(
844            hyper_util::client::legacy::Builder,
845            Option<&HttpConnectorSettings>,
846            Option<&RuntimeComponents>,
847        ) -> Connector
848        + Send
849        + Sync
850        + 'static,
851{
852    SharedHttpClient::new(HyperClient {
853        connector_cache: RwLock::new(HashMap::new()),
854        client_builder: client_builder
855            .unwrap_or_else(|| hyper_util::client::legacy::Builder::new(TokioExecutor::new())),
856        connector_fn,
857    })
858}
859
860#[allow(dead_code)]
861pub(crate) fn build_with_tcp_conn_fn<C, F>(
862    client_builder: Option<hyper_util::client::legacy::Builder>,
863    tcp_connector_fn: F,
864) -> SharedHttpClient
865where
866    F: Fn() -> C + Send + Sync + 'static,
867    C: Clone + Send + Sync + 'static,
868    C: tower::Service<Uri>,
869    C::Response: Connection + Read + Write + Send + Sync + Unpin + 'static,
870    C::Future: Unpin + Send + 'static,
871    C::Error: Into<BoxError>,
872    C: Connect,
873{
874    build_with_conn_fn(
875        client_builder,
876        move |client_builder, settings, runtime_components| {
877            let builder = new_conn_builder(client_builder, settings, runtime_components);
878            builder.wrap_connector(tcp_connector_fn())
879        },
880    )
881}
882
883fn new_conn_builder(
884    client_builder: hyper_util::client::legacy::Builder,
885    settings: Option<&HttpConnectorSettings>,
886    runtime_components: Option<&RuntimeComponents>,
887) -> ConnectorBuilder {
888    let mut builder = Connector::builder().hyper_builder(client_builder);
889    builder.set_connector_settings(settings.cloned());
890    if let Some(components) = runtime_components {
891        builder.set_sleep_impl(components.sleep_impl());
892    }
893    builder
894}
895
896#[cfg(test)]
897mod test {
898    use std::io::{Error, ErrorKind};
899    use std::pin::Pin;
900    use std::sync::atomic::{AtomicU32, Ordering};
901    use std::sync::Arc;
902    use std::task::{Context, Poll};
903
904    use crate::client::timeout::test::NeverConnects;
905    use aws_smithy_async::assert_elapsed;
906    use aws_smithy_async::rt::sleep::TokioSleep;
907    use aws_smithy_async::time::SystemTimeSource;
908    use aws_smithy_runtime_api::client::runtime_components::RuntimeComponentsBuilder;
909    use http_1x::Uri;
910    use hyper::rt::ReadBufCursor;
911    use hyper_util::client::legacy::connect::Connected;
912
913    use super::*;
914
915    #[tokio::test]
916    async fn connector_selection() {
917        // Create a client that increments a count every time it creates a new Connector
918        let creation_count = Arc::new(AtomicU32::new(0));
919        let http_client = build_with_tcp_conn_fn(None, {
920            let count = creation_count.clone();
921            move || {
922                count.fetch_add(1, Ordering::Relaxed);
923                NeverConnects
924            }
925        });
926
927        // This configuration should result in 4 separate connectors with different timeout settings
928        let settings = [
929            HttpConnectorSettings::builder()
930                .connect_timeout(Duration::from_secs(3))
931                .build(),
932            HttpConnectorSettings::builder()
933                .read_timeout(Duration::from_secs(3))
934                .build(),
935            HttpConnectorSettings::builder()
936                .connect_timeout(Duration::from_secs(3))
937                .read_timeout(Duration::from_secs(3))
938                .build(),
939            HttpConnectorSettings::builder()
940                .connect_timeout(Duration::from_secs(5))
941                .read_timeout(Duration::from_secs(3))
942                .build(),
943        ];
944
945        // Kick off thousands of parallel tasks that will try to create a connector
946        let components = RuntimeComponentsBuilder::for_tests()
947            .with_time_source(Some(SystemTimeSource::new()))
948            .build()
949            .unwrap();
950        let mut handles = Vec::new();
951        for setting in &settings {
952            for _ in 0..1000 {
953                let client = http_client.clone();
954                handles.push(tokio::spawn({
955                    let setting = setting.clone();
956                    let components = components.clone();
957                    async move {
958                        let _ = client.http_connector(&setting, &components);
959                    }
960                }));
961            }
962        }
963        for handle in handles {
964            handle.await.unwrap();
965        }
966
967        // Verify only 4 connectors were created amidst the chaos
968        assert_eq!(4, creation_count.load(Ordering::Relaxed));
969    }
970
971    #[tokio::test]
972    async fn hyper_io_error() {
973        let connector = TestConnection {
974            inner: HangupStream,
975        };
976        let adapter = Connector::builder().wrap_connector(connector).adapter;
977        let err = adapter
978            .call(HttpRequest::get("https://socket-hangup.com").unwrap())
979            .await
980            .expect_err("socket hangup");
981        assert!(err.is_io(), "unexpected error type: {:?}", err);
982    }
983
984    // ---- machinery to make a Hyper connector that responds with an IO Error
985    #[derive(Clone)]
986    struct HangupStream;
987
988    impl Connection for HangupStream {
989        fn connected(&self) -> Connected {
990            Connected::new()
991        }
992    }
993
994    impl Read for HangupStream {
995        fn poll_read(
996            self: Pin<&mut Self>,
997            _cx: &mut Context<'_>,
998            _buf: ReadBufCursor<'_>,
999        ) -> Poll<std::io::Result<()>> {
1000            Poll::Ready(Err(Error::new(
1001                ErrorKind::ConnectionReset,
1002                "connection reset",
1003            )))
1004        }
1005    }
1006
1007    impl Write for HangupStream {
1008        fn poll_write(
1009            self: Pin<&mut Self>,
1010            _cx: &mut Context<'_>,
1011            _buf: &[u8],
1012        ) -> Poll<Result<usize, Error>> {
1013            Poll::Pending
1014        }
1015
1016        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
1017            Poll::Pending
1018        }
1019
1020        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
1021            Poll::Pending
1022        }
1023    }
1024
1025    #[derive(Clone)]
1026    struct TestConnection<T> {
1027        inner: T,
1028    }
1029
1030    impl<T> tower::Service<Uri> for TestConnection<T>
1031    where
1032        T: Clone + Connection,
1033    {
1034        type Response = T;
1035        type Error = BoxError;
1036        type Future = std::future::Ready<Result<Self::Response, Self::Error>>;
1037
1038        fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1039            Poll::Ready(Ok(()))
1040        }
1041
1042        fn call(&mut self, _req: Uri) -> Self::Future {
1043            std::future::ready(Ok(self.inner.clone()))
1044        }
1045    }
1046
1047    #[tokio::test]
1048    async fn http_connect_timeout_works() {
1049        let tcp_connector = NeverConnects::default();
1050        let connector_settings = HttpConnectorSettings::builder()
1051            .connect_timeout(Duration::from_secs(1))
1052            .build();
1053        let hyper = Connector::builder()
1054            .connector_settings(connector_settings)
1055            .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
1056            .wrap_connector(tcp_connector)
1057            .adapter;
1058        let now = tokio::time::Instant::now();
1059        tokio::time::pause();
1060        let resp = hyper
1061            .call(HttpRequest::get("https://static-uri.com").unwrap())
1062            .await
1063            .unwrap_err();
1064        assert!(
1065            resp.is_timeout(),
1066            "expected resp.is_timeout() to be true but it was false, resp == {:?}",
1067            resp
1068        );
1069        let message = DisplayErrorContext(&resp).to_string();
1070        let expected = "timeout: client error (Connect): HTTP connect timeout occurred after 1s";
1071        assert!(
1072            message.contains(expected),
1073            "expected '{message}' to contain '{expected}'"
1074        );
1075        assert_elapsed!(now, Duration::from_secs(1));
1076    }
1077
1078    #[tokio::test]
1079    async fn http_read_timeout_works() {
1080        let tcp_connector = crate::client::timeout::test::NeverReplies;
1081        let connector_settings = HttpConnectorSettings::builder()
1082            .connect_timeout(Duration::from_secs(1))
1083            .read_timeout(Duration::from_secs(2))
1084            .build();
1085        let hyper = Connector::builder()
1086            .connector_settings(connector_settings)
1087            .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
1088            .wrap_connector(tcp_connector)
1089            .adapter;
1090        let now = tokio::time::Instant::now();
1091        tokio::time::pause();
1092        let err = hyper
1093            .call(HttpRequest::get("https://fake-uri.com").unwrap())
1094            .await
1095            .unwrap_err();
1096        assert!(
1097            err.is_timeout(),
1098            "expected err.is_timeout() to be true but it was false, err == {err:?}",
1099        );
1100        let message = format!("{}", DisplayErrorContext(&err));
1101        let expected = "timeout: HTTP read timeout occurred after 2s";
1102        assert!(
1103            message.contains(expected),
1104            "expected '{message}' to contain '{expected}'"
1105        );
1106        assert_elapsed!(now, Duration::from_secs(2));
1107    }
1108
1109    #[cfg(not(windows))]
1110    #[tokio::test]
1111    async fn connection_refused_works() {
1112        use crate::client::dns::HyperUtilResolver;
1113        use aws_smithy_runtime_api::client::dns::{DnsFuture, ResolveDns};
1114        use std::net::{IpAddr, Ipv4Addr};
1115
1116        #[derive(Debug, Clone, Default)]
1117        struct TestResolver;
1118        impl ResolveDns for TestResolver {
1119            fn resolve_dns<'a>(&'a self, _name: &'a str) -> DnsFuture<'a> {
1120                let localhost_v4 = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
1121                DnsFuture::ready(Ok(vec![localhost_v4]))
1122            }
1123        }
1124
1125        let connector_settings = HttpConnectorSettings::builder()
1126            .connect_timeout(Duration::from_secs(20))
1127            .build();
1128
1129        let resolver = HyperUtilResolver {
1130            resolver: TestResolver,
1131        };
1132        let connector = Connector::builder().base_connector_with_resolver(resolver);
1133
1134        let hyper = Connector::builder()
1135            .connector_settings(connector_settings)
1136            .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
1137            .wrap_connector(connector)
1138            .adapter;
1139
1140        let resp = hyper
1141            .call(HttpRequest::get("http://static-uri:50227.com").unwrap())
1142            .await
1143            .unwrap_err();
1144        assert!(
1145            resp.is_io(),
1146            "expected resp.is_io() to be true but it was false, resp == {:?}",
1147            resp
1148        );
1149        let message = DisplayErrorContext(&resp).to_string();
1150        let expected = "Connection refused";
1151        assert!(
1152            message.contains(expected),
1153            "expected '{message}' to contain '{expected}'"
1154        );
1155    }
1156
1157    #[cfg(feature = "s2n-tls")]
1158    #[tokio::test]
1159    async fn s2n_tls_provider() {
1160        // Create an HttpConnector with the s2n-tls provider.
1161        let client = Builder::new()
1162            .tls_provider(tls::Provider::S2nTls)
1163            .build_https();
1164        let connector_settings = HttpConnectorSettings::builder().build();
1165
1166        // HyperClient::http_connector invokes TimeSource::now to determine how long it takes to
1167        // create new HttpConnectors. As such, a real time source must be provided.
1168        let runtime_components = RuntimeComponentsBuilder::for_tests()
1169            .with_time_source(Some(SystemTimeSource::new()))
1170            .build()
1171            .unwrap();
1172
1173        let connector = client.http_connector(&connector_settings, &runtime_components);
1174
1175        // Ensure that s2n-tls is used as the underlying TLS provider when selected.
1176        //
1177        // s2n-tls-hyper will error when given an invalid scheme. Ensure that this error is produced
1178        // from s2n-tls-hyper, and not another TLS provider.
1179        let error = connector
1180            .call(HttpRequest::get("notascheme://amazon.com").unwrap())
1181            .await
1182            .unwrap_err();
1183        let error = error.into_source();
1184        let s2n_error = error
1185            .source()
1186            .unwrap()
1187            .downcast_ref::<s2n_tls_hyper::error::Error>()
1188            .unwrap();
1189        assert!(matches!(
1190            s2n_error,
1191            s2n_tls_hyper::error::Error::InvalidScheme
1192        ));
1193    }
1194}