1mod dns;
7pub mod proxy;
9mod timeout;
10pub mod tls;
12
13pub(crate) mod connect;
14
15use crate::cfg::cfg_tls;
16use crate::tls::TlsContext;
17use aws_smithy_async::future::timeout::TimedOutError;
18use aws_smithy_async::rt::sleep::{default_async_sleep, AsyncSleep, SharedAsyncSleep};
19use aws_smithy_runtime_api::box_error::BoxError;
20use aws_smithy_runtime_api::client::connection::CaptureSmithyConnection;
21use aws_smithy_runtime_api::client::connection::ConnectionMetadata;
22use aws_smithy_runtime_api::client::connector_metadata::ConnectorMetadata;
23use aws_smithy_runtime_api::client::http::{
24 HttpClient, HttpConnector, HttpConnectorFuture, HttpConnectorSettings, SharedHttpClient,
25 SharedHttpConnector,
26};
27use aws_smithy_runtime_api::client::orchestrator::{HttpRequest, HttpResponse};
28use aws_smithy_runtime_api::client::result::ConnectorError;
29use aws_smithy_runtime_api::client::runtime_components::{
30 RuntimeComponents, RuntimeComponentsBuilder,
31};
32use aws_smithy_runtime_api::shared::IntoShared;
33use aws_smithy_types::body::SdkBody;
34use aws_smithy_types::config_bag::ConfigBag;
35use aws_smithy_types::error::display::DisplayErrorContext;
36use aws_smithy_types::retry::ErrorKind;
37use client::connect::Connection;
38use h2::Reason;
39use http_1x::{Extensions, Uri};
40use hyper::rt::{Read, Write};
41use hyper_util::client::legacy as client;
42use hyper_util::client::legacy::connect::dns::GaiResolver;
43use hyper_util::client::legacy::connect::{
44 capture_connection, CaptureConnection, Connect, HttpConnector as HyperHttpConnector, HttpInfo,
45};
46use hyper_util::client::proxy::matcher::Matcher;
47use hyper_util::rt::{TokioExecutor, TokioTimer};
48use std::borrow::Cow;
49use std::collections::HashMap;
50use std::error::Error;
51use std::fmt;
52use std::sync::RwLock;
53use std::time::Duration;
54
55pub fn default_connector(
57 settings: &HttpConnectorSettings,
58 sleep: Option<SharedAsyncSleep>,
59) -> Option<SharedHttpConnector> {
60 #[cfg(feature = "rustls-aws-lc")]
61 {
62 tracing::trace!(settings = ?settings, sleep = ?sleep, "creating a new default connector");
63 let mut conn_builder = Connector::builder().connector_settings(settings.clone());
64
65 if let Some(sleep) = sleep {
66 conn_builder = conn_builder.sleep_impl(sleep);
67 }
68
69 let conn = conn_builder
70 .tls_provider(tls::Provider::Rustls(
71 tls::rustls_provider::CryptoMode::AwsLc,
72 ))
73 .build();
74 Some(SharedHttpConnector::new(conn))
75 }
76 #[cfg(not(feature = "rustls-aws-lc"))]
77 {
78 tracing::trace!(settings = ?settings, sleep = ?sleep, "no default connector available");
79 None
80 }
81}
82
/// An HTTP connector that forwards every request to a boxed inner [`HttpConnector`].
///
/// Instances are produced by [`ConnectorBuilder`]; obtain one via [`Connector::builder`].
#[derive(Debug)]
pub struct Connector {
    // Type-erased connector that actually performs the request.
    adapter: Box<dyn HttpConnector>,
}
93
94impl Connector {
95 pub fn builder() -> ConnectorBuilder {
97 ConnectorBuilder {
98 enable_tcp_nodelay: true,
99 ..Default::default()
100 }
101 }
102}
103
104impl HttpConnector for Connector {
105 fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
106 self.adapter.call(request)
107 }
108}
109
/// Builder for [`Connector`].
///
/// The `Tls` type parameter records whether a TLS provider has been chosen: it
/// defaults to [`TlsUnset`] and becomes [`TlsProviderSelected`] once `tls_provider`
/// is called.
#[derive(Default, Debug, Clone)]
pub struct ConnectorBuilder<Tls = TlsUnset> {
    // Source of the connect/read timeouts applied in `wrap_connector`.
    connector_settings: Option<HttpConnectorSettings>,
    // Sleep implementation used to drive the timeouts.
    sleep_impl: Option<SharedAsyncSleep>,
    // Optional pre-configured hyper client builder; a fresh one is created when absent.
    client_builder: Option<hyper_util::client::legacy::Builder>,
    // Passed to the base hyper connector's `set_nodelay`.
    enable_tcp_nodelay: bool,
    // Network interface name, applied to the base connector on android/fuchsia/linux only.
    interface: Option<String>,
    // Proxy configuration; treated as disabled when absent.
    proxy_config: Option<proxy::ProxyConfig>,
    // TLS selection state (type-level marker plus provider data once selected).
    #[allow(unused)]
    tls: Tls,
}
122
/// Marker state for builders on which no TLS provider has been selected yet.
#[derive(Default, Debug, Clone)]
#[non_exhaustive]
pub struct TlsUnset {}
127
/// Builder state once a TLS provider has been selected.
#[derive(Debug, Clone)]
pub struct TlsProviderSelected {
    // The TLS implementation chosen by the caller.
    #[allow(unused)]
    provider: tls::Provider,
    // TLS context handed to the provider; starts as `TlsContext::default()`.
    #[allow(unused)]
    context: TlsContext,
}
136
137impl ConnectorBuilder<TlsUnset> {
138 pub fn tls_provider(self, provider: tls::Provider) -> ConnectorBuilder<TlsProviderSelected> {
140 ConnectorBuilder {
141 connector_settings: self.connector_settings,
142 sleep_impl: self.sleep_impl,
143 client_builder: self.client_builder,
144 enable_tcp_nodelay: self.enable_tcp_nodelay,
145 interface: self.interface,
146 proxy_config: self.proxy_config,
147 tls: TlsProviderSelected {
148 provider,
149 context: TlsContext::default(),
150 },
151 }
152 }
153
154 #[doc(hidden)]
156 pub fn build_http(self) -> Connector {
157 if let Some(ref proxy_config) = self.proxy_config {
158 if proxy_config.requires_tls() {
159 tracing::warn!(
160 "HTTPS proxy configured but no TLS provider set. \
161 Connections to HTTPS proxy servers will fail. \
162 Consider configuring a TLS provider to enable TLS support."
163 );
164 }
165 }
166
167 let base = self.base_connector();
168
169 let proxy_config = self
171 .proxy_config
172 .clone()
173 .unwrap_or_else(proxy::ProxyConfig::disabled);
174
175 if !proxy_config.is_disabled() {
176 let http_proxy_connector = connect::HttpProxyConnector::new(base, proxy_config);
177 self.wrap_connector(http_proxy_connector)
178 } else {
179 self.wrap_connector(base)
180 }
181 }
182}
183
184impl<Any> ConnectorBuilder<Any> {
185 pub(crate) fn wrap_connector<C>(self, tcp_connector: C) -> Connector
187 where
188 C: Send + Sync + 'static,
189 C: Clone,
190 C: tower::Service<Uri>,
191 C::Response: Read + Write + Connection + Send + Sync + Unpin,
192 C: Connect,
193 C::Future: Unpin + Send + 'static,
194 C::Error: Into<BoxError>,
195 {
196 let client_builder = self.client_builder.unwrap_or_else(|| {
197 let mut builder = hyper_util::client::legacy::Builder::new(TokioExecutor::new());
198 builder.pool_timer(TokioTimer::new());
200
201 builder
202 });
203 let sleep_impl = self.sleep_impl.or_else(default_async_sleep);
204 let (connect_timeout, read_timeout) = self
205 .connector_settings
206 .map(|c| (c.connect_timeout(), c.read_timeout()))
207 .unwrap_or((None, None));
208
209 let connector = match connect_timeout {
210 Some(duration) => timeout::ConnectTimeout::new(
211 tcp_connector,
212 sleep_impl
213 .clone()
214 .expect("a sleep impl must be provided in order to have a connect timeout"),
215 duration,
216 ),
217 None => timeout::ConnectTimeout::no_timeout(tcp_connector),
218 };
219 let base = client_builder.build(connector);
220 let read_timeout = match read_timeout {
221 Some(duration) => timeout::HttpReadTimeout::new(
222 base,
223 sleep_impl.expect("a sleep impl must be provided in order to have a read timeout"),
224 duration,
225 ),
226 None => timeout::HttpReadTimeout::no_timeout(base),
227 };
228
229 let proxy_matcher = self
230 .proxy_config
231 .as_ref()
232 .map(|config| config.clone().into_hyper_util_matcher());
233
234 Connector {
235 adapter: Box::new(Adapter {
236 client: read_timeout,
237 proxy_matcher,
238 }),
239 }
240 }
241
242 fn base_connector(&self) -> HyperHttpConnector {
245 self.base_connector_with_resolver(GaiResolver::new())
246 }
247
248 fn base_connector_with_resolver<R>(&self, resolver: R) -> HyperHttpConnector<R> {
251 let mut conn = HyperHttpConnector::new_with_resolver(resolver);
252 conn.set_nodelay(self.enable_tcp_nodelay);
253 #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
254 if let Some(interface) = &self.interface {
255 conn.set_interface(interface);
256 }
257 conn
258 }
259
260 pub fn sleep_impl(mut self, sleep_impl: impl AsyncSleep + 'static) -> Self {
265 self.sleep_impl = Some(sleep_impl.into_shared());
266 self
267 }
268
269 pub fn set_sleep_impl(&mut self, sleep_impl: Option<SharedAsyncSleep>) -> &mut Self {
274 self.sleep_impl = sleep_impl;
275 self
276 }
277
278 pub fn connector_settings(mut self, connector_settings: HttpConnectorSettings) -> Self {
280 self.connector_settings = Some(connector_settings);
281 self
282 }
283
284 pub fn set_connector_settings(
286 &mut self,
287 connector_settings: Option<HttpConnectorSettings>,
288 ) -> &mut Self {
289 self.connector_settings = connector_settings;
290 self
291 }
292
293 pub fn enable_tcp_nodelay(mut self, nodelay: bool) -> Self {
295 self.enable_tcp_nodelay = nodelay;
296 self
297 }
298
299 pub fn set_enable_tcp_nodelay(&mut self, nodelay: bool) -> &mut Self {
301 self.enable_tcp_nodelay = nodelay;
302 self
303 }
304
305 #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
318 pub fn set_interface<S: Into<String>>(&mut self, interface: S) -> &mut Self {
319 self.interface = Some(interface.into());
320 self
321 }
322
323 pub fn proxy_config(mut self, config: proxy::ProxyConfig) -> Self {
345 self.proxy_config = Some(config);
346 self
347 }
348
349 pub fn set_proxy_config(&mut self, config: Option<proxy::ProxyConfig>) -> &mut Self {
353 self.proxy_config = config;
354 self
355 }
356
357 pub(crate) fn hyper_builder(
361 mut self,
362 hyper_builder: hyper_util::client::legacy::Builder,
363 ) -> Self {
364 self.set_hyper_builder(Some(hyper_builder));
365 self
366 }
367
368 pub(crate) fn set_hyper_builder(
372 &mut self,
373 hyper_builder: Option<hyper_util::client::legacy::Builder>,
374 ) -> &mut Self {
375 self.client_builder = hyper_builder;
376 self
377 }
378}
379
/// Adapts a hyper client (wrapped in connect and read timeouts) to the Smithy
/// [`HttpConnector`] trait.
struct Adapter<C> {
    // hyper client over the timeout-wrapped connector `C`, itself wrapped in a read timeout.
    client: timeout::HttpReadTimeout<
        hyper_util::client::legacy::Client<timeout::ConnectTimeout<C>, SdkBody>,
    >,
    // Matcher used to look up proxy basic-auth credentials for a request URI, if any.
    proxy_matcher: Option<Matcher>,
}
389
390impl<C> fmt::Debug for Adapter<C> {
391 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
392 f.debug_struct("Adapter")
393 .field("client", &"** hyper client **")
394 .field("proxy_matcher", &self.proxy_matcher.is_some())
395 .finish()
396 }
397}
398
399fn extract_smithy_connection(capture_conn: &CaptureConnection) -> Option<ConnectionMetadata> {
401 let capture_conn = capture_conn.clone();
402 if let Some(conn) = capture_conn.clone().connection_metadata().as_ref() {
403 let mut extensions = Extensions::new();
404 conn.get_extras(&mut extensions);
405 let http_info = extensions.get::<HttpInfo>();
406 let mut builder = ConnectionMetadata::builder()
407 .proxied(conn.is_proxied())
408 .poison_fn(move || match capture_conn.connection_metadata().as_ref() {
409 Some(conn) => conn.poison(),
410 None => tracing::trace!("no connection existed to poison"),
411 });
412
413 builder
414 .set_local_addr(http_info.map(|info| info.local_addr()))
415 .set_remote_addr(http_info.map(|info| info.remote_addr()));
416
417 let smithy_connection = builder.build();
418
419 Some(smithy_connection)
420 } else {
421 None
422 }
423}
424
425impl<C> Adapter<C> {
426 fn add_proxy_auth_header(&self, request: &mut http_1x::Request<SdkBody>) {
428 if request.uri().scheme() != Some(&http_1x::uri::Scheme::HTTP) {
430 return;
431 }
432
433 if request
435 .headers()
436 .contains_key(http_1x::header::PROXY_AUTHORIZATION)
437 {
438 return;
439 }
440
441 if let Some(ref matcher) = self.proxy_matcher {
442 if let Some(intercept) = matcher.intercept(request.uri()) {
443 if let Some(auth_header) = intercept.basic_auth() {
445 request
446 .headers_mut()
447 .insert(http_1x::header::PROXY_AUTHORIZATION, auth_header.clone());
448 tracing::debug!("added proxy authentication header for {}", request.uri());
449 }
450 }
451 }
452 }
453}
454
455impl<C> HttpConnector for Adapter<C>
456where
457 C: Clone + Send + Sync + 'static,
458 C: tower::Service<Uri>,
459 C::Response: Connection + Read + Write + Unpin + 'static,
460 timeout::ConnectTimeout<C>: Connect,
461 C::Future: Unpin + Send + 'static,
462 C::Error: Into<BoxError>,
463{
464 fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
465 let mut request = match request.try_into_http1x() {
466 Ok(request) => request,
467 Err(err) => {
468 return HttpConnectorFuture::ready(Err(ConnectorError::user(err.into())));
469 }
470 };
471
472 self.add_proxy_auth_header(&mut request);
473
474 let capture_connection = capture_connection(&mut request);
475 if let Some(capture_smithy_connection) =
476 request.extensions().get::<CaptureSmithyConnection>()
477 {
478 capture_smithy_connection
479 .set_connection_retriever(move || extract_smithy_connection(&capture_connection));
480 }
481 let mut client = self.client.clone();
482 use tower::Service;
483 let fut = client.call(request);
484 HttpConnectorFuture::new(async move {
485 let response = fut
486 .await
487 .map_err(downcast_error)?
488 .map(SdkBody::from_body_1_x);
489 match HttpResponse::try_from(response) {
490 Ok(response) => Ok(response),
491 Err(err) => Err(ConnectorError::other(err.into(), None)),
492 }
493 })
494 }
495}
496
497fn downcast_error(err: BoxError) -> ConnectorError {
499 if find_source::<TimedOutError>(err.as_ref()).is_some() {
501 return ConnectorError::timeout(err);
502 }
503 let err = match err.downcast::<ConnectorError>() {
505 Ok(connector_error) => return *connector_error,
506 Err(box_error) => box_error,
507 };
508 let err = match find_source::<hyper::Error>(err.as_ref()) {
511 Some(hyper_error) => return to_connector_error(hyper_error)(err),
512 None => match find_source::<hyper_util::client::legacy::Error>(err.as_ref()) {
513 Some(hyper_util_err) => {
514 if hyper_util_err.is_connect()
515 || find_source::<std::io::Error>(hyper_util_err).is_some()
516 {
517 return ConnectorError::io(err);
518 }
519 err
520 }
521 None => err,
522 },
523 };
524
525 ConnectorError::other(err, None)
527}
528
/// Picks the [`ConnectorError`] constructor that matches a `hyper::Error`.
///
/// The checks run in order and the first match wins: timeout, user error, I/O
/// (closed, canceled, or an `std::io::Error` source), incomplete message
/// (reported as "other" with `ErrorKind::TransientError`), and HTTP/2 GOAWAY or
/// `REFUSED_STREAM` resets (reported as I/O). Anything else is logged at warn
/// level and reported as "other" with no retry kind.
fn to_connector_error(err: &hyper::Error) -> fn(BoxError) -> ConnectorError {
    // Either hyper reports a timeout itself, or the read-timeout error is in the chain.
    if err.is_timeout() || find_source::<timeout::HttpTimeoutError>(err).is_some() {
        return ConnectorError::timeout;
    }
    if err.is_user() {
        return ConnectorError::user;
    }
    if err.is_closed() || err.is_canceled() || find_source::<std::io::Error>(err).is_some() {
        return ConnectorError::io;
    }
    // An incomplete message is classified as a transient error.
    if err.is_incomplete_message() {
        return |err: BoxError| ConnectorError::other(err, Some(ErrorKind::TransientError));
    }

    // HTTP/2 GOAWAY, or a stream reset with REFUSED_STREAM, is treated as an I/O error.
    if let Some(h2_err) = find_source::<h2::Error>(err) {
        if h2_err.is_go_away()
            || (h2_err.is_reset() && h2_err.reason() == Some(Reason::REFUSED_STREAM))
        {
            return ConnectorError::io;
        }
    }

    tracing::warn!(err = %DisplayErrorContext(&err), "unrecognized error from Hyper. If this error should be retried, please file an issue.");
    |err: BoxError| ConnectorError::other(err, None)
}
556
/// Walks the `source()` chain starting at `err` (inclusive) and returns the first
/// error whose concrete type is `E`, or `None` when the chain contains none.
fn find_source<'a, E: Error + 'static>(err: &'a (dyn Error + 'static)) -> Option<&'a E> {
    let mut current = err;
    loop {
        if let Some(hit) = current.downcast_ref::<E>() {
            return Some(hit);
        }
        // Reaching the end of the chain exits with `None`.
        current = current.source()?;
    }
}
567
/// Key for the connector cache: the timeout settings read from
/// `HttpConnectorSettings`, so settings with equal timeouts share one cached connector.
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
struct CacheKey {
    // Connect timeout taken from the connector settings, if set.
    connect_timeout: Option<Duration>,
    // Read timeout taken from the connector settings, if set.
    read_timeout: Option<Duration>,
}
576
577impl From<&HttpConnectorSettings> for CacheKey {
578 fn from(value: &HttpConnectorSettings) -> Self {
579 Self {
580 connect_timeout: value.connect_timeout(),
581 read_timeout: value.read_timeout(),
582 }
583 }
584}
585
/// [`HttpClient`] implementation that creates connectors on demand through
/// `connector_fn` and caches them per [`CacheKey`].
struct HyperClient<F> {
    // Connectors created so far, keyed by their timeout settings.
    connector_cache: RwLock<HashMap<CacheKey, SharedHttpConnector>>,
    // hyper client builder cloned into every `connector_fn` invocation.
    client_builder: hyper_util::client::legacy::Builder,
    // Factory producing a `Connector` from the builder, settings, and runtime components.
    connector_fn: F,
}
591
592impl<F> fmt::Debug for HyperClient<F> {
593 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
594 f.debug_struct("HyperClient")
595 .field("connector_cache", &self.connector_cache)
596 .field("client_builder", &self.client_builder)
597 .finish()
598 }
599}
600
601impl<F> HttpClient for HyperClient<F>
602where
603 F: Fn(
604 hyper_util::client::legacy::Builder,
605 Option<&HttpConnectorSettings>,
606 Option<&RuntimeComponents>,
607 ) -> Connector
608 + Send
609 + Sync
610 + 'static,
611{
612 fn http_connector(
613 &self,
614 settings: &HttpConnectorSettings,
615 components: &RuntimeComponents,
616 ) -> SharedHttpConnector {
617 let key = CacheKey::from(settings);
618 let mut connector = self.connector_cache.read().unwrap().get(&key).cloned();
619 if connector.is_none() {
620 let mut cache = self.connector_cache.write().unwrap();
621 if !cache.contains_key(&key) {
623 let start = components.time_source().map(|ts| ts.now());
624 let connector = (self.connector_fn)(
625 self.client_builder.clone(),
626 Some(settings),
627 Some(components),
628 );
629 let end = components.time_source().map(|ts| ts.now());
630 if let (Some(start), Some(end)) = (start, end) {
631 if let Ok(elapsed) = end.duration_since(start) {
632 tracing::debug!("new connector created in {:?}", elapsed);
633 }
634 }
635 let connector = SharedHttpConnector::new(connector);
636 cache.insert(key.clone(), connector);
637 }
638 connector = cache.get(&key).cloned();
639 }
640
641 connector.expect("cache populated above")
642 }
643
644 fn validate_base_client_config(
645 &self,
646 _: &RuntimeComponentsBuilder,
647 _: &ConfigBag,
648 ) -> Result<(), BoxError> {
649 let _ = (self.connector_fn)(self.client_builder.clone(), None, None);
655 Ok(())
656 }
657
658 fn connector_metadata(&self) -> Option<ConnectorMetadata> {
659 Some(ConnectorMetadata::new("hyper", Some(Cow::Borrowed("1.x"))))
660 }
661}
662
/// Builder for a hyper-backed [`SharedHttpClient`].
///
/// The `Tls` type parameter records whether a TLS provider has been selected; it
/// defaults to [`TlsUnset`].
#[derive(Clone, Default, Debug)]
pub struct Builder<Tls = TlsUnset> {
    // Optional hyper client builder override; a default one is created when absent.
    client_builder: Option<hyper_util::client::legacy::Builder>,
    // TLS selection state.
    #[allow(unused)]
    tls_provider: Tls,
}
679
// Everything in this block is passed through the `cfg_tls!` macro.
// NOTE(review): presumably it is compiled only when a TLS feature is enabled; confirm
// against the macro definition in `crate::cfg`.
cfg_tls! {
    use aws_smithy_runtime_api::client::dns::ResolveDns;

    impl ConnectorBuilder<TlsProviderSelected> {
        /// Builds an HTTPS-capable [`Connector`] using the default DNS resolver.
        pub fn build(self) -> Connector {
            let http_connector = self.base_connector();
            self.build_https(http_connector)
        }

        /// Sets the TLS context handed to the selected TLS provider.
        pub fn tls_context(mut self, ctx: TlsContext) -> Self {
            self.tls.context = ctx;
            self
        }

        /// Sets the TLS context handed to the selected TLS provider.
        pub fn set_tls_context(&mut self, ctx: TlsContext) -> &mut Self {
            self.tls.context = ctx;
            self
        }

        /// Builds an HTTPS-capable [`Connector`] that resolves DNS through `resolver`.
        pub fn build_with_resolver<R: ResolveDns + Clone + 'static>(self, resolver: R) -> Connector {
            use crate::client::dns::HyperUtilResolver;
            let http_connector = self.base_connector_with_resolver(HyperUtilResolver { resolver });
            self.build_https(http_connector)
        }

        /// Wraps `http_connector` with the selected TLS provider and proxy
        /// configuration, then applies timeouts via `wrap_connector`.
        fn build_https<R>(self, http_connector: HyperHttpConnector<R>) -> Connector
        where
            R: Clone + Send + Sync + 'static,
            R: tower::Service<hyper_util::client::legacy::connect::dns::Name>,
            R::Response: Iterator<Item = std::net::SocketAddr>,
            R::Future: Send,
            R::Error: Into<Box<dyn Error + Send + Sync>>,
        {
            // Each arm is compiled only when the matching TLS feature is enabled.
            match &self.tls.provider {
                #[cfg(any(
                    feature = "rustls-aws-lc",
                    feature = "rustls-aws-lc-fips",
                    feature = "rustls-ring"
                ))]
                tls::Provider::Rustls(crypto_mode) => {
                    // A missing proxy configuration is treated as "proxying disabled".
                    let proxy_config = self.proxy_config.clone()
                        .unwrap_or_else(proxy::ProxyConfig::disabled);

                    let https_connector = tls::rustls_provider::build_connector::wrap_connector(
                        http_connector,
                        crypto_mode.clone(),
                        &self.tls.context,
                        proxy_config,
                    );
                    self.wrap_connector(https_connector)
                },
                #[cfg(feature = "s2n-tls")]
                tls::Provider::S2nTls => {
                    // A missing proxy configuration is treated as "proxying disabled".
                    let proxy_config = self.proxy_config.clone()
                        .unwrap_or_else(proxy::ProxyConfig::disabled);

                    let https_connector = tls::s2n_tls_provider::build_connector::wrap_connector(
                        http_connector,
                        &self.tls.context,
                        proxy_config,
                    );
                    self.wrap_connector(https_connector)
                }
            }
        }
    }

    impl Builder<TlsProviderSelected> {
        /// Builds an HTTPS client whose connectors use the selected TLS provider
        /// and context.
        pub fn build_https(self) -> SharedHttpClient {
            build_with_conn_fn(
                self.client_builder,
                move |client_builder, settings, runtime_components| {
                    let builder = new_conn_builder(client_builder, settings, runtime_components)
                        .tls_provider(self.tls_provider.provider.clone())
                        .tls_context(self.tls_provider.context.clone());
                    builder.build()
                },
            )
        }

        /// Builds an HTTPS client whose connectors resolve DNS through `resolver`.
        pub fn build_with_resolver(
            self,
            resolver: impl ResolveDns + Clone + 'static,
        ) -> SharedHttpClient {
            build_with_conn_fn(
                self.client_builder,
                move |client_builder, settings, runtime_components| {
                    let builder = new_conn_builder(client_builder, settings, runtime_components)
                        .tls_provider(self.tls_provider.provider.clone())
                        .tls_context(self.tls_provider.context.clone());
                    // The resolver is cloned for every connector that gets created.
                    builder.build_with_resolver(resolver.clone())
                },
            )
        }

        /// Sets the TLS context handed to the selected TLS provider.
        pub fn tls_context(mut self, ctx: TlsContext) -> Self {
            self.tls_provider.context = ctx;
            self
        }
    }
}
792
793impl Builder<TlsUnset> {
794 pub fn new() -> Self {
796 Self::default()
797 }
798
799 #[doc(hidden)]
801 pub fn build_with_connector_fn<F>(self, connector_fn: F) -> SharedHttpClient
802 where
803 F: Fn(Option<&HttpConnectorSettings>, Option<&RuntimeComponents>) -> Connector
804 + Send
805 + Sync
806 + 'static,
807 {
808 build_with_conn_fn(
809 self.client_builder,
810 move |_builder, settings, runtime_components| {
811 connector_fn(settings, runtime_components)
812 },
813 )
814 }
815
816 #[doc(hidden)]
818 pub fn build_http(self) -> SharedHttpClient {
819 build_with_conn_fn(
820 self.client_builder,
821 move |client_builder, settings, runtime_components| {
822 let builder = new_conn_builder(client_builder, settings, runtime_components);
823 builder.build_http()
824 },
825 )
826 }
827
828 pub fn tls_provider(self, provider: tls::Provider) -> Builder<TlsProviderSelected> {
830 Builder {
831 client_builder: self.client_builder,
832 tls_provider: TlsProviderSelected {
833 provider,
834 context: TlsContext::default(),
835 },
836 }
837 }
838}
839
840pub(crate) fn build_with_conn_fn<F>(
841 client_builder: Option<hyper_util::client::legacy::Builder>,
842 connector_fn: F,
843) -> SharedHttpClient
844where
845 F: Fn(
846 hyper_util::client::legacy::Builder,
847 Option<&HttpConnectorSettings>,
848 Option<&RuntimeComponents>,
849 ) -> Connector
850 + Send
851 + Sync
852 + 'static,
853{
854 SharedHttpClient::new(HyperClient {
855 connector_cache: RwLock::new(HashMap::new()),
856 client_builder: client_builder
857 .unwrap_or_else(|| hyper_util::client::legacy::Builder::new(TokioExecutor::new())),
858 connector_fn,
859 })
860}
861
862#[allow(dead_code)]
863pub(crate) fn build_with_tcp_conn_fn<C, F>(
864 client_builder: Option<hyper_util::client::legacy::Builder>,
865 tcp_connector_fn: F,
866) -> SharedHttpClient
867where
868 F: Fn() -> C + Send + Sync + 'static,
869 C: Clone + Send + Sync + 'static,
870 C: tower::Service<Uri>,
871 C::Response: Connection + Read + Write + Send + Sync + Unpin + 'static,
872 C::Future: Unpin + Send + 'static,
873 C::Error: Into<BoxError>,
874 C: Connect,
875{
876 build_with_conn_fn(
877 client_builder,
878 move |client_builder, settings, runtime_components| {
879 let builder = new_conn_builder(client_builder, settings, runtime_components);
880 builder.wrap_connector(tcp_connector_fn())
881 },
882 )
883}
884
885fn new_conn_builder(
886 client_builder: hyper_util::client::legacy::Builder,
887 settings: Option<&HttpConnectorSettings>,
888 runtime_components: Option<&RuntimeComponents>,
889) -> ConnectorBuilder {
890 let mut builder = Connector::builder().hyper_builder(client_builder);
891 builder.set_connector_settings(settings.cloned());
892 if let Some(components) = runtime_components {
893 builder.set_sleep_impl(components.sleep_impl());
894 }
895 builder
896}
897
// Unit tests for connector caching, error classification, and timeout behavior.
#[cfg(test)]
mod test {
    use std::io::{Error, ErrorKind};
    use std::pin::Pin;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;
    use std::task::{Context, Poll};

    use crate::client::timeout::test::NeverConnects;
    use aws_smithy_async::assert_elapsed;
    use aws_smithy_async::rt::sleep::TokioSleep;
    use aws_smithy_async::time::SystemTimeSource;
    use aws_smithy_runtime_api::client::runtime_components::RuntimeComponentsBuilder;
    use http_1x::Uri;
    use hyper::rt::ReadBufCursor;
    use hyper_util::client::legacy::connect::Connected;

    use super::*;

    /// Requests connectors concurrently for four distinct timeout settings and checks
    /// that exactly one connector is created per setting.
    #[tokio::test]
    async fn connector_selection() {
        // Counts how many times the TCP connector factory runs.
        let creation_count = Arc::new(AtomicU32::new(0));
        let http_client = build_with_tcp_conn_fn(None, {
            let count = creation_count.clone();
            move || {
                count.fetch_add(1, Ordering::Relaxed);
                NeverConnects
            }
        });

        // Four settings that differ in their connect/read timeouts, i.e. four cache keys.
        let settings = [
            HttpConnectorSettings::builder()
                .connect_timeout(Duration::from_secs(3))
                .build(),
            HttpConnectorSettings::builder()
                .read_timeout(Duration::from_secs(3))
                .build(),
            HttpConnectorSettings::builder()
                .connect_timeout(Duration::from_secs(3))
                .read_timeout(Duration::from_secs(3))
                .build(),
            HttpConnectorSettings::builder()
                .connect_timeout(Duration::from_secs(5))
                .read_timeout(Duration::from_secs(3))
                .build(),
        ];

        let components = RuntimeComponentsBuilder::for_tests()
            .with_time_source(Some(SystemTimeSource::new()))
            .build()
            .unwrap();
        // Spawn 1000 lookups per setting so the cache is hit from many tasks at once.
        let mut handles = Vec::new();
        for setting in &settings {
            for _ in 0..1000 {
                let client = http_client.clone();
                handles.push(tokio::spawn({
                    let setting = setting.clone();
                    let components = components.clone();
                    async move {
                        let _ = client.http_connector(&setting, &components);
                    }
                }));
            }
        }
        for handle in handles {
            handle.await.unwrap();
        }

        // One connector per distinct setting, regardless of the number of lookups.
        assert_eq!(4, creation_count.load(Ordering::Relaxed));
    }

    /// A stream whose reads fail with `ConnectionReset` must surface as an I/O error.
    #[tokio::test]
    async fn hyper_io_error() {
        let connector = TestConnection {
            inner: HangupStream,
        };
        let adapter = Connector::builder().wrap_connector(connector).adapter;
        let err = adapter
            .call(HttpRequest::get("https://socket-hangup.com").unwrap())
            .await
            .expect_err("socket hangup");
        assert!(err.is_io(), "unexpected error type: {:?}", err);
    }

    // Test stream: every read fails with `ConnectionReset`; writes never complete.
    #[derive(Clone)]
    struct HangupStream;

    impl Connection for HangupStream {
        fn connected(&self) -> Connected {
            Connected::new()
        }
    }

    impl Read for HangupStream {
        fn poll_read(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: ReadBufCursor<'_>,
        ) -> Poll<std::io::Result<()>> {
            Poll::Ready(Err(Error::new(
                ErrorKind::ConnectionReset,
                "connection reset",
            )))
        }
    }

    impl Write for HangupStream {
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: &[u8],
        ) -> Poll<Result<usize, Error>> {
            Poll::Pending
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
            Poll::Pending
        }

        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
            Poll::Pending
        }
    }

    // Test connector that is always ready and hands out clones of `inner` as the connection.
    #[derive(Clone)]
    struct TestConnection<T> {
        inner: T,
    }

    impl<T> tower::Service<Uri> for TestConnection<T>
    where
        T: Clone + Connection,
    {
        type Response = T;
        type Error = BoxError;
        type Future = std::future::Ready<Result<Self::Response, Self::Error>>;

        fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            Poll::Ready(Ok(()))
        }

        fn call(&mut self, _req: Uri) -> Self::Future {
            std::future::ready(Ok(self.inner.clone()))
        }
    }

    /// A connector that never connects must produce a timeout error after the
    /// configured connect timeout (1s).
    #[tokio::test]
    async fn http_connect_timeout_works() {
        let tcp_connector = NeverConnects::default();
        let connector_settings = HttpConnectorSettings::builder()
            .connect_timeout(Duration::from_secs(1))
            .build();
        let hyper = Connector::builder()
            .connector_settings(connector_settings)
            .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
            .wrap_connector(tcp_connector)
            .adapter;
        let now = tokio::time::Instant::now();
        // Tokio's clock is paused before the call so the test does not wait in real time.
        tokio::time::pause();
        let resp = hyper
            .call(HttpRequest::get("https://static-uri.com").unwrap())
            .await
            .unwrap_err();
        assert!(
            resp.is_timeout(),
            "expected resp.is_timeout() to be true but it was false, resp == {:?}",
            resp
        );
        let message = DisplayErrorContext(&resp).to_string();
        let expected = "timeout: client error (Connect): HTTP connect timeout occurred after 1s";
        assert!(
            message.contains(expected),
            "expected '{message}' to contain '{expected}'"
        );
        assert_elapsed!(now, Duration::from_secs(1));
    }

    /// A connection that never replies must produce a timeout error after the
    /// configured read timeout (2s).
    #[tokio::test]
    async fn http_read_timeout_works() {
        let tcp_connector = crate::client::timeout::test::NeverReplies;
        let connector_settings = HttpConnectorSettings::builder()
            .connect_timeout(Duration::from_secs(1))
            .read_timeout(Duration::from_secs(2))
            .build();
        let hyper = Connector::builder()
            .connector_settings(connector_settings)
            .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
            .wrap_connector(tcp_connector)
            .adapter;
        let now = tokio::time::Instant::now();
        // Tokio's clock is paused before the call so the test does not wait in real time.
        tokio::time::pause();
        let err = hyper
            .call(HttpRequest::get("https://fake-uri.com").unwrap())
            .await
            .unwrap_err();
        assert!(
            err.is_timeout(),
            "expected err.is_timeout() to be true but it was false, err == {err:?}",
        );
        let message = format!("{}", DisplayErrorContext(&err));
        let expected = "timeout: HTTP read timeout occurred after 2s";
        assert!(
            message.contains(expected),
            "expected '{message}' to contain '{expected}'"
        );
        assert_elapsed!(now, Duration::from_secs(2));
    }

    /// A refused TCP connection must be reported as an I/O error whose message
    /// contains "Connection refused".
    #[cfg(not(windows))]
    #[tokio::test]
    async fn connection_refused_works() {
        use crate::client::dns::HyperUtilResolver;
        use aws_smithy_runtime_api::client::dns::{DnsFuture, ResolveDns};
        use std::net::{IpAddr, Ipv4Addr};

        // Resolver that maps every host name to 127.0.0.1.
        #[derive(Debug, Clone, Default)]
        struct TestResolver;
        impl ResolveDns for TestResolver {
            fn resolve_dns<'a>(&'a self, _name: &'a str) -> DnsFuture<'a> {
                let localhost_v4 = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
                DnsFuture::ready(Ok(vec![localhost_v4]))
            }
        }

        let connector_settings = HttpConnectorSettings::builder()
            .connect_timeout(Duration::from_secs(20))
            .build();

        let resolver = HyperUtilResolver {
            resolver: TestResolver,
        };
        let connector = Connector::builder().base_connector_with_resolver(resolver);

        let hyper = Connector::builder()
            .connector_settings(connector_settings)
            .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
            .wrap_connector(connector)
            .adapter;

        let resp = hyper
            .call(HttpRequest::get("http://static-uri:50227.com").unwrap())
            .await
            .unwrap_err();
        assert!(
            resp.is_io(),
            "expected resp.is_io() to be true but it was false, resp == {:?}",
            resp
        );
        let message = DisplayErrorContext(&resp).to_string();
        let expected = "Connection refused";
        assert!(
            message.contains(expected),
            "expected '{message}' to contain '{expected}'"
        );
    }

    /// With the s2n-tls provider selected, a request with an unsupported URI scheme
    /// must fail with s2n's `InvalidScheme` error as the source of the connector error.
    #[cfg(feature = "s2n-tls")]
    #[tokio::test]
    async fn s2n_tls_provider() {
        let client = Builder::new()
            .tls_provider(tls::Provider::S2nTls)
            .build_https();
        let connector_settings = HttpConnectorSettings::builder().build();

        let runtime_components = RuntimeComponentsBuilder::for_tests()
            .with_time_source(Some(SystemTimeSource::new()))
            .build()
            .unwrap();

        let connector = client.http_connector(&connector_settings, &runtime_components);

        let error = connector
            .call(HttpRequest::get("notascheme://amazon.com").unwrap())
            .await
            .unwrap_err();
        let error = error.into_source();
        let s2n_error = error
            .source()
            .unwrap()
            .downcast_ref::<s2n_tls_hyper::error::Error>()
            .unwrap();
        assert!(matches!(
            s2n_error,
            s2n_tls_hyper::error::Error::InvalidScheme
        ));
    }
}