aws_smithy_http_client/
hyper_legacy.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6use crate::hyper_legacy::timeout_middleware::HttpTimeoutError;
7use aws_smithy_async::future::timeout::TimedOutError;
8use aws_smithy_async::rt::sleep::{default_async_sleep, AsyncSleep, SharedAsyncSleep};
9use aws_smithy_runtime_api::box_error::BoxError;
10use aws_smithy_runtime_api::client::connection::CaptureSmithyConnection;
11use aws_smithy_runtime_api::client::connection::ConnectionMetadata;
12use aws_smithy_runtime_api::client::connector_metadata::ConnectorMetadata;
13use aws_smithy_runtime_api::client::http::{
14    HttpClient, HttpConnector, HttpConnectorFuture, HttpConnectorSettings, SharedHttpClient,
15    SharedHttpConnector,
16};
17use aws_smithy_runtime_api::client::orchestrator::{HttpRequest, HttpResponse};
18use aws_smithy_runtime_api::client::result::ConnectorError;
19use aws_smithy_runtime_api::client::runtime_components::{
20    RuntimeComponents, RuntimeComponentsBuilder,
21};
22use aws_smithy_runtime_api::shared::IntoShared;
23use aws_smithy_types::body::SdkBody;
24use aws_smithy_types::config_bag::ConfigBag;
25use aws_smithy_types::error::display::DisplayErrorContext;
26use aws_smithy_types::retry::ErrorKind;
27use h2_0_3::Reason;
28use hyper_0_14::client::connect::{capture_connection, CaptureConnection, Connection, HttpInfo};
29use std::borrow::Cow;
30use std::collections::HashMap;
31use std::error::Error;
32use std::fmt;
33use std::sync::RwLock;
34use std::time::Duration;
35use tokio::io::{AsyncRead, AsyncWrite};
36
37#[cfg(feature = "legacy-rustls-ring")]
38mod default_connector {
39    use aws_smithy_async::rt::sleep::SharedAsyncSleep;
40    use aws_smithy_runtime_api::client::http::HttpConnectorSettings;
41    use legacy_hyper_rustls as hyper_rustls;
42    use legacy_rustls as rustls;
43    use std::sync::LazyLock;
44
45    // Creating a `with_native_roots` HTTP client takes 300ms on OS X. Cache this so that we
46    // don't need to repeatedly incur that cost.
47    pub(crate) static HTTPS_NATIVE_ROOTS: LazyLock<
48        hyper_rustls::HttpsConnector<hyper_0_14::client::HttpConnector>,
49    > = LazyLock::new(default_tls);
50
51    fn default_tls() -> hyper_rustls::HttpsConnector<hyper_0_14::client::HttpConnector> {
52        use legacy_rustls::client::WantsTransparencyPolicyOrClientCert;
53        use legacy_rustls::{ClientConfig, ConfigBuilder, WantsVerifier};
54        use rustls_native_certs;
55        // polyfill with_native_roots from https://docs.rs/hyper-rustls/0.24.2/src/hyper_rustls/config.rs.html#22-70
56        // to use the new rustls_native_certs, since rustls_native_certs 0.6 depends on rustls-pemfile which is deprecated
57        fn with_native_roots(
58            this: ConfigBuilder<ClientConfig, WantsVerifier>,
59        ) -> ConfigBuilder<ClientConfig, WantsTransparencyPolicyOrClientCert> {
60            let mut roots = rustls::RootCertStore::empty();
61            let mut valid_count = 0;
62            let mut invalid_count = 0;
63
64            for cert in
65                rustls_native_certs::load_native_certs().expect("could not load platform certs")
66            {
67                let cert = rustls::Certificate(cert.to_vec());
68                match roots.add(&cert) {
69                    Ok(_) => valid_count += 1,
70                    Err(err) => {
71                        tracing::trace!("invalid cert der {:?}", cert.0);
72                        tracing::debug!("certificate parsing failed: {:?}", err);
73                        invalid_count += 1
74                    }
75                }
76            }
77            tracing::debug!(
78                "with_native_roots processed {} valid and {} invalid certs",
79                valid_count,
80                invalid_count
81            );
82            assert!(!roots.is_empty(), "no CA certificates found");
83
84            this.with_root_certificates(roots)
85        }
86        hyper_rustls::HttpsConnectorBuilder::new()
87               .with_tls_config(
88                with_native_roots(rustls::ClientConfig::builder()
89                    .with_cipher_suites(&[
90                        // TLS1.3 suites
91                        rustls::cipher_suite::TLS13_AES_256_GCM_SHA384,
92                        rustls::cipher_suite::TLS13_AES_128_GCM_SHA256,
93                        // TLS1.2 suites
94                        rustls::cipher_suite::TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
95                        rustls::cipher_suite::TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
96                        rustls::cipher_suite::TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
97                        rustls::cipher_suite::TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
98                        rustls::cipher_suite::TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256,
99                    ])
100                    .with_safe_default_kx_groups()
101                    .with_safe_default_protocol_versions()
102                    .expect("Error with the TLS configuration. Please file a bug report under https://github.com/smithy-lang/smithy-rs/issues."))
103                    .with_no_client_auth()
104            )
105            .https_or_http()
106            .enable_http1()
107            .enable_http2()
108            .build()
109    }
110
111    pub(super) fn base(
112        settings: &HttpConnectorSettings,
113        sleep: Option<SharedAsyncSleep>,
114    ) -> super::HyperConnectorBuilder {
115        let mut hyper = super::HyperConnector::builder().connector_settings(settings.clone());
116        if let Some(sleep) = sleep {
117            hyper = hyper.sleep_impl(sleep);
118        }
119        hyper
120    }
121
122    /// Return a default HTTPS connector backed by the `rustls` crate.
123    ///
124    /// It requires a minimum TLS version of 1.2.
125    /// It allows you to connect to both `http` and `https` URLs.
126    pub(super) fn https() -> hyper_rustls::HttpsConnector<hyper_0_14::client::HttpConnector> {
127        HTTPS_NATIVE_ROOTS.clone()
128    }
129}
130
131/// Given `HttpConnectorSettings` and an `SharedAsyncSleep`, create a `SharedHttpConnector` from defaults depending on what cargo features are activated.
132pub fn default_connector(
133    settings: &HttpConnectorSettings,
134    sleep: Option<SharedAsyncSleep>,
135) -> Option<SharedHttpConnector> {
136    #[cfg(feature = "legacy-rustls-ring")]
137    {
138        tracing::trace!(settings = ?settings, sleep = ?sleep, "creating a new default connector");
139        let hyper = default_connector::base(settings, sleep).build_https();
140        Some(SharedHttpConnector::new(hyper))
141    }
142    #[cfg(not(feature = "legacy-rustls-ring"))]
143    {
144        tracing::trace!(settings = ?settings, sleep = ?sleep, "no default connector available");
145        None
146    }
147}
148
149/// Creates a hyper-backed HTTPS client from defaults depending on what cargo features are activated.
150pub fn default_client() -> Option<SharedHttpClient> {
151    #[cfg(feature = "legacy-rustls-ring")]
152    {
153        tracing::trace!("creating a new default hyper 0.14.x client");
154        Some(HyperClientBuilder::new().build_https())
155    }
156    #[cfg(not(feature = "legacy-rustls-ring"))]
157    {
158        tracing::trace!("no default connector available");
159        None
160    }
161}
162
163/// [`HttpConnector`] that uses [`hyper_0_14`] to make HTTP requests.
164///
165/// This connector also implements socket connect and read timeouts.
166///
167/// This shouldn't be used directly in most cases.
168/// See the docs on [`HyperClientBuilder`] for examples of how
169/// to customize the Hyper client.
170#[derive(Debug)]
171pub struct HyperConnector {
172    adapter: Box<dyn HttpConnector>,
173}
174
175impl HyperConnector {
176    /// Builder for a Hyper connector.
177    pub fn builder() -> HyperConnectorBuilder {
178        Default::default()
179    }
180}
181
182impl HttpConnector for HyperConnector {
183    fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
184        self.adapter.call(request)
185    }
186}
187
188/// Builder for [`HyperConnector`].
189#[derive(Default, Debug)]
190pub struct HyperConnectorBuilder {
191    connector_settings: Option<HttpConnectorSettings>,
192    sleep_impl: Option<SharedAsyncSleep>,
193    client_builder: Option<hyper_0_14::client::Builder>,
194}
195
196impl HyperConnectorBuilder {
197    /// Create a [`HyperConnector`] from this builder and a given connector.
198    pub fn build<C>(self, tcp_connector: C) -> HyperConnector
199    where
200        C: Clone + Send + Sync + 'static,
201        C: hyper_0_14::service::Service<http_02x::Uri>,
202        C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
203        C::Future: Unpin + Send + 'static,
204        C::Error: Into<BoxError>,
205    {
206        let client_builder = self.client_builder.unwrap_or_default();
207        let sleep_impl = self.sleep_impl.or_else(default_async_sleep);
208        let (connect_timeout, read_timeout) = self
209            .connector_settings
210            .map(|c| (c.connect_timeout(), c.read_timeout()))
211            .unwrap_or((None, None));
212
213        let connector = match connect_timeout {
214            Some(duration) => timeout_middleware::ConnectTimeout::new(
215                tcp_connector,
216                sleep_impl
217                    .clone()
218                    .expect("a sleep impl must be provided in order to have a connect timeout"),
219                duration,
220            ),
221            None => timeout_middleware::ConnectTimeout::no_timeout(tcp_connector),
222        };
223        let base = client_builder.build(connector);
224        let read_timeout = match read_timeout {
225            Some(duration) => timeout_middleware::HttpReadTimeout::new(
226                base,
227                sleep_impl.expect("a sleep impl must be provided in order to have a read timeout"),
228                duration,
229            ),
230            None => timeout_middleware::HttpReadTimeout::no_timeout(base),
231        };
232        HyperConnector {
233            adapter: Box::new(Adapter {
234                client: read_timeout,
235            }),
236        }
237    }
238
239    /// Create a [`HyperConnector`] with the default rustls HTTPS implementation.
240    #[cfg(feature = "legacy-rustls-ring")]
241    pub fn build_https(self) -> HyperConnector {
242        self.build(default_connector::https())
243    }
244
245    /// Set the async sleep implementation used for timeouts
246    ///
247    /// Calling this is only necessary for testing or to use something other than
248    /// [`default_async_sleep`].
249    pub fn sleep_impl(mut self, sleep_impl: impl AsyncSleep + 'static) -> Self {
250        self.sleep_impl = Some(sleep_impl.into_shared());
251        self
252    }
253
254    /// Set the async sleep implementation used for timeouts
255    ///
256    /// Calling this is only necessary for testing or to use something other than
257    /// [`default_async_sleep`].
258    pub fn set_sleep_impl(&mut self, sleep_impl: Option<SharedAsyncSleep>) -> &mut Self {
259        self.sleep_impl = sleep_impl;
260        self
261    }
262
263    /// Configure the HTTP settings for the `HyperAdapter`
264    pub fn connector_settings(mut self, connector_settings: HttpConnectorSettings) -> Self {
265        self.connector_settings = Some(connector_settings);
266        self
267    }
268
269    /// Configure the HTTP settings for the `HyperAdapter`
270    pub fn set_connector_settings(
271        &mut self,
272        connector_settings: Option<HttpConnectorSettings>,
273    ) -> &mut Self {
274        self.connector_settings = connector_settings;
275        self
276    }
277
278    /// Override the Hyper client [`Builder`](hyper_0_14::client::Builder) used to construct this client.
279    ///
280    /// This enables changing settings like forcing HTTP2 and modifying other default client behavior.
281    pub fn hyper_builder(mut self, hyper_builder: hyper_0_14::client::Builder) -> Self {
282        self.client_builder = Some(hyper_builder);
283        self
284    }
285
286    /// Override the Hyper client [`Builder`](hyper_0_14::client::Builder) used to construct this client.
287    ///
288    /// This enables changing settings like forcing HTTP2 and modifying other default client behavior.
289    pub fn set_hyper_builder(
290        &mut self,
291        hyper_builder: Option<hyper_0_14::client::Builder>,
292    ) -> &mut Self {
293        self.client_builder = hyper_builder;
294        self
295    }
296}
297
298/// Adapter from a [`hyper_0_14::Client`] to [`HttpConnector`].
299///
300/// This adapter also enables TCP `CONNECT` and HTTP `READ` timeouts via [`HyperConnector::builder`].
301struct Adapter<C> {
302    client: timeout_middleware::HttpReadTimeout<
303        hyper_0_14::Client<timeout_middleware::ConnectTimeout<C>, SdkBody>,
304    >,
305}
306
307impl<C> fmt::Debug for Adapter<C> {
308    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
309        f.debug_struct("Adapter")
310            .field("client", &"** hyper client **")
311            .finish()
312    }
313}
314
315/// Extract a smithy connection from a hyper CaptureConnection
316fn extract_smithy_connection(capture_conn: &CaptureConnection) -> Option<ConnectionMetadata> {
317    let capture_conn = capture_conn.clone();
318    if let Some(conn) = capture_conn.clone().connection_metadata().as_ref() {
319        let mut extensions = http_02x::Extensions::new();
320        conn.get_extras(&mut extensions);
321        let http_info = extensions.get::<HttpInfo>();
322        let mut builder = ConnectionMetadata::builder()
323            .proxied(conn.is_proxied())
324            .poison_fn(move || match capture_conn.connection_metadata().as_ref() {
325                Some(conn) => conn.poison(),
326                None => tracing::trace!("no connection existed to poison"),
327            });
328
329        builder
330            .set_local_addr(http_info.map(|info| info.local_addr()))
331            .set_remote_addr(http_info.map(|info| info.remote_addr()));
332
333        let smithy_connection = builder.build();
334
335        Some(smithy_connection)
336    } else {
337        None
338    }
339}
340
341impl<C> HttpConnector for Adapter<C>
342where
343    C: Clone + Send + Sync + 'static,
344    C: hyper_0_14::service::Service<http_02x::Uri>,
345    C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
346    C::Future: Unpin + Send + 'static,
347    C::Error: Into<BoxError>,
348{
349    fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
350        use hyper_0_14::service::Service;
351
352        let mut request = match request.try_into_http02x() {
353            Ok(request) => request,
354            Err(err) => {
355                return HttpConnectorFuture::ready(Err(ConnectorError::other(err.into(), None)));
356            }
357        };
358        let capture_connection = capture_connection(&mut request);
359        if let Some(capture_smithy_connection) =
360            request.extensions().get::<CaptureSmithyConnection>()
361        {
362            capture_smithy_connection
363                .set_connection_retriever(move || extract_smithy_connection(&capture_connection));
364        }
365        let mut client = self.client.clone();
366        let fut = client.call(request);
367        HttpConnectorFuture::new(async move {
368            let response = fut
369                .await
370                .map_err(downcast_error)?
371                .map(SdkBody::from_body_0_4);
372            match HttpResponse::try_from(response) {
373                Ok(response) => Ok(response),
374                Err(err) => Err(ConnectorError::other(err.into(), None)),
375            }
376        })
377    }
378}
379
380/// Downcast errors coming out of hyper into an appropriate `ConnectorError`
381fn downcast_error(err: BoxError) -> ConnectorError {
382    // is a `TimedOutError` (from aws_smithy_async::timeout) in the chain? if it is, this is a timeout
383    if find_source::<TimedOutError>(err.as_ref()).is_some() {
384        return ConnectorError::timeout(err);
385    }
386    // is the top of chain error actually already a `ConnectorError`? return that directly
387    let err = match err.downcast::<ConnectorError>() {
388        Ok(connector_error) => return *connector_error,
389        Err(box_error) => box_error,
390    };
391    // generally, the top of chain will probably be a hyper error. Go through a set of hyper specific
392    // error classifications
393    let err = match err.downcast::<hyper_0_14::Error>() {
394        Ok(hyper_error) => return to_connector_error(*hyper_error),
395        Err(box_error) => box_error,
396    };
397
398    // otherwise, we have no idea!
399    ConnectorError::other(err, None)
400}
401
402/// Convert a [`hyper_0_14::Error`] into a [`ConnectorError`]
403fn to_connector_error(err: hyper_0_14::Error) -> ConnectorError {
404    if err.is_timeout() || find_source::<HttpTimeoutError>(&err).is_some() {
405        return ConnectorError::timeout(err.into());
406    }
407    if err.is_user() {
408        return ConnectorError::user(err.into());
409    }
410    if err.is_closed() || err.is_canceled() || find_source::<std::io::Error>(&err).is_some() {
411        return ConnectorError::io(err.into());
412    }
413    // We sometimes receive this from S3: hyper::Error(IncompleteMessage)
414    if err.is_incomplete_message() {
415        return ConnectorError::other(err.into(), Some(ErrorKind::TransientError));
416    }
417    if let Some(h2_err) = find_source::<h2_0_3::Error>(&err) {
418        if h2_err.is_go_away()
419            || (h2_err.is_reset() && h2_err.reason() == Some(Reason::REFUSED_STREAM))
420        {
421            return ConnectorError::io(err.into());
422        }
423    }
424
425    tracing::warn!(err = %DisplayErrorContext(&err), "unrecognized error from Hyper. If this error should be retried, please file an issue.");
426    ConnectorError::other(err.into(), None)
427}
428
429fn find_source<'a, E: Error + 'static>(err: &'a (dyn Error + 'static)) -> Option<&'a E> {
430    let mut next = Some(err);
431    while let Some(err) = next {
432        if let Some(matching_err) = err.downcast_ref::<E>() {
433            return Some(matching_err);
434        }
435        next = err.source();
436    }
437    None
438}
439
440#[derive(Clone, Debug, Eq, PartialEq, Hash)]
441struct CacheKey {
442    connect_timeout: Option<Duration>,
443    read_timeout: Option<Duration>,
444}
445
446impl From<&HttpConnectorSettings> for CacheKey {
447    fn from(value: &HttpConnectorSettings) -> Self {
448        Self {
449            connect_timeout: value.connect_timeout(),
450            read_timeout: value.read_timeout(),
451        }
452    }
453}
454
455struct HyperClient<F> {
456    connector_cache: RwLock<HashMap<CacheKey, SharedHttpConnector>>,
457    client_builder: hyper_0_14::client::Builder,
458    tcp_connector_fn: F,
459}
460
461impl<F> fmt::Debug for HyperClient<F> {
462    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
463        f.debug_struct("HyperClient")
464            .field("connector_cache", &self.connector_cache)
465            .field("client_builder", &self.client_builder)
466            .finish()
467    }
468}
469
470impl<C, F> HttpClient for HyperClient<F>
471where
472    F: Fn() -> C + Send + Sync,
473    C: Clone + Send + Sync + 'static,
474    C: hyper_0_14::service::Service<http_02x::Uri>,
475    C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
476    C::Future: Unpin + Send + 'static,
477    C::Error: Into<BoxError>,
478{
479    fn http_connector(
480        &self,
481        settings: &HttpConnectorSettings,
482        components: &RuntimeComponents,
483    ) -> SharedHttpConnector {
484        let key = CacheKey::from(settings);
485        let mut connector = self.connector_cache.read().unwrap().get(&key).cloned();
486        if connector.is_none() {
487            let mut cache = self.connector_cache.write().unwrap();
488            // Short-circuit if another thread already wrote a connector to the cache for this key
489            if !cache.contains_key(&key) {
490                let mut builder = HyperConnector::builder()
491                    .hyper_builder(self.client_builder.clone())
492                    .connector_settings(settings.clone());
493                builder.set_sleep_impl(components.sleep_impl());
494
495                let start = components.time_source().map(|ts| ts.now());
496                let tcp_connector = (self.tcp_connector_fn)();
497                let end = components.time_source().map(|ts| ts.now());
498                if let (Some(start), Some(end)) = (start, end) {
499                    if let Ok(elapsed) = end.duration_since(start) {
500                        tracing::debug!("new TCP connector created in {:?}", elapsed);
501                    }
502                }
503                let connector = SharedHttpConnector::new(builder.build(tcp_connector));
504                cache.insert(key.clone(), connector);
505            }
506            connector = cache.get(&key).cloned();
507        }
508
509        connector.expect("cache populated above")
510    }
511
512    fn validate_base_client_config(
513        &self,
514        _: &RuntimeComponentsBuilder,
515        _: &ConfigBag,
516    ) -> Result<(), BoxError> {
517        // Initialize the TCP connector at this point so that native certs load
518        // at client initialization time instead of upon first request. We do it
519        // here rather than at construction so that it won't run if this is not
520        // the selected HTTP client for the base config (for example, if this was
521        // the default HTTP client, and it was overridden by a later plugin).
522        let _ = (self.tcp_connector_fn)();
523        Ok(())
524    }
525
526    fn connector_metadata(&self) -> Option<ConnectorMetadata> {
527        Some(ConnectorMetadata::new("hyper", Some(Cow::Borrowed("0.x"))))
528    }
529}
530
531/// Builder for a hyper-backed [`HttpClient`] implementation.
532///
533/// This builder can be used to customize the underlying TCP connector used, as well as
534/// hyper client configuration.
535///
536/// # Examples
537///
538/// Construct a Hyper client with the default TLS implementation (rustls).
539/// This can be useful when you want to share a Hyper connector between multiple
540/// generated Smithy clients.
541///
542/// ```no_run,ignore
543/// use aws_smithy_http_client::hyper_014::HyperClientBuilder;
544///
545/// let http_client = HyperClientBuilder::new().build_https();
546///
547/// // This connector can then be given to a generated service Config
548/// let config = my_service_client::Config::builder()
549///     .endpoint_url("http://localhost:1234")
550///     .http_client(http_client)
551///     .build();
552/// let client = my_service_client::Client::from_conf(config);
553/// ```
554///
555/// ## Use a Hyper client with WebPKI roots
556///
557/// A use case for where you may want to use the [`HyperClientBuilder`] is when
558/// setting Hyper client settings that aren't otherwise exposed by the `Config`
559/// builder interface. Some examples include changing:
560///
561/// - Hyper client settings
562/// - Allowed TLS cipher suites
563/// - Using an alternative TLS connector library (not the default, rustls)
564/// - CA trust root certificates (illustrated using WebPKI below)
565///
566/// ```no_run,ignore
567/// use aws_smithy_http_client::hyper_014::HyperClientBuilder;
568///
569/// let https_connector = hyper_rustls::HttpsConnectorBuilder::new()
570///     .with_webpki_roots()
571///     .https_only()
572///     .enable_http1()
573///     .enable_http2()
574///     .build();
575/// let http_client = HyperClientBuilder::new().build(https_connector);
576///
577/// // This connector can then be given to a generated service Config
578/// let config = my_service_client::Config::builder()
579///     .endpoint_url("https://example.com")
580///     .http_client(http_client)
581///     .build();
582/// let client = my_service_client::Client::from_conf(config);
583/// ```
584#[derive(Clone, Default, Debug)]
585pub struct HyperClientBuilder {
586    client_builder: Option<hyper_0_14::client::Builder>,
587}
588
589impl HyperClientBuilder {
590    /// Creates a new builder.
591    pub fn new() -> Self {
592        Self::default()
593    }
594
595    /// Override the Hyper client [`Builder`](hyper_0_14::client::Builder) used to construct this client.
596    ///
597    /// This enables changing settings like forcing HTTP2 and modifying other default client behavior.
598    pub fn hyper_builder(mut self, hyper_builder: hyper_0_14::client::Builder) -> Self {
599        self.client_builder = Some(hyper_builder);
600        self
601    }
602
603    /// Override the Hyper client [`Builder`](hyper_0_14::client::Builder) used to construct this client.
604    ///
605    /// This enables changing settings like forcing HTTP2 and modifying other default client behavior.
606    pub fn set_hyper_builder(
607        &mut self,
608        hyper_builder: Option<hyper_0_14::client::Builder>,
609    ) -> &mut Self {
610        self.client_builder = hyper_builder;
611        self
612    }
613
614    /// Create a hyper client with the default rustls HTTPS implementation.
615    ///
616    /// The trusted certificates will be loaded later when this becomes the selected
617    /// HTTP client for a Smithy client.
618    #[cfg(feature = "legacy-rustls-ring")]
619    pub fn build_https(self) -> SharedHttpClient {
620        self.build_with_fn(default_connector::https)
621    }
622
623    /// Create a [`SharedHttpClient`] from this builder and a given connector.
624    ///
625    #[cfg_attr(
626        feature = "legacy-rustls-ring",
627        doc = "Use [`build_https`](HyperClientBuilder::build_https) if you don't want to provide a custom TCP connector."
628    )]
629    pub fn build<C>(self, tcp_connector: C) -> SharedHttpClient
630    where
631        C: Clone + Send + Sync + 'static,
632        C: hyper_0_14::service::Service<http_02x::Uri>,
633        C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
634        C::Future: Unpin + Send + 'static,
635        C::Error: Into<BoxError>,
636    {
637        self.build_with_fn(move || tcp_connector.clone())
638    }
639
640    fn build_with_fn<C, F>(self, tcp_connector_fn: F) -> SharedHttpClient
641    where
642        F: Fn() -> C + Send + Sync + 'static,
643        C: Clone + Send + Sync + 'static,
644        C: hyper_0_14::service::Service<http_02x::Uri>,
645        C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
646        C::Future: Unpin + Send + 'static,
647        C::Error: Into<BoxError>,
648    {
649        SharedHttpClient::new(HyperClient {
650            connector_cache: RwLock::new(HashMap::new()),
651            client_builder: self.client_builder.unwrap_or_default(),
652            tcp_connector_fn,
653        })
654    }
655}
656
657mod timeout_middleware {
658    use aws_smithy_async::future::timeout::{TimedOutError, Timeout};
659    use aws_smithy_async::rt::sleep::Sleep;
660    use aws_smithy_async::rt::sleep::{AsyncSleep, SharedAsyncSleep};
661    use aws_smithy_runtime_api::box_error::BoxError;
662    use pin_project_lite::pin_project;
663    use std::error::Error;
664    use std::fmt::Formatter;
665    use std::future::Future;
666    use std::pin::Pin;
667    use std::task::{Context, Poll};
668    use std::time::Duration;
669
670    #[derive(Debug)]
671    pub(crate) struct HttpTimeoutError {
672        kind: &'static str,
673        duration: Duration,
674    }
675
676    impl std::fmt::Display for HttpTimeoutError {
677        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
678            write!(
679                f,
680                "{} timeout occurred after {:?}",
681                self.kind, self.duration
682            )
683        }
684    }
685
686    impl Error for HttpTimeoutError {
687        // We implement the `source` function as returning a `TimedOutError` because when `downcast_error`
688        // or `find_source` is called with an `HttpTimeoutError` (or another error wrapping an `HttpTimeoutError`)
689        // this method will be checked to determine if it's a timeout-related error.
690        fn source(&self) -> Option<&(dyn Error + 'static)> {
691            Some(&TimedOutError)
692        }
693    }
694
695    /// Timeout wrapper that will timeout on the initial TCP connection
696    ///
697    /// # Stability
698    /// This interface is unstable.
699    #[derive(Clone, Debug)]
700    pub(super) struct ConnectTimeout<I> {
701        inner: I,
702        timeout: Option<(SharedAsyncSleep, Duration)>,
703    }
704
705    impl<I> ConnectTimeout<I> {
706        /// Create a new `ConnectTimeout` around `inner`.
707        ///
708        /// Typically, `I` will implement [`hyper_0_14::client::connect::Connect`].
709        pub(crate) fn new(inner: I, sleep: SharedAsyncSleep, timeout: Duration) -> Self {
710            Self {
711                inner,
712                timeout: Some((sleep, timeout)),
713            }
714        }
715
716        pub(crate) fn no_timeout(inner: I) -> Self {
717            Self {
718                inner,
719                timeout: None,
720            }
721        }
722    }
723
724    #[derive(Clone, Debug)]
725    pub(crate) struct HttpReadTimeout<I> {
726        inner: I,
727        timeout: Option<(SharedAsyncSleep, Duration)>,
728    }
729
730    impl<I> HttpReadTimeout<I> {
731        /// Create a new `HttpReadTimeout` around `inner`.
732        ///
733        /// Typically, `I` will implement [`hyper_0_14::service::Service<http::Request<SdkBody>>`].
734        pub(crate) fn new(inner: I, sleep: SharedAsyncSleep, timeout: Duration) -> Self {
735            Self {
736                inner,
737                timeout: Some((sleep, timeout)),
738            }
739        }
740
741        pub(crate) fn no_timeout(inner: I) -> Self {
742            Self {
743                inner,
744                timeout: None,
745            }
746        }
747    }
748
749    pin_project! {
750        /// Timeout future for Tower services
751        ///
752        /// Timeout future to handle timing out, mapping errors, and the possibility of not timing out
753        /// without incurring an additional allocation for each timeout layer.
754        #[project = MaybeTimeoutFutureProj]
755        pub enum MaybeTimeoutFuture<F> {
756            Timeout {
757                #[pin]
758                timeout: Timeout<F, Sleep>,
759                error_type: &'static str,
760                duration: Duration,
761            },
762            NoTimeout {
763                #[pin]
764                future: F
765            }
766        }
767    }
768
769    impl<F, T, E> Future for MaybeTimeoutFuture<F>
770    where
771        F: Future<Output = Result<T, E>>,
772        E: Into<BoxError>,
773    {
774        type Output = Result<T, BoxError>;
775
776        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
777            let (timeout_future, kind, &mut duration) = match self.project() {
778                MaybeTimeoutFutureProj::NoTimeout { future } => {
779                    return future.poll(cx).map_err(|err| err.into());
780                }
781                MaybeTimeoutFutureProj::Timeout {
782                    timeout,
783                    error_type,
784                    duration,
785                } => (timeout, error_type, duration),
786            };
787            match timeout_future.poll(cx) {
788                Poll::Ready(Ok(response)) => Poll::Ready(response.map_err(|err| err.into())),
789                Poll::Ready(Err(_timeout)) => {
790                    Poll::Ready(Err(HttpTimeoutError { kind, duration }.into()))
791                }
792                Poll::Pending => Poll::Pending,
793            }
794        }
795    }
796
797    impl<I> hyper_0_14::service::Service<http_02x::Uri> for ConnectTimeout<I>
798    where
799        I: hyper_0_14::service::Service<http_02x::Uri>,
800        I::Error: Into<BoxError>,
801    {
802        type Response = I::Response;
803        type Error = BoxError;
804        type Future = MaybeTimeoutFuture<I::Future>;
805
806        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
807            self.inner.poll_ready(cx).map_err(|err| err.into())
808        }
809
810        fn call(&mut self, req: http_02x::Uri) -> Self::Future {
811            match &self.timeout {
812                Some((sleep, duration)) => {
813                    let sleep = sleep.sleep(*duration);
814                    MaybeTimeoutFuture::Timeout {
815                        timeout: Timeout::new(self.inner.call(req), sleep),
816                        error_type: "HTTP connect",
817                        duration: *duration,
818                    }
819                }
820                None => MaybeTimeoutFuture::NoTimeout {
821                    future: self.inner.call(req),
822                },
823            }
824        }
825    }
826
827    impl<I, B> hyper_0_14::service::Service<http_02x::Request<B>> for HttpReadTimeout<I>
828    where
829        I: hyper_0_14::service::Service<http_02x::Request<B>, Error = hyper_0_14::Error>,
830    {
831        type Response = I::Response;
832        type Error = BoxError;
833        type Future = MaybeTimeoutFuture<I::Future>;
834
835        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
836            self.inner.poll_ready(cx).map_err(|err| err.into())
837        }
838
839        fn call(&mut self, req: http_02x::Request<B>) -> Self::Future {
840            match &self.timeout {
841                Some((sleep, duration)) => {
842                    let sleep = sleep.sleep(*duration);
843                    MaybeTimeoutFuture::Timeout {
844                        timeout: Timeout::new(self.inner.call(req), sleep),
845                        error_type: "HTTP read",
846                        duration: *duration,
847                    }
848                }
849                None => MaybeTimeoutFuture::NoTimeout {
850                    future: self.inner.call(req),
851                },
852            }
853        }
854    }
855
856    #[cfg(test)]
857    pub(crate) mod test {
858        use crate::hyper_014::HyperConnector;
859        use aws_smithy_async::assert_elapsed;
860        use aws_smithy_async::future::never::Never;
861        use aws_smithy_async::rt::sleep::{SharedAsyncSleep, TokioSleep};
862        use aws_smithy_runtime_api::box_error::BoxError;
863        use aws_smithy_runtime_api::client::http::HttpConnectorSettings;
864        use aws_smithy_runtime_api::client::orchestrator::HttpRequest;
865        use aws_smithy_runtime_api::client::result::ConnectorError;
866        use aws_smithy_types::error::display::DisplayErrorContext;
867        use hyper_0_14::client::connect::{Connected, Connection};
868        use std::future::Future;
869        use std::pin::Pin;
870        use std::task::{Context, Poll};
871        use std::time::Duration;
872        use tokio::io::ReadBuf;
873        use tokio::io::{AsyncRead, AsyncWrite};
874        use tokio::net::TcpStream;
875
876        #[allow(unused)]
877        fn connect_timeout_is_correct<T: Send + Sync + Clone + 'static>() {
878            is_send_sync::<super::ConnectTimeout<T>>();
879        }
880
881        #[allow(unused)]
882        fn is_send_sync<T: Send + Sync>() {}
883
884        /// A service that will never return whatever it is you want
885        ///
886        /// Returned futures will return Pending forever
887        #[non_exhaustive]
888        #[derive(Clone, Default, Debug)]
889        pub(crate) struct NeverConnects;
890        impl hyper_0_14::service::Service<http_02x::Uri> for NeverConnects {
891            type Response = TcpStream;
892            type Error = ConnectorError;
893            type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
894
895            fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
896                Poll::Ready(Ok(()))
897            }
898
899            fn call(&mut self, _uri: http_02x::Uri) -> Self::Future {
900                Box::pin(async move {
901                    Never::new().await;
902                    unreachable!()
903                })
904            }
905        }
906
907        /// A service that will connect but never send any data
908        #[derive(Clone, Debug, Default)]
909        struct NeverReplies;
910        impl hyper_0_14::service::Service<http_02x::Uri> for NeverReplies {
911            type Response = EmptyStream;
912            type Error = BoxError;
913            type Future = std::future::Ready<Result<Self::Response, Self::Error>>;
914
915            fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
916                Poll::Ready(Ok(()))
917            }
918
919            fn call(&mut self, _req: http_02x::Uri) -> Self::Future {
920                std::future::ready(Ok(EmptyStream))
921            }
922        }
923
924        /// A stream that will never return or accept any data
925        #[non_exhaustive]
926        #[derive(Debug, Default)]
927        struct EmptyStream;
928        impl AsyncRead for EmptyStream {
929            fn poll_read(
930                self: Pin<&mut Self>,
931                _cx: &mut Context<'_>,
932                _buf: &mut ReadBuf<'_>,
933            ) -> Poll<std::io::Result<()>> {
934                Poll::Pending
935            }
936        }
937        impl AsyncWrite for EmptyStream {
938            fn poll_write(
939                self: Pin<&mut Self>,
940                _cx: &mut Context<'_>,
941                _buf: &[u8],
942            ) -> Poll<Result<usize, std::io::Error>> {
943                Poll::Pending
944            }
945
946            fn poll_flush(
947                self: Pin<&mut Self>,
948                _cx: &mut Context<'_>,
949            ) -> Poll<Result<(), std::io::Error>> {
950                Poll::Pending
951            }
952
953            fn poll_shutdown(
954                self: Pin<&mut Self>,
955                _cx: &mut Context<'_>,
956            ) -> Poll<Result<(), std::io::Error>> {
957                Poll::Pending
958            }
959        }
960        impl Connection for EmptyStream {
961            fn connected(&self) -> Connected {
962                Connected::new()
963            }
964        }
965
966        #[tokio::test]
967        async fn http_connect_timeout_works() {
968            let tcp_connector = NeverConnects::default();
969            let connector_settings = HttpConnectorSettings::builder()
970                .connect_timeout(Duration::from_secs(1))
971                .build();
972            let hyper = HyperConnector::builder()
973                .connector_settings(connector_settings)
974                .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
975                .build(tcp_connector)
976                .adapter;
977            let now = tokio::time::Instant::now();
978            tokio::time::pause();
979            let resp = hyper
980                .call(HttpRequest::get("https://static-uri.com").unwrap())
981                .await
982                .unwrap_err();
983            assert!(
984                resp.is_timeout(),
985                "expected resp.is_timeout() to be true but it was false, resp == {:?}",
986                resp
987            );
988            let message = DisplayErrorContext(&resp).to_string();
989            let expected =
990                "timeout: error trying to connect: HTTP connect timeout occurred after 1s";
991            assert!(
992                message.contains(expected),
993                "expected '{message}' to contain '{expected}'"
994            );
995            assert_elapsed!(now, Duration::from_secs(1));
996        }
997
998        #[tokio::test]
999        async fn http_read_timeout_works() {
1000            let tcp_connector = NeverReplies;
1001            let connector_settings = HttpConnectorSettings::builder()
1002                .connect_timeout(Duration::from_secs(1))
1003                .read_timeout(Duration::from_secs(2))
1004                .build();
1005            let hyper = HyperConnector::builder()
1006                .connector_settings(connector_settings)
1007                .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
1008                .build(tcp_connector)
1009                .adapter;
1010            let now = tokio::time::Instant::now();
1011            tokio::time::pause();
1012            let err = hyper
1013                .call(HttpRequest::get("https://fake-uri.com").unwrap())
1014                .await
1015                .unwrap_err();
1016            assert!(
1017                err.is_timeout(),
1018                "expected err.is_timeout() to be true but it was false, err == {err:?}",
1019            );
1020            let message = format!("{}", DisplayErrorContext(&err));
1021            let expected = "timeout: HTTP read timeout occurred after 2s";
1022            assert!(
1023                message.contains(expected),
1024                "expected '{message}' to contain '{expected}'"
1025            );
1026            assert_elapsed!(now, Duration::from_secs(2));
1027        }
1028    }
1029}
1030
1031#[cfg(test)]
1032mod test {
1033    use crate::hyper_legacy::timeout_middleware::test::NeverConnects;
1034    use crate::hyper_legacy::{HyperClientBuilder, HyperConnector};
1035    use aws_smithy_async::time::SystemTimeSource;
1036    use aws_smithy_runtime_api::box_error::BoxError;
1037    use aws_smithy_runtime_api::client::http::{HttpClient, HttpConnectorSettings};
1038    use aws_smithy_runtime_api::client::orchestrator::HttpRequest;
1039    use aws_smithy_runtime_api::client::runtime_components::RuntimeComponentsBuilder;
1040    use hyper_0_14::client::connect::{Connected, Connection};
1041    use std::io::{Error, ErrorKind};
1042    use std::pin::Pin;
1043    use std::sync::atomic::{AtomicU32, Ordering};
1044    use std::sync::Arc;
1045    use std::task::{Context, Poll};
1046    use std::time::Duration;
1047    use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
1048
1049    #[tokio::test]
1050    async fn connector_selection() {
1051        // Create a client that increments a count every time it creates a new HyperConnector
1052        let creation_count = Arc::new(AtomicU32::new(0));
1053        let http_client = HyperClientBuilder::new().build_with_fn({
1054            let count = creation_count.clone();
1055            move || {
1056                count.fetch_add(1, Ordering::Relaxed);
1057                NeverConnects::default()
1058            }
1059        });
1060
1061        // This configuration should result in 4 separate connectors with different timeout settings
1062        let settings = [
1063            HttpConnectorSettings::builder()
1064                .connect_timeout(Duration::from_secs(3))
1065                .build(),
1066            HttpConnectorSettings::builder()
1067                .read_timeout(Duration::from_secs(3))
1068                .build(),
1069            HttpConnectorSettings::builder()
1070                .connect_timeout(Duration::from_secs(3))
1071                .read_timeout(Duration::from_secs(3))
1072                .build(),
1073            HttpConnectorSettings::builder()
1074                .connect_timeout(Duration::from_secs(5))
1075                .read_timeout(Duration::from_secs(3))
1076                .build(),
1077        ];
1078
1079        // Kick off thousands of parallel tasks that will try to create a connector
1080        let components = RuntimeComponentsBuilder::for_tests()
1081            .with_time_source(Some(SystemTimeSource::new()))
1082            .build()
1083            .unwrap();
1084        let mut handles = Vec::new();
1085        for setting in &settings {
1086            for _ in 0..1000 {
1087                let client = http_client.clone();
1088                handles.push(tokio::spawn({
1089                    let setting = setting.clone();
1090                    let components = components.clone();
1091                    async move {
1092                        let _ = client.http_connector(&setting, &components);
1093                    }
1094                }));
1095            }
1096        }
1097        for handle in handles {
1098            handle.await.unwrap();
1099        }
1100
1101        // Verify only 4 connectors were created amidst the chaos
1102        assert_eq!(4, creation_count.load(Ordering::Relaxed));
1103    }
1104
1105    #[tokio::test]
1106    async fn hyper_io_error() {
1107        let connector = TestConnection {
1108            inner: HangupStream,
1109        };
1110        let adapter = HyperConnector::builder().build(connector).adapter;
1111        let err = adapter
1112            .call(HttpRequest::get("https://socket-hangup.com").unwrap())
1113            .await
1114            .expect_err("socket hangup");
1115        assert!(err.is_io(), "{:?}", err);
1116    }
1117
1118    // ---- machinery to make a Hyper connector that responds with an IO Error
1119    #[derive(Clone)]
1120    struct HangupStream;
1121
1122    impl Connection for HangupStream {
1123        fn connected(&self) -> Connected {
1124            Connected::new()
1125        }
1126    }
1127
1128    impl AsyncRead for HangupStream {
1129        fn poll_read(
1130            self: Pin<&mut Self>,
1131            _cx: &mut Context<'_>,
1132            _buf: &mut ReadBuf<'_>,
1133        ) -> Poll<std::io::Result<()>> {
1134            Poll::Ready(Err(Error::new(
1135                ErrorKind::ConnectionReset,
1136                "connection reset",
1137            )))
1138        }
1139    }
1140
1141    impl AsyncWrite for HangupStream {
1142        fn poll_write(
1143            self: Pin<&mut Self>,
1144            _cx: &mut Context<'_>,
1145            _buf: &[u8],
1146        ) -> Poll<Result<usize, Error>> {
1147            Poll::Pending
1148        }
1149
1150        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
1151            Poll::Pending
1152        }
1153
1154        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
1155            Poll::Pending
1156        }
1157    }
1158
1159    #[derive(Clone)]
1160    struct TestConnection<T> {
1161        inner: T,
1162    }
1163
1164    impl<T> hyper_0_14::service::Service<http_02x::Uri> for TestConnection<T>
1165    where
1166        T: Clone + Connection,
1167    {
1168        type Response = T;
1169        type Error = BoxError;
1170        type Future = std::future::Ready<Result<Self::Response, Self::Error>>;
1171
1172        fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1173            Poll::Ready(Ok(()))
1174        }
1175
1176        fn call(&mut self, _req: http_02x::Uri) -> Self::Future {
1177            std::future::ready(Ok(self.inner.clone()))
1178        }
1179    }
1180}