1mod dns;
7pub mod proxy;
9mod timeout;
10pub mod tls;
12
13pub(crate) mod connect;
14
15use crate::cfg::cfg_tls;
16use crate::tls::TlsContext;
17use aws_smithy_async::future::timeout::TimedOutError;
18use aws_smithy_async::rt::sleep::{default_async_sleep, AsyncSleep, SharedAsyncSleep};
19use aws_smithy_runtime_api::box_error::BoxError;
20use aws_smithy_runtime_api::client::connection::CaptureSmithyConnection;
21use aws_smithy_runtime_api::client::connection::ConnectionMetadata;
22use aws_smithy_runtime_api::client::connector_metadata::ConnectorMetadata;
23use aws_smithy_runtime_api::client::http::{
24 HttpClient, HttpConnector, HttpConnectorFuture, HttpConnectorSettings, SharedHttpClient,
25 SharedHttpConnector,
26};
27use aws_smithy_runtime_api::client::orchestrator::{HttpRequest, HttpResponse};
28use aws_smithy_runtime_api::client::result::ConnectorError;
29use aws_smithy_runtime_api::client::runtime_components::{
30 RuntimeComponents, RuntimeComponentsBuilder,
31};
32use aws_smithy_runtime_api::shared::IntoShared;
33use aws_smithy_types::body::SdkBody;
34use aws_smithy_types::config_bag::ConfigBag;
35use aws_smithy_types::error::display::DisplayErrorContext;
36use aws_smithy_types::retry::ErrorKind;
37use client::connect::Connection;
38use h2::Reason;
39use http_1x::{Extensions, Uri};
40use hyper::rt::{Read, Write};
41use hyper_util::client::legacy as client;
42use hyper_util::client::legacy::connect::dns::GaiResolver;
43use hyper_util::client::legacy::connect::{
44 capture_connection, CaptureConnection, Connect, HttpConnector as HyperHttpConnector, HttpInfo,
45};
46use hyper_util::client::proxy::matcher::Matcher;
47use hyper_util::rt::{TokioExecutor, TokioTimer};
48use std::borrow::Cow;
49use std::collections::HashMap;
50use std::error::Error;
51use std::fmt;
52use std::sync::RwLock;
53use std::time::Duration;
54
55pub fn default_connector(
57 settings: &HttpConnectorSettings,
58 sleep: Option<SharedAsyncSleep>,
59) -> Option<SharedHttpConnector> {
60 #[cfg(feature = "rustls-aws-lc")]
61 {
62 tracing::trace!(settings = ?settings, sleep = ?sleep, "creating a new default connector");
63 let mut conn_builder = Connector::builder().connector_settings(settings.clone());
64
65 if let Some(sleep) = sleep {
66 conn_builder = conn_builder.sleep_impl(sleep);
67 }
68
69 let conn = conn_builder
70 .tls_provider(tls::Provider::Rustls(
71 tls::rustls_provider::CryptoMode::AwsLc,
72 ))
73 .build();
74 Some(SharedHttpConnector::new(conn))
75 }
76 #[cfg(not(feature = "rustls-aws-lc"))]
77 {
78 tracing::trace!(settings = ?settings, sleep = ?sleep, "no default connector available");
79 None
80 }
81}
82
83#[derive(Debug)]
90pub struct Connector {
91 adapter: Box<dyn HttpConnector>,
92}
93
94impl Connector {
95 pub fn builder() -> ConnectorBuilder {
97 ConnectorBuilder {
98 enable_tcp_nodelay: true,
99 ..Default::default()
100 }
101 }
102}
103
104impl HttpConnector for Connector {
105 fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
106 self.adapter.call(request)
107 }
108}
109
110#[derive(Default, Debug, Clone)]
112pub struct ConnectorBuilder<Tls = TlsUnset> {
113 connector_settings: Option<HttpConnectorSettings>,
114 sleep_impl: Option<SharedAsyncSleep>,
115 client_builder: Option<hyper_util::client::legacy::Builder>,
116 pool_idle_timeout: Option<Option<Duration>>,
117 enable_tcp_nodelay: bool,
118 interface: Option<String>,
119 proxy_config: Option<proxy::ProxyConfig>,
120 #[allow(unused)]
121 tls: Tls,
122}
123
/// Type-state marker for a builder on which no TLS provider has been selected yet.
#[derive(Clone, Debug, Default)]
#[non_exhaustive]
pub struct TlsUnset {}
128
129#[derive(Debug, Clone)]
131pub struct TlsProviderSelected {
132 #[allow(unused)]
133 provider: tls::Provider,
134 #[allow(unused)]
135 context: TlsContext,
136}
137
138impl ConnectorBuilder<TlsUnset> {
139 pub fn tls_provider(self, provider: tls::Provider) -> ConnectorBuilder<TlsProviderSelected> {
141 ConnectorBuilder {
142 connector_settings: self.connector_settings,
143 sleep_impl: self.sleep_impl,
144 client_builder: self.client_builder,
145 enable_tcp_nodelay: self.enable_tcp_nodelay,
146 interface: self.interface,
147 proxy_config: self.proxy_config,
148 pool_idle_timeout: self.pool_idle_timeout,
149 tls: TlsProviderSelected {
150 provider,
151 context: TlsContext::default(),
152 },
153 }
154 }
155
156 #[doc(hidden)]
158 pub fn build_http(self) -> Connector {
159 if let Some(ref proxy_config) = self.proxy_config {
160 if proxy_config.requires_tls() {
161 tracing::warn!(
162 "HTTPS proxy configured but no TLS provider set. \
163 Connections to HTTPS proxy servers will fail. \
164 Consider configuring a TLS provider to enable TLS support."
165 );
166 }
167 }
168
169 let base = self.base_connector();
170
171 let proxy_config = self
173 .proxy_config
174 .clone()
175 .unwrap_or_else(proxy::ProxyConfig::disabled);
176
177 if !proxy_config.is_disabled() {
178 let http_proxy_connector = connect::HttpProxyConnector::new(base, proxy_config);
179 self.wrap_connector(http_proxy_connector)
180 } else {
181 self.wrap_connector(base)
182 }
183 }
184}
185
186impl<Any> ConnectorBuilder<Any> {
187 pub(crate) fn wrap_connector<C>(self, tcp_connector: C) -> Connector
189 where
190 C: Send + Sync + 'static,
191 C: Clone,
192 C: tower::Service<Uri>,
193 C::Response: Read + Write + Connection + Send + Sync + Unpin,
194 C: Connect,
195 C::Future: Unpin + Send + 'static,
196 C::Error: Into<BoxError>,
197 {
198 let client_builder = self
199 .client_builder
200 .unwrap_or_else(|| new_tokio_hyper_builder(self.pool_idle_timeout));
201 let sleep_impl = self.sleep_impl.or_else(default_async_sleep);
202 let (connect_timeout, read_timeout) = self
203 .connector_settings
204 .map(|c| (c.connect_timeout(), c.read_timeout()))
205 .unwrap_or((None, None));
206
207 let connector = match connect_timeout {
208 Some(duration) => timeout::ConnectTimeout::new(
209 tcp_connector,
210 sleep_impl
211 .clone()
212 .expect("a sleep impl must be provided in order to have a connect timeout"),
213 duration,
214 ),
215 None => timeout::ConnectTimeout::no_timeout(tcp_connector),
216 };
217 let base = client_builder.build(connector);
218 let read_timeout = match read_timeout {
219 Some(duration) => timeout::HttpReadTimeout::new(
220 base,
221 sleep_impl.expect("a sleep impl must be provided in order to have a read timeout"),
222 duration,
223 ),
224 None => timeout::HttpReadTimeout::no_timeout(base),
225 };
226
227 let proxy_matcher = self
228 .proxy_config
229 .as_ref()
230 .map(|config| config.clone().into_hyper_util_matcher());
231
232 Connector {
233 adapter: Box::new(Adapter {
234 client: read_timeout,
235 proxy_matcher,
236 }),
237 }
238 }
239
240 fn base_connector(&self) -> HyperHttpConnector {
243 self.base_connector_with_resolver(GaiResolver::new())
244 }
245
246 fn base_connector_with_resolver<R>(&self, resolver: R) -> HyperHttpConnector<R> {
249 let mut conn = HyperHttpConnector::new_with_resolver(resolver);
250 conn.set_nodelay(self.enable_tcp_nodelay);
251 #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
252 if let Some(interface) = &self.interface {
253 conn.set_interface(interface);
254 }
255 conn
256 }
257
258 pub fn sleep_impl(mut self, sleep_impl: impl AsyncSleep + 'static) -> Self {
263 self.sleep_impl = Some(sleep_impl.into_shared());
264 self
265 }
266
267 pub fn set_sleep_impl(&mut self, sleep_impl: Option<SharedAsyncSleep>) -> &mut Self {
272 self.sleep_impl = sleep_impl;
273 self
274 }
275
276 pub fn connector_settings(mut self, connector_settings: HttpConnectorSettings) -> Self {
278 self.connector_settings = Some(connector_settings);
279 self
280 }
281
282 pub fn set_connector_settings(
284 &mut self,
285 connector_settings: Option<HttpConnectorSettings>,
286 ) -> &mut Self {
287 self.connector_settings = connector_settings;
288 self
289 }
290
291 pub fn enable_tcp_nodelay(mut self, nodelay: bool) -> Self {
293 self.enable_tcp_nodelay = nodelay;
294 self
295 }
296
297 pub fn set_enable_tcp_nodelay(&mut self, nodelay: bool) -> &mut Self {
299 self.enable_tcp_nodelay = nodelay;
300 self
301 }
302
303 #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
316 pub fn set_interface<S: Into<String>>(&mut self, interface: S) -> &mut Self {
317 self.interface = Some(interface.into());
318 self
319 }
320
321 pub fn proxy_config(mut self, config: proxy::ProxyConfig) -> Self {
343 self.proxy_config = Some(config);
344 self
345 }
346
347 pub fn set_proxy_config(&mut self, config: Option<proxy::ProxyConfig>) -> &mut Self {
351 self.proxy_config = config;
352 self
353 }
354
355 pub fn pool_idle_timeout<D>(mut self, val: D) -> Self
379 where
380 D: Into<Option<Duration>>,
381 {
382 self.pool_idle_timeout = Some(val.into());
383 self
384 }
385
386 pub fn set_pool_idle_timeout(&mut self, val: Option<Option<Duration>>) -> &mut Self {
410 self.pool_idle_timeout = val;
411 self
412 }
413
414 pub(crate) fn hyper_builder(
418 mut self,
419 hyper_builder: hyper_util::client::legacy::Builder,
420 ) -> Self {
421 self.set_hyper_builder(Some(hyper_builder));
422 self
423 }
424
425 pub(crate) fn set_hyper_builder(
429 &mut self,
430 hyper_builder: Option<hyper_util::client::legacy::Builder>,
431 ) -> &mut Self {
432 self.client_builder = hyper_builder;
433 self
434 }
435}
436
437struct Adapter<C> {
441 client: timeout::HttpReadTimeout<
442 hyper_util::client::legacy::Client<timeout::ConnectTimeout<C>, SdkBody>,
443 >,
444 proxy_matcher: Option<Matcher>,
445}
446
447impl<C> fmt::Debug for Adapter<C> {
448 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
449 f.debug_struct("Adapter")
450 .field("client", &"** hyper client **")
451 .field("proxy_matcher", &self.proxy_matcher.is_some())
452 .finish()
453 }
454}
455
456fn extract_smithy_connection(capture_conn: &CaptureConnection) -> Option<ConnectionMetadata> {
458 let capture_conn = capture_conn.clone();
459 if let Some(conn) = capture_conn.clone().connection_metadata().as_ref() {
460 let mut extensions = Extensions::new();
461 conn.get_extras(&mut extensions);
462 let http_info = extensions.get::<HttpInfo>();
463 let mut builder = ConnectionMetadata::builder()
464 .proxied(conn.is_proxied())
465 .poison_fn(move || match capture_conn.connection_metadata().as_ref() {
466 Some(conn) => conn.poison(),
467 None => tracing::trace!("no connection existed to poison"),
468 });
469
470 builder
471 .set_local_addr(http_info.map(|info| info.local_addr()))
472 .set_remote_addr(http_info.map(|info| info.remote_addr()));
473
474 let smithy_connection = builder.build();
475
476 Some(smithy_connection)
477 } else {
478 None
479 }
480}
481
482fn new_tokio_hyper_builder(
483 pool_idle_timeout: Option<Option<Duration>>,
484) -> hyper_util::client::legacy::Builder {
485 let mut builder = hyper_util::client::legacy::Builder::new(TokioExecutor::new());
486 builder.pool_timer(TokioTimer::new());
488
489 if let Some(pool_idle_timeout) = pool_idle_timeout {
490 builder.pool_idle_timeout(pool_idle_timeout);
491 }
492
493 builder
494}
495
496impl<C> Adapter<C> {
497 fn add_proxy_auth_header(&self, request: &mut http_1x::Request<SdkBody>) {
499 if request.uri().scheme() != Some(&http_1x::uri::Scheme::HTTP) {
501 return;
502 }
503
504 if request
506 .headers()
507 .contains_key(http_1x::header::PROXY_AUTHORIZATION)
508 {
509 return;
510 }
511
512 if let Some(ref matcher) = self.proxy_matcher {
513 if let Some(intercept) = matcher.intercept(request.uri()) {
514 if let Some(auth_header) = intercept.basic_auth() {
516 request
517 .headers_mut()
518 .insert(http_1x::header::PROXY_AUTHORIZATION, auth_header.clone());
519 tracing::debug!("added proxy authentication header for {}", request.uri());
520 }
521 }
522 }
523 }
524}
525
526impl<C> HttpConnector for Adapter<C>
527where
528 C: Clone + Send + Sync + 'static,
529 C: tower::Service<Uri>,
530 C::Response: Connection + Read + Write + Unpin + 'static,
531 timeout::ConnectTimeout<C>: Connect,
532 C::Future: Unpin + Send + 'static,
533 C::Error: Into<BoxError>,
534{
535 fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
536 let mut request = match request.try_into_http1x() {
537 Ok(request) => request,
538 Err(err) => {
539 return HttpConnectorFuture::ready(Err(ConnectorError::user(err.into())));
540 }
541 };
542
543 self.add_proxy_auth_header(&mut request);
544
545 let capture_connection = capture_connection(&mut request);
546 if let Some(capture_smithy_connection) =
547 request.extensions().get::<CaptureSmithyConnection>()
548 {
549 capture_smithy_connection
550 .set_connection_retriever(move || extract_smithy_connection(&capture_connection));
551 }
552 let mut client = self.client.clone();
553 use tower::Service;
554 let fut = client.call(request);
555 HttpConnectorFuture::new(async move {
556 let response = fut
557 .await
558 .map_err(downcast_error)?
559 .map(SdkBody::from_body_1_x);
560 match HttpResponse::try_from(response) {
561 Ok(response) => Ok(response),
562 Err(err) => Err(ConnectorError::other(err.into(), None)),
563 }
564 })
565 }
566}
567
568fn downcast_error(err: BoxError) -> ConnectorError {
570 if find_source::<TimedOutError>(err.as_ref()).is_some() {
572 return ConnectorError::timeout(err);
573 }
574 let err = match err.downcast::<ConnectorError>() {
576 Ok(connector_error) => return *connector_error,
577 Err(box_error) => box_error,
578 };
579 let err = match find_source::<hyper::Error>(err.as_ref()) {
582 Some(hyper_error) => return to_connector_error(hyper_error)(err),
583 None => match find_source::<hyper_util::client::legacy::Error>(err.as_ref()) {
584 Some(hyper_util_err) => {
585 if hyper_util_err.is_connect()
586 || find_source::<std::io::Error>(hyper_util_err).is_some()
587 {
588 return ConnectorError::io(err);
589 }
590 err
591 }
592 None => err,
593 },
594 };
595
596 ConnectorError::other(err, None)
598}
599
600fn to_connector_error(err: &hyper::Error) -> fn(BoxError) -> ConnectorError {
602 if err.is_timeout() || find_source::<timeout::HttpTimeoutError>(err).is_some() {
603 return ConnectorError::timeout;
604 }
605 if err.is_user() {
606 return ConnectorError::user;
607 }
608 if err.is_closed() || err.is_canceled() || find_source::<std::io::Error>(err).is_some() {
609 return ConnectorError::io;
610 }
611 if err.is_incomplete_message() {
613 return |err: BoxError| ConnectorError::other(err, Some(ErrorKind::TransientError));
614 }
615
616 if let Some(h2_err) = find_source::<h2::Error>(err) {
617 if h2_err.is_go_away()
618 || (h2_err.is_reset() && h2_err.reason() == Some(Reason::REFUSED_STREAM))
619 {
620 return ConnectorError::io;
621 }
622 }
623
624 tracing::warn!(err = %DisplayErrorContext(&err), "unrecognized error from Hyper. If this error should be retried, please file an issue.");
625 |err: BoxError| ConnectorError::other(err, None)
626}
627
/// Searches `err` and its chain of `source()` errors for the first error of type `E`.
///
/// `err` itself is checked first. Returns `None` if the chain ends without a match.
fn find_source<'a, E: Error + 'static>(err: &'a (dyn Error + 'static)) -> Option<&'a E> {
    let mut current = err;
    loop {
        if let Some(found) = current.downcast_ref::<E>() {
            return Some(found);
        }
        // `?` ends the search once the chain has no further source.
        current = current.source()?;
    }
}
638
639#[derive(Clone, Debug, Eq, PartialEq, Hash)]
643struct CacheKey {
644 connect_timeout: Option<Duration>,
645 read_timeout: Option<Duration>,
646}
647
648impl From<&HttpConnectorSettings> for CacheKey {
649 fn from(value: &HttpConnectorSettings) -> Self {
650 Self {
651 connect_timeout: value.connect_timeout(),
652 read_timeout: value.read_timeout(),
653 }
654 }
655}
656
657struct HyperClient<F> {
658 connector_cache: RwLock<HashMap<CacheKey, SharedHttpConnector>>,
659 client_builder: hyper_util::client::legacy::Builder,
660 connector_fn: F,
661}
662
663impl<F> fmt::Debug for HyperClient<F> {
664 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
665 f.debug_struct("HyperClient")
666 .field("connector_cache", &self.connector_cache)
667 .field("client_builder", &self.client_builder)
668 .finish()
669 }
670}
671
672impl<F> HttpClient for HyperClient<F>
673where
674 F: Fn(
675 hyper_util::client::legacy::Builder,
676 Option<&HttpConnectorSettings>,
677 Option<&RuntimeComponents>,
678 ) -> Connector
679 + Send
680 + Sync
681 + 'static,
682{
683 fn http_connector(
684 &self,
685 settings: &HttpConnectorSettings,
686 components: &RuntimeComponents,
687 ) -> SharedHttpConnector {
688 let key = CacheKey::from(settings);
689 let mut connector = self.connector_cache.read().unwrap().get(&key).cloned();
690 if connector.is_none() {
691 let mut cache = self.connector_cache.write().unwrap();
692 if !cache.contains_key(&key) {
694 let start = components.time_source().map(|ts| ts.now());
695 let connector = (self.connector_fn)(
696 self.client_builder.clone(),
697 Some(settings),
698 Some(components),
699 );
700 let end = components.time_source().map(|ts| ts.now());
701 if let (Some(start), Some(end)) = (start, end) {
702 if let Ok(elapsed) = end.duration_since(start) {
703 tracing::debug!("new connector created in {:?}", elapsed);
704 }
705 }
706 let connector = SharedHttpConnector::new(connector);
707 cache.insert(key.clone(), connector);
708 }
709 connector = cache.get(&key).cloned();
710 }
711
712 connector.expect("cache populated above")
713 }
714
715 fn validate_base_client_config(
716 &self,
717 _: &RuntimeComponentsBuilder,
718 _: &ConfigBag,
719 ) -> Result<(), BoxError> {
720 let _ = (self.connector_fn)(self.client_builder.clone(), None, None);
726 Ok(())
727 }
728
729 fn connector_metadata(&self) -> Option<ConnectorMetadata> {
730 Some(ConnectorMetadata::new("hyper", Some(Cow::Borrowed("1.x"))))
731 }
732}
733
734#[derive(Clone, Default, Debug)]
745pub struct Builder<Tls = TlsUnset> {
746 client_builder: Option<hyper_util::client::legacy::Builder>,
747 pool_idle_timeout: Option<Option<Duration>>,
748 #[allow(unused)]
749 tls_provider: Tls,
750}
751
752cfg_tls! {
753 use aws_smithy_runtime_api::client::dns::ResolveDns;
754
755 impl ConnectorBuilder<TlsProviderSelected> {
756 pub fn build(self) -> Connector {
758 let http_connector = self.base_connector();
759 self.build_https(http_connector)
760 }
761
762 pub fn tls_context(mut self, ctx: TlsContext) -> Self {
764 self.tls.context = ctx;
765 self
766 }
767
768 pub fn set_tls_context(&mut self, ctx: TlsContext) -> &mut Self {
770 self.tls.context = ctx;
771 self
772 }
773
774 pub fn build_with_resolver<R: ResolveDns + Clone + 'static>(self, resolver: R) -> Connector {
776 use crate::client::dns::HyperUtilResolver;
777 let http_connector = self.base_connector_with_resolver(HyperUtilResolver { resolver });
778 self.build_https(http_connector)
779 }
780
781 fn build_https<R>(self, http_connector: HyperHttpConnector<R>) -> Connector
782 where
783 R: Clone + Send + Sync + 'static,
784 R: tower::Service<hyper_util::client::legacy::connect::dns::Name>,
785 R::Response: Iterator<Item = std::net::SocketAddr>,
786 R::Future: Send,
787 R::Error: Into<Box<dyn Error + Send + Sync>>,
788 {
789 match &self.tls.provider {
790 #[cfg(any(
792 feature = "rustls-aws-lc",
793 feature = "rustls-aws-lc-fips",
794 feature = "rustls-ring"
795 ))]
796 tls::Provider::Rustls(crypto_mode) => {
797 let proxy_config = self.proxy_config.clone()
798 .unwrap_or_else(proxy::ProxyConfig::disabled);
799
800 let https_connector = tls::rustls_provider::build_connector::wrap_connector(
801 http_connector,
802 crypto_mode.clone(),
803 &self.tls.context,
804 proxy_config,
805 );
806 self.wrap_connector(https_connector)
807 },
808 #[cfg(feature = "s2n-tls")]
809 tls::Provider::S2nTls => {
810 let proxy_config = self.proxy_config.clone()
811 .unwrap_or_else(proxy::ProxyConfig::disabled);
812
813 let https_connector = tls::s2n_tls_provider::build_connector::wrap_connector(
814 http_connector,
815 &self.tls.context,
816 proxy_config,
817 );
818 self.wrap_connector(https_connector)
819 }
820 }
821 }
822 }
823
824 impl Builder<TlsProviderSelected> {
825 pub fn build_https(self) -> SharedHttpClient {
830 build_with_conn_fn(
831 self.client_builder,
832 self.pool_idle_timeout,
833 move |client_builder, settings, runtime_components| {
834 let builder = new_conn_builder(client_builder, settings, runtime_components)
835 .tls_provider(self.tls_provider.provider.clone())
836 .tls_context(self.tls_provider.context.clone());
837 builder.build()
838 },
839 )
840 }
841
842 pub fn build_with_resolver(
844 self,
845 resolver: impl ResolveDns + Clone + 'static,
846 ) -> SharedHttpClient {
847 build_with_conn_fn(
848 self.client_builder,
849 self.pool_idle_timeout,
850 move |client_builder, settings, runtime_components| {
851 let builder = new_conn_builder(client_builder, settings, runtime_components)
852 .tls_provider(self.tls_provider.provider.clone())
853 .tls_context(self.tls_provider.context.clone());
854 builder.build_with_resolver(resolver.clone())
855 },
856 )
857 }
858
859 pub fn tls_context(mut self, ctx: TlsContext) -> Self {
861 self.tls_provider.context = ctx;
862 self
863 }
864 }
865}
866
867impl<Any> Builder<Any> {
868 pub fn pool_idle_timeout<D>(mut self, val: D) -> Self
892 where
893 D: Into<Option<Duration>>,
894 {
895 self.pool_idle_timeout = Some(val.into());
896 self
897 }
898
899 pub fn set_pool_idle_timeout(&mut self, val: Option<Option<Duration>>) -> &mut Self {
922 self.pool_idle_timeout = val;
923 self
924 }
925}
926
927impl Builder<TlsUnset> {
928 pub fn new() -> Self {
930 Self::default()
931 }
932
933 #[doc(hidden)]
935 pub fn build_with_connector_fn<F>(self, connector_fn: F) -> SharedHttpClient
936 where
937 F: Fn(Option<&HttpConnectorSettings>, Option<&RuntimeComponents>) -> Connector
938 + Send
939 + Sync
940 + 'static,
941 {
942 build_with_conn_fn(
943 self.client_builder,
944 self.pool_idle_timeout,
945 move |_builder, settings, runtime_components| {
946 connector_fn(settings, runtime_components)
947 },
948 )
949 }
950
951 #[doc(hidden)]
953 pub fn build_http(self) -> SharedHttpClient {
954 build_with_conn_fn(
955 self.client_builder,
956 self.pool_idle_timeout,
957 move |client_builder, settings, runtime_components| {
958 let builder = new_conn_builder(client_builder, settings, runtime_components);
959 builder.build_http()
960 },
961 )
962 }
963
964 pub fn tls_provider(self, provider: tls::Provider) -> Builder<TlsProviderSelected> {
966 Builder {
967 client_builder: self.client_builder,
968 pool_idle_timeout: self.pool_idle_timeout,
969 tls_provider: TlsProviderSelected {
970 provider,
971 context: TlsContext::default(),
972 },
973 }
974 }
975}
976
977pub(crate) fn build_with_conn_fn<F>(
978 client_builder: Option<hyper_util::client::legacy::Builder>,
979 pool_idle_timeout: Option<Option<Duration>>,
980 connector_fn: F,
981) -> SharedHttpClient
982where
983 F: Fn(
984 hyper_util::client::legacy::Builder,
985 Option<&HttpConnectorSettings>,
986 Option<&RuntimeComponents>,
987 ) -> Connector
988 + Send
989 + Sync
990 + 'static,
991{
992 let client_builder =
993 client_builder.unwrap_or_else(|| new_tokio_hyper_builder(pool_idle_timeout));
994 SharedHttpClient::new(HyperClient {
995 connector_cache: RwLock::new(HashMap::new()),
996 client_builder,
997 connector_fn,
998 })
999}
1000
1001#[allow(dead_code)]
1002pub(crate) fn build_with_tcp_conn_fn<C, F>(
1003 client_builder: Option<hyper_util::client::legacy::Builder>,
1004 pool_idle_timeout: Option<Option<Duration>>,
1005 tcp_connector_fn: F,
1006) -> SharedHttpClient
1007where
1008 F: Fn() -> C + Send + Sync + 'static,
1009 C: Clone + Send + Sync + 'static,
1010 C: tower::Service<Uri>,
1011 C::Response: Connection + Read + Write + Send + Sync + Unpin + 'static,
1012 C::Future: Unpin + Send + 'static,
1013 C::Error: Into<BoxError>,
1014 C: Connect,
1015{
1016 build_with_conn_fn(
1017 client_builder,
1018 pool_idle_timeout,
1019 move |client_builder, settings, runtime_components| {
1020 let builder = new_conn_builder(client_builder, settings, runtime_components);
1021 builder.wrap_connector(tcp_connector_fn())
1022 },
1023 )
1024}
1025
1026fn new_conn_builder(
1027 client_builder: hyper_util::client::legacy::Builder,
1028 settings: Option<&HttpConnectorSettings>,
1029 runtime_components: Option<&RuntimeComponents>,
1030) -> ConnectorBuilder {
1031 let mut builder = Connector::builder().hyper_builder(client_builder);
1032 builder.set_connector_settings(settings.cloned());
1033 if let Some(components) = runtime_components {
1034 builder.set_sleep_impl(components.sleep_impl());
1035 }
1036 builder
1037}
1038
#[cfg(test)]
mod test {
    // `Error` and `ErrorKind` here are the `std::io` types; these explicit imports take
    // precedence over the same names brought in by the `super::*` glob below.
    use std::io::{Error, ErrorKind};
    use std::pin::Pin;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;
    use std::task::{Context, Poll};

    use crate::client::timeout::test::NeverConnects;
    use aws_smithy_async::assert_elapsed;
    use aws_smithy_async::rt::sleep::TokioSleep;
    use aws_smithy_async::time::SystemTimeSource;
    use aws_smithy_runtime_api::client::runtime_components::RuntimeComponentsBuilder;
    use http_1x::Uri;
    use hyper::rt::ReadBufCursor;
    use hyper_util::client::legacy::connect::Connected;

    use super::*;

    /// Requesting connectors concurrently creates exactly one connector per distinct
    /// combination of connect and read timeouts.
    #[tokio::test]
    async fn connector_selection() {
        // Counts how many times the TCP connector factory is invoked.
        let creation_count = Arc::new(AtomicU32::new(0));
        let http_client = build_with_tcp_conn_fn(None, None, {
            let count = creation_count.clone();
            move || {
                count.fetch_add(1, Ordering::Relaxed);
                NeverConnects
            }
        });

        // Four settings with pairwise different (connect, read) timeout combinations.
        let settings = [
            HttpConnectorSettings::builder()
                .connect_timeout(Duration::from_secs(3))
                .build(),
            HttpConnectorSettings::builder()
                .read_timeout(Duration::from_secs(3))
                .build(),
            HttpConnectorSettings::builder()
                .connect_timeout(Duration::from_secs(3))
                .read_timeout(Duration::from_secs(3))
                .build(),
            HttpConnectorSettings::builder()
                .connect_timeout(Duration::from_secs(5))
                .read_timeout(Duration::from_secs(3))
                .build(),
        ];

        let components = RuntimeComponentsBuilder::for_tests()
            .with_time_source(Some(SystemTimeSource::new()))
            .build()
            .unwrap();
        // Request each connector from many tasks at once to exercise the cache locking.
        let mut handles = Vec::new();
        for setting in &settings {
            for _ in 0..1000 {
                let client = http_client.clone();
                handles.push(tokio::spawn({
                    let setting = setting.clone();
                    let components = components.clone();
                    async move {
                        let _ = client.http_connector(&setting, &components);
                    }
                }));
            }
        }
        for handle in handles {
            handle.await.unwrap();
        }

        // One connector per distinct setting, no matter how many tasks asked for it.
        assert_eq!(4, creation_count.load(Ordering::Relaxed));
    }

    /// A connection whose reads fail with an I/O error is reported as an I/O connector error.
    #[tokio::test]
    async fn hyper_io_error() {
        let connector = TestConnection {
            inner: HangupStream,
        };
        let adapter = Connector::builder().wrap_connector(connector).adapter;
        let err = adapter
            .call(HttpRequest::get("https://socket-hangup.com").unwrap())
            .await
            .expect_err("socket hangup");
        assert!(err.is_io(), "unexpected error type: {:?}", err);
    }

    /// A stream whose reads always fail with `ConnectionReset` and whose writes never
    /// complete.
    #[derive(Clone)]
    struct HangupStream;

    impl Connection for HangupStream {
        fn connected(&self) -> Connected {
            Connected::new()
        }
    }

    impl Read for HangupStream {
        fn poll_read(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: ReadBufCursor<'_>,
        ) -> Poll<std::io::Result<()>> {
            Poll::Ready(Err(Error::new(
                ErrorKind::ConnectionReset,
                "connection reset",
            )))
        }
    }

    impl Write for HangupStream {
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: &[u8],
        ) -> Poll<Result<usize, Error>> {
            Poll::Pending
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
            Poll::Pending
        }

        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
            Poll::Pending
        }
    }

    /// A connector service that is always ready and hands out clones of `inner` for any URI.
    #[derive(Clone)]
    struct TestConnection<T> {
        inner: T,
    }

    impl<T> tower::Service<Uri> for TestConnection<T>
    where
        T: Clone + Connection,
    {
        type Response = T;
        type Error = BoxError;
        type Future = std::future::Ready<Result<Self::Response, Self::Error>>;

        fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            Poll::Ready(Ok(()))
        }

        fn call(&mut self, _req: Uri) -> Self::Future {
            std::future::ready(Ok(self.inner.clone()))
        }
    }

    /// A connector that never connects fails with a timeout error once the configured
    /// connect timeout has elapsed.
    #[tokio::test]
    async fn http_connect_timeout_works() {
        let tcp_connector = NeverConnects::default();
        let connector_settings = HttpConnectorSettings::builder()
            .connect_timeout(Duration::from_secs(1))
            .build();
        let hyper = Connector::builder()
            .connector_settings(connector_settings)
            .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
            .wrap_connector(tcp_connector)
            .adapter;
        // Take the start instant before pausing Tokio's clock so that the elapsed time
        // asserted below is measured on the paused clock.
        let now = tokio::time::Instant::now();
        tokio::time::pause();
        let resp = hyper
            .call(HttpRequest::get("https://static-uri.com").unwrap())
            .await
            .unwrap_err();
        assert!(
            resp.is_timeout(),
            "expected resp.is_timeout() to be true but it was false, resp == {:?}",
            resp
        );
        let message = DisplayErrorContext(&resp).to_string();
        let expected = "timeout: client error (Connect): HTTP connect timeout occurred after 1s";
        assert!(
            message.contains(expected),
            "expected '{message}' to contain '{expected}'"
        );
        assert_elapsed!(now, Duration::from_secs(1));
    }

    /// A connection that never replies fails with a timeout error once the configured read
    /// timeout has elapsed.
    #[tokio::test]
    async fn http_read_timeout_works() {
        let tcp_connector = crate::client::timeout::test::NeverReplies;
        let connector_settings = HttpConnectorSettings::builder()
            .connect_timeout(Duration::from_secs(1))
            .read_timeout(Duration::from_secs(2))
            .build();
        let hyper = Connector::builder()
            .connector_settings(connector_settings)
            .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
            .wrap_connector(tcp_connector)
            .adapter;
        // Take the start instant before pausing Tokio's clock so that the elapsed time
        // asserted below is measured on the paused clock.
        let now = tokio::time::Instant::now();
        tokio::time::pause();
        let err = hyper
            .call(HttpRequest::get("https://fake-uri.com").unwrap())
            .await
            .unwrap_err();
        assert!(
            err.is_timeout(),
            "expected err.is_timeout() to be true but it was false, err == {err:?}",
        );
        let message = format!("{}", DisplayErrorContext(&err));
        let expected = "timeout: HTTP read timeout occurred after 2s";
        assert!(
            message.contains(expected),
            "expected '{message}' to contain '{expected}'"
        );
        assert_elapsed!(now, Duration::from_secs(2));
    }

    /// A refused TCP connection is reported as an I/O connector error.
    #[cfg(not(windows))]
    #[tokio::test]
    async fn connection_refused_works() {
        use crate::client::dns::HyperUtilResolver;
        use aws_smithy_runtime_api::client::dns::{DnsFuture, ResolveDns};
        use std::net::{IpAddr, Ipv4Addr};

        /// Resolves every name to `127.0.0.1`.
        #[derive(Debug, Clone, Default)]
        struct TestResolver;
        impl ResolveDns for TestResolver {
            fn resolve_dns<'a>(&'a self, _name: &'a str) -> DnsFuture<'a> {
                let localhost_v4 = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
                DnsFuture::ready(Ok(vec![localhost_v4]))
            }
        }

        let connector_settings = HttpConnectorSettings::builder()
            .connect_timeout(Duration::from_secs(20))
            .build();

        let resolver = HyperUtilResolver {
            resolver: TestResolver,
        };
        let connector = Connector::builder().base_connector_with_resolver(resolver);

        let hyper = Connector::builder()
            .connector_settings(connector_settings)
            .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
            .wrap_connector(connector)
            .adapter;

        // The port must come after the full host name. With `.com` placed after the port,
        // the port component is not numeric, so the request would not target port 50227.
        let resp = hyper
            .call(HttpRequest::get("http://static-uri.com:50227").unwrap())
            .await
            .unwrap_err();
        assert!(
            resp.is_io(),
            "expected resp.is_io() to be true but it was false, resp == {:?}",
            resp
        );
        let message = DisplayErrorContext(&resp).to_string();
        let expected = "Connection refused";
        assert!(
            message.contains(expected),
            "expected '{message}' to contain '{expected}'"
        );
    }

    /// Selecting the s2n-tls provider produces a connector whose errors come from
    /// `s2n_tls_hyper`.
    #[cfg(feature = "s2n-tls")]
    #[tokio::test]
    async fn s2n_tls_provider() {
        let client = Builder::new()
            .tls_provider(tls::Provider::S2nTls)
            .build_https();
        let connector_settings = HttpConnectorSettings::builder().build();

        let runtime_components = RuntimeComponentsBuilder::for_tests()
            .with_time_source(Some(SystemTimeSource::new()))
            .build()
            .unwrap();

        let connector = client.http_connector(&connector_settings, &runtime_components);

        // An unsupported scheme makes the request fail inside the TLS connector, which shows
        // that the s2n-tls connector is the one in use.
        let error = connector
            .call(HttpRequest::get("notascheme://amazon.com").unwrap())
            .await
            .unwrap_err();
        let error = error.into_source();
        let s2n_error = error
            .source()
            .unwrap()
            .downcast_ref::<s2n_tls_hyper::error::Error>()
            .unwrap();
        assert!(matches!(
            s2n_error,
            s2n_tls_hyper::error::Error::InvalidScheme
        ));
    }
}