1mod dns;
7pub mod proxy;
9mod timeout;
10pub mod tls;
12
13pub(crate) mod connect;
14
15use crate::cfg::cfg_tls;
16use crate::tls::TlsContext;
17use aws_smithy_async::future::timeout::TimedOutError;
18use aws_smithy_async::rt::sleep::{default_async_sleep, AsyncSleep, SharedAsyncSleep};
19use aws_smithy_runtime_api::box_error::BoxError;
20use aws_smithy_runtime_api::client::connection::CaptureSmithyConnection;
21use aws_smithy_runtime_api::client::connection::ConnectionMetadata;
22use aws_smithy_runtime_api::client::connector_metadata::ConnectorMetadata;
23use aws_smithy_runtime_api::client::http::{
24 HttpClient, HttpConnector, HttpConnectorFuture, HttpConnectorSettings, SharedHttpClient,
25 SharedHttpConnector,
26};
27use aws_smithy_runtime_api::client::orchestrator::{HttpRequest, HttpResponse};
28use aws_smithy_runtime_api::client::result::ConnectorError;
29use aws_smithy_runtime_api::client::runtime_components::{
30 RuntimeComponents, RuntimeComponentsBuilder,
31};
32use aws_smithy_runtime_api::shared::IntoShared;
33use aws_smithy_types::body::SdkBody;
34use aws_smithy_types::config_bag::ConfigBag;
35use aws_smithy_types::error::display::DisplayErrorContext;
36use aws_smithy_types::retry::ErrorKind;
37use client::connect::Connection;
38use h2::Reason;
39use http_1x::{Extensions, Uri};
40use hyper::rt::{Read, Write};
41use hyper_util::client::legacy as client;
42use hyper_util::client::legacy::connect::dns::GaiResolver;
43use hyper_util::client::legacy::connect::{
44 capture_connection, CaptureConnection, Connect, HttpConnector as HyperHttpConnector, HttpInfo,
45};
46use hyper_util::client::proxy::matcher::Matcher;
47use hyper_util::rt::TokioExecutor;
48use std::borrow::Cow;
49use std::collections::HashMap;
50use std::error::Error;
51use std::fmt;
52use std::sync::RwLock;
53use std::time::Duration;
54
55pub fn default_connector(
57 settings: &HttpConnectorSettings,
58 sleep: Option<SharedAsyncSleep>,
59) -> Option<SharedHttpConnector> {
60 #[cfg(feature = "rustls-aws-lc")]
61 {
62 tracing::trace!(settings = ?settings, sleep = ?sleep, "creating a new default connector");
63 let mut conn_builder = Connector::builder().connector_settings(settings.clone());
64
65 if let Some(sleep) = sleep {
66 conn_builder = conn_builder.sleep_impl(sleep);
67 }
68
69 let conn = conn_builder
70 .tls_provider(tls::Provider::Rustls(
71 tls::rustls_provider::CryptoMode::AwsLc,
72 ))
73 .build();
74 Some(SharedHttpConnector::new(conn))
75 }
76 #[cfg(not(feature = "rustls-aws-lc"))]
77 {
78 tracing::trace!(settings = ?settings, sleep = ?sleep, "no default connector available");
79 None
80 }
81}
82
83#[derive(Debug)]
90pub struct Connector {
91 adapter: Box<dyn HttpConnector>,
92}
93
94impl Connector {
95 pub fn builder() -> ConnectorBuilder {
97 ConnectorBuilder {
98 enable_tcp_nodelay: true,
99 ..Default::default()
100 }
101 }
102}
103
104impl HttpConnector for Connector {
105 fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
106 self.adapter.call(request)
107 }
108}
109
110#[derive(Default, Debug, Clone)]
112pub struct ConnectorBuilder<Tls = TlsUnset> {
113 connector_settings: Option<HttpConnectorSettings>,
114 sleep_impl: Option<SharedAsyncSleep>,
115 client_builder: Option<hyper_util::client::legacy::Builder>,
116 enable_tcp_nodelay: bool,
117 interface: Option<String>,
118 proxy_config: Option<proxy::ProxyConfig>,
119 #[allow(unused)]
120 tls: Tls,
121}
122
123#[derive(Default, Debug, Clone)]
125#[non_exhaustive]
126pub struct TlsUnset {}
127
128#[derive(Debug, Clone)]
130pub struct TlsProviderSelected {
131 #[allow(unused)]
132 provider: tls::Provider,
133 #[allow(unused)]
134 context: TlsContext,
135}
136
137impl ConnectorBuilder<TlsUnset> {
138 pub fn tls_provider(self, provider: tls::Provider) -> ConnectorBuilder<TlsProviderSelected> {
140 ConnectorBuilder {
141 connector_settings: self.connector_settings,
142 sleep_impl: self.sleep_impl,
143 client_builder: self.client_builder,
144 enable_tcp_nodelay: self.enable_tcp_nodelay,
145 interface: self.interface,
146 proxy_config: self.proxy_config,
147 tls: TlsProviderSelected {
148 provider,
149 context: TlsContext::default(),
150 },
151 }
152 }
153
154 #[doc(hidden)]
156 pub fn build_http(self) -> Connector {
157 if let Some(ref proxy_config) = self.proxy_config {
158 if proxy_config.requires_tls() {
159 tracing::warn!(
160 "HTTPS proxy configured but no TLS provider set. \
161 Connections to HTTPS proxy servers will fail. \
162 Consider configuring a TLS provider to enable TLS support."
163 );
164 }
165 }
166
167 let base = self.base_connector();
168
169 let proxy_config = self
171 .proxy_config
172 .clone()
173 .unwrap_or_else(proxy::ProxyConfig::disabled);
174
175 if !proxy_config.is_disabled() {
176 let http_proxy_connector = connect::HttpProxyConnector::new(base, proxy_config);
177 self.wrap_connector(http_proxy_connector)
178 } else {
179 self.wrap_connector(base)
180 }
181 }
182}
183
184impl<Any> ConnectorBuilder<Any> {
185 pub(crate) fn wrap_connector<C>(self, tcp_connector: C) -> Connector
187 where
188 C: Send + Sync + 'static,
189 C: Clone,
190 C: tower::Service<Uri>,
191 C::Response: Read + Write + Connection + Send + Sync + Unpin,
192 C: Connect,
193 C::Future: Unpin + Send + 'static,
194 C::Error: Into<BoxError>,
195 {
196 let client_builder =
197 self.client_builder
198 .unwrap_or(hyper_util::client::legacy::Builder::new(
199 TokioExecutor::new(),
200 ));
201 let sleep_impl = self.sleep_impl.or_else(default_async_sleep);
202 let (connect_timeout, read_timeout) = self
203 .connector_settings
204 .map(|c| (c.connect_timeout(), c.read_timeout()))
205 .unwrap_or((None, None));
206
207 let connector = match connect_timeout {
208 Some(duration) => timeout::ConnectTimeout::new(
209 tcp_connector,
210 sleep_impl
211 .clone()
212 .expect("a sleep impl must be provided in order to have a connect timeout"),
213 duration,
214 ),
215 None => timeout::ConnectTimeout::no_timeout(tcp_connector),
216 };
217 let base = client_builder.build(connector);
218 let read_timeout = match read_timeout {
219 Some(duration) => timeout::HttpReadTimeout::new(
220 base,
221 sleep_impl.expect("a sleep impl must be provided in order to have a read timeout"),
222 duration,
223 ),
224 None => timeout::HttpReadTimeout::no_timeout(base),
225 };
226
227 let proxy_matcher = self
228 .proxy_config
229 .as_ref()
230 .map(|config| config.clone().into_hyper_util_matcher());
231
232 Connector {
233 adapter: Box::new(Adapter {
234 client: read_timeout,
235 proxy_matcher,
236 }),
237 }
238 }
239
240 fn base_connector(&self) -> HyperHttpConnector {
243 self.base_connector_with_resolver(GaiResolver::new())
244 }
245
246 fn base_connector_with_resolver<R>(&self, resolver: R) -> HyperHttpConnector<R> {
249 let mut conn = HyperHttpConnector::new_with_resolver(resolver);
250 conn.set_nodelay(self.enable_tcp_nodelay);
251 #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
252 if let Some(interface) = &self.interface {
253 conn.set_interface(interface);
254 }
255 conn
256 }
257
258 pub fn sleep_impl(mut self, sleep_impl: impl AsyncSleep + 'static) -> Self {
263 self.sleep_impl = Some(sleep_impl.into_shared());
264 self
265 }
266
267 pub fn set_sleep_impl(&mut self, sleep_impl: Option<SharedAsyncSleep>) -> &mut Self {
272 self.sleep_impl = sleep_impl;
273 self
274 }
275
276 pub fn connector_settings(mut self, connector_settings: HttpConnectorSettings) -> Self {
278 self.connector_settings = Some(connector_settings);
279 self
280 }
281
282 pub fn set_connector_settings(
284 &mut self,
285 connector_settings: Option<HttpConnectorSettings>,
286 ) -> &mut Self {
287 self.connector_settings = connector_settings;
288 self
289 }
290
291 pub fn enable_tcp_nodelay(mut self, nodelay: bool) -> Self {
293 self.enable_tcp_nodelay = nodelay;
294 self
295 }
296
297 pub fn set_enable_tcp_nodelay(&mut self, nodelay: bool) -> &mut Self {
299 self.enable_tcp_nodelay = nodelay;
300 self
301 }
302
303 #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
316 pub fn set_interface<S: Into<String>>(&mut self, interface: S) -> &mut Self {
317 self.interface = Some(interface.into());
318 self
319 }
320
321 pub fn proxy_config(mut self, config: proxy::ProxyConfig) -> Self {
343 self.proxy_config = Some(config);
344 self
345 }
346
347 pub fn set_proxy_config(&mut self, config: Option<proxy::ProxyConfig>) -> &mut Self {
351 self.proxy_config = config;
352 self
353 }
354
355 pub(crate) fn hyper_builder(
359 mut self,
360 hyper_builder: hyper_util::client::legacy::Builder,
361 ) -> Self {
362 self.set_hyper_builder(Some(hyper_builder));
363 self
364 }
365
366 pub(crate) fn set_hyper_builder(
370 &mut self,
371 hyper_builder: Option<hyper_util::client::legacy::Builder>,
372 ) -> &mut Self {
373 self.client_builder = hyper_builder;
374 self
375 }
376}
377
378struct Adapter<C> {
382 client: timeout::HttpReadTimeout<
383 hyper_util::client::legacy::Client<timeout::ConnectTimeout<C>, SdkBody>,
384 >,
385 proxy_matcher: Option<Matcher>,
386}
387
388impl<C> fmt::Debug for Adapter<C> {
389 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
390 f.debug_struct("Adapter")
391 .field("client", &"** hyper client **")
392 .field("proxy_matcher", &self.proxy_matcher.is_some())
393 .finish()
394 }
395}
396
397fn extract_smithy_connection(capture_conn: &CaptureConnection) -> Option<ConnectionMetadata> {
399 let capture_conn = capture_conn.clone();
400 if let Some(conn) = capture_conn.clone().connection_metadata().as_ref() {
401 let mut extensions = Extensions::new();
402 conn.get_extras(&mut extensions);
403 let http_info = extensions.get::<HttpInfo>();
404 let mut builder = ConnectionMetadata::builder()
405 .proxied(conn.is_proxied())
406 .poison_fn(move || match capture_conn.connection_metadata().as_ref() {
407 Some(conn) => conn.poison(),
408 None => tracing::trace!("no connection existed to poison"),
409 });
410
411 builder
412 .set_local_addr(http_info.map(|info| info.local_addr()))
413 .set_remote_addr(http_info.map(|info| info.remote_addr()));
414
415 let smithy_connection = builder.build();
416
417 Some(smithy_connection)
418 } else {
419 None
420 }
421}
422
423impl<C> Adapter<C> {
424 fn add_proxy_auth_header(&self, request: &mut http_1x::Request<SdkBody>) {
426 if request.uri().scheme() != Some(&http_1x::uri::Scheme::HTTP) {
428 return;
429 }
430
431 if request
433 .headers()
434 .contains_key(http_1x::header::PROXY_AUTHORIZATION)
435 {
436 return;
437 }
438
439 if let Some(ref matcher) = self.proxy_matcher {
440 if let Some(intercept) = matcher.intercept(request.uri()) {
441 if let Some(auth_header) = intercept.basic_auth() {
443 request
444 .headers_mut()
445 .insert(http_1x::header::PROXY_AUTHORIZATION, auth_header.clone());
446 tracing::debug!("added proxy authentication header for {}", request.uri());
447 }
448 }
449 }
450 }
451}
452
453impl<C> HttpConnector for Adapter<C>
454where
455 C: Clone + Send + Sync + 'static,
456 C: tower::Service<Uri>,
457 C::Response: Connection + Read + Write + Unpin + 'static,
458 timeout::ConnectTimeout<C>: Connect,
459 C::Future: Unpin + Send + 'static,
460 C::Error: Into<BoxError>,
461{
462 fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
463 let mut request = match request.try_into_http1x() {
464 Ok(request) => request,
465 Err(err) => {
466 return HttpConnectorFuture::ready(Err(ConnectorError::user(err.into())));
467 }
468 };
469
470 self.add_proxy_auth_header(&mut request);
471
472 let capture_connection = capture_connection(&mut request);
473 if let Some(capture_smithy_connection) =
474 request.extensions().get::<CaptureSmithyConnection>()
475 {
476 capture_smithy_connection
477 .set_connection_retriever(move || extract_smithy_connection(&capture_connection));
478 }
479 let mut client = self.client.clone();
480 use tower::Service;
481 let fut = client.call(request);
482 HttpConnectorFuture::new(async move {
483 let response = fut
484 .await
485 .map_err(downcast_error)?
486 .map(SdkBody::from_body_1_x);
487 match HttpResponse::try_from(response) {
488 Ok(response) => Ok(response),
489 Err(err) => Err(ConnectorError::other(err.into(), None)),
490 }
491 })
492 }
493}
494
495fn downcast_error(err: BoxError) -> ConnectorError {
497 if find_source::<TimedOutError>(err.as_ref()).is_some() {
499 return ConnectorError::timeout(err);
500 }
501 let err = match err.downcast::<ConnectorError>() {
503 Ok(connector_error) => return *connector_error,
504 Err(box_error) => box_error,
505 };
506 let err = match find_source::<hyper::Error>(err.as_ref()) {
509 Some(hyper_error) => return to_connector_error(hyper_error)(err),
510 None => match find_source::<hyper_util::client::legacy::Error>(err.as_ref()) {
511 Some(hyper_util_err) => {
512 if hyper_util_err.is_connect()
513 || find_source::<std::io::Error>(hyper_util_err).is_some()
514 {
515 return ConnectorError::io(err);
516 }
517 err
518 }
519 None => err,
520 },
521 };
522
523 ConnectorError::other(err, None)
525}
526
527fn to_connector_error(err: &hyper::Error) -> fn(BoxError) -> ConnectorError {
529 if err.is_timeout() || find_source::<timeout::HttpTimeoutError>(err).is_some() {
530 return ConnectorError::timeout;
531 }
532 if err.is_user() {
533 return ConnectorError::user;
534 }
535 if err.is_closed() || err.is_canceled() || find_source::<std::io::Error>(err).is_some() {
536 return ConnectorError::io;
537 }
538 if err.is_incomplete_message() {
540 return |err: BoxError| ConnectorError::other(err, Some(ErrorKind::TransientError));
541 }
542
543 if let Some(h2_err) = find_source::<h2::Error>(err) {
544 if h2_err.is_go_away()
545 || (h2_err.is_reset() && h2_err.reason() == Some(Reason::REFUSED_STREAM))
546 {
547 return ConnectorError::io;
548 }
549 }
550
551 tracing::warn!(err = %DisplayErrorContext(&err), "unrecognized error from Hyper. If this error should be retried, please file an issue.");
552 |err: BoxError| ConnectorError::other(err, None)
553}
554
555fn find_source<'a, E: Error + 'static>(err: &'a (dyn Error + 'static)) -> Option<&'a E> {
556 let mut next = Some(err);
557 while let Some(err) = next {
558 if let Some(matching_err) = err.downcast_ref::<E>() {
559 return Some(matching_err);
560 }
561 next = err.source();
562 }
563 None
564}
565
566#[derive(Clone, Debug, Eq, PartialEq, Hash)]
570struct CacheKey {
571 connect_timeout: Option<Duration>,
572 read_timeout: Option<Duration>,
573}
574
575impl From<&HttpConnectorSettings> for CacheKey {
576 fn from(value: &HttpConnectorSettings) -> Self {
577 Self {
578 connect_timeout: value.connect_timeout(),
579 read_timeout: value.read_timeout(),
580 }
581 }
582}
583
584struct HyperClient<F> {
585 connector_cache: RwLock<HashMap<CacheKey, SharedHttpConnector>>,
586 client_builder: hyper_util::client::legacy::Builder,
587 connector_fn: F,
588}
589
590impl<F> fmt::Debug for HyperClient<F> {
591 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
592 f.debug_struct("HyperClient")
593 .field("connector_cache", &self.connector_cache)
594 .field("client_builder", &self.client_builder)
595 .finish()
596 }
597}
598
599impl<F> HttpClient for HyperClient<F>
600where
601 F: Fn(
602 hyper_util::client::legacy::Builder,
603 Option<&HttpConnectorSettings>,
604 Option<&RuntimeComponents>,
605 ) -> Connector
606 + Send
607 + Sync
608 + 'static,
609{
610 fn http_connector(
611 &self,
612 settings: &HttpConnectorSettings,
613 components: &RuntimeComponents,
614 ) -> SharedHttpConnector {
615 let key = CacheKey::from(settings);
616 let mut connector = self.connector_cache.read().unwrap().get(&key).cloned();
617 if connector.is_none() {
618 let mut cache = self.connector_cache.write().unwrap();
619 if !cache.contains_key(&key) {
621 let start = components.time_source().map(|ts| ts.now());
622 let connector = (self.connector_fn)(
623 self.client_builder.clone(),
624 Some(settings),
625 Some(components),
626 );
627 let end = components.time_source().map(|ts| ts.now());
628 if let (Some(start), Some(end)) = (start, end) {
629 if let Ok(elapsed) = end.duration_since(start) {
630 tracing::debug!("new connector created in {:?}", elapsed);
631 }
632 }
633 let connector = SharedHttpConnector::new(connector);
634 cache.insert(key.clone(), connector);
635 }
636 connector = cache.get(&key).cloned();
637 }
638
639 connector.expect("cache populated above")
640 }
641
642 fn validate_base_client_config(
643 &self,
644 _: &RuntimeComponentsBuilder,
645 _: &ConfigBag,
646 ) -> Result<(), BoxError> {
647 let _ = (self.connector_fn)(self.client_builder.clone(), None, None);
653 Ok(())
654 }
655
656 fn connector_metadata(&self) -> Option<ConnectorMetadata> {
657 Some(ConnectorMetadata::new("hyper", Some(Cow::Borrowed("1.x"))))
658 }
659}
660
661#[derive(Clone, Default, Debug)]
672pub struct Builder<Tls = TlsUnset> {
673 client_builder: Option<hyper_util::client::legacy::Builder>,
674 #[allow(unused)]
675 tls_provider: Tls,
676}
677
678cfg_tls! {
679 use aws_smithy_runtime_api::client::dns::ResolveDns;
680
681 impl ConnectorBuilder<TlsProviderSelected> {
682 pub fn build(self) -> Connector {
684 let http_connector = self.base_connector();
685 self.build_https(http_connector)
686 }
687
688 pub fn tls_context(mut self, ctx: TlsContext) -> Self {
690 self.tls.context = ctx;
691 self
692 }
693
694 pub fn set_tls_context(&mut self, ctx: TlsContext) -> &mut Self {
696 self.tls.context = ctx;
697 self
698 }
699
700 pub fn build_with_resolver<R: ResolveDns + Clone + 'static>(self, resolver: R) -> Connector {
702 use crate::client::dns::HyperUtilResolver;
703 let http_connector = self.base_connector_with_resolver(HyperUtilResolver { resolver });
704 self.build_https(http_connector)
705 }
706
707 fn build_https<R>(self, http_connector: HyperHttpConnector<R>) -> Connector
708 where
709 R: Clone + Send + Sync + 'static,
710 R: tower::Service<hyper_util::client::legacy::connect::dns::Name>,
711 R::Response: Iterator<Item = std::net::SocketAddr>,
712 R::Future: Send,
713 R::Error: Into<Box<dyn Error + Send + Sync>>,
714 {
715 match &self.tls.provider {
716 #[cfg(any(
718 feature = "rustls-aws-lc",
719 feature = "rustls-aws-lc-fips",
720 feature = "rustls-ring"
721 ))]
722 tls::Provider::Rustls(crypto_mode) => {
723 let proxy_config = self.proxy_config.clone()
724 .unwrap_or_else(proxy::ProxyConfig::disabled);
725
726 let https_connector = tls::rustls_provider::build_connector::wrap_connector(
727 http_connector,
728 crypto_mode.clone(),
729 &self.tls.context,
730 proxy_config,
731 );
732 self.wrap_connector(https_connector)
733 },
734 #[cfg(feature = "s2n-tls")]
735 tls::Provider::S2nTls => {
736 let proxy_config = self.proxy_config.clone()
737 .unwrap_or_else(proxy::ProxyConfig::disabled);
738
739 let https_connector = tls::s2n_tls_provider::build_connector::wrap_connector(
740 http_connector,
741 &self.tls.context,
742 proxy_config,
743 );
744 self.wrap_connector(https_connector)
745 }
746 }
747 }
748 }
749
750 impl Builder<TlsProviderSelected> {
751 pub fn build_https(self) -> SharedHttpClient {
756 build_with_conn_fn(
757 self.client_builder,
758 move |client_builder, settings, runtime_components| {
759 let builder = new_conn_builder(client_builder, settings, runtime_components)
760 .tls_provider(self.tls_provider.provider.clone())
761 .tls_context(self.tls_provider.context.clone());
762 builder.build()
763 },
764 )
765 }
766
767 pub fn build_with_resolver(
769 self,
770 resolver: impl ResolveDns + Clone + 'static,
771 ) -> SharedHttpClient {
772 build_with_conn_fn(
773 self.client_builder,
774 move |client_builder, settings, runtime_components| {
775 let builder = new_conn_builder(client_builder, settings, runtime_components)
776 .tls_provider(self.tls_provider.provider.clone())
777 .tls_context(self.tls_provider.context.clone());
778 builder.build_with_resolver(resolver.clone())
779 },
780 )
781 }
782
783 pub fn tls_context(mut self, ctx: TlsContext) -> Self {
785 self.tls_provider.context = ctx;
786 self
787 }
788 }
789}
790
791impl Builder<TlsUnset> {
792 pub fn new() -> Self {
794 Self::default()
795 }
796
797 #[doc(hidden)]
799 pub fn build_with_connector_fn<F>(self, connector_fn: F) -> SharedHttpClient
800 where
801 F: Fn(Option<&HttpConnectorSettings>, Option<&RuntimeComponents>) -> Connector
802 + Send
803 + Sync
804 + 'static,
805 {
806 build_with_conn_fn(
807 self.client_builder,
808 move |_builder, settings, runtime_components| {
809 connector_fn(settings, runtime_components)
810 },
811 )
812 }
813
814 #[doc(hidden)]
816 pub fn build_http(self) -> SharedHttpClient {
817 build_with_conn_fn(
818 self.client_builder,
819 move |client_builder, settings, runtime_components| {
820 let builder = new_conn_builder(client_builder, settings, runtime_components);
821 builder.build_http()
822 },
823 )
824 }
825
826 pub fn tls_provider(self, provider: tls::Provider) -> Builder<TlsProviderSelected> {
828 Builder {
829 client_builder: self.client_builder,
830 tls_provider: TlsProviderSelected {
831 provider,
832 context: TlsContext::default(),
833 },
834 }
835 }
836}
837
838pub(crate) fn build_with_conn_fn<F>(
839 client_builder: Option<hyper_util::client::legacy::Builder>,
840 connector_fn: F,
841) -> SharedHttpClient
842where
843 F: Fn(
844 hyper_util::client::legacy::Builder,
845 Option<&HttpConnectorSettings>,
846 Option<&RuntimeComponents>,
847 ) -> Connector
848 + Send
849 + Sync
850 + 'static,
851{
852 SharedHttpClient::new(HyperClient {
853 connector_cache: RwLock::new(HashMap::new()),
854 client_builder: client_builder
855 .unwrap_or_else(|| hyper_util::client::legacy::Builder::new(TokioExecutor::new())),
856 connector_fn,
857 })
858}
859
860#[allow(dead_code)]
861pub(crate) fn build_with_tcp_conn_fn<C, F>(
862 client_builder: Option<hyper_util::client::legacy::Builder>,
863 tcp_connector_fn: F,
864) -> SharedHttpClient
865where
866 F: Fn() -> C + Send + Sync + 'static,
867 C: Clone + Send + Sync + 'static,
868 C: tower::Service<Uri>,
869 C::Response: Connection + Read + Write + Send + Sync + Unpin + 'static,
870 C::Future: Unpin + Send + 'static,
871 C::Error: Into<BoxError>,
872 C: Connect,
873{
874 build_with_conn_fn(
875 client_builder,
876 move |client_builder, settings, runtime_components| {
877 let builder = new_conn_builder(client_builder, settings, runtime_components);
878 builder.wrap_connector(tcp_connector_fn())
879 },
880 )
881}
882
883fn new_conn_builder(
884 client_builder: hyper_util::client::legacy::Builder,
885 settings: Option<&HttpConnectorSettings>,
886 runtime_components: Option<&RuntimeComponents>,
887) -> ConnectorBuilder {
888 let mut builder = Connector::builder().hyper_builder(client_builder);
889 builder.set_connector_settings(settings.cloned());
890 if let Some(components) = runtime_components {
891 builder.set_sleep_impl(components.sleep_impl());
892 }
893 builder
894}
895
896#[cfg(test)]
897mod test {
898 use std::io::{Error, ErrorKind};
899 use std::pin::Pin;
900 use std::sync::atomic::{AtomicU32, Ordering};
901 use std::sync::Arc;
902 use std::task::{Context, Poll};
903
904 use crate::client::timeout::test::NeverConnects;
905 use aws_smithy_async::assert_elapsed;
906 use aws_smithy_async::rt::sleep::TokioSleep;
907 use aws_smithy_async::time::SystemTimeSource;
908 use aws_smithy_runtime_api::client::runtime_components::RuntimeComponentsBuilder;
909 use http_1x::Uri;
910 use hyper::rt::ReadBufCursor;
911 use hyper_util::client::legacy::connect::Connected;
912
913 use super::*;
914
915 #[tokio::test]
916 async fn connector_selection() {
917 let creation_count = Arc::new(AtomicU32::new(0));
919 let http_client = build_with_tcp_conn_fn(None, {
920 let count = creation_count.clone();
921 move || {
922 count.fetch_add(1, Ordering::Relaxed);
923 NeverConnects
924 }
925 });
926
927 let settings = [
929 HttpConnectorSettings::builder()
930 .connect_timeout(Duration::from_secs(3))
931 .build(),
932 HttpConnectorSettings::builder()
933 .read_timeout(Duration::from_secs(3))
934 .build(),
935 HttpConnectorSettings::builder()
936 .connect_timeout(Duration::from_secs(3))
937 .read_timeout(Duration::from_secs(3))
938 .build(),
939 HttpConnectorSettings::builder()
940 .connect_timeout(Duration::from_secs(5))
941 .read_timeout(Duration::from_secs(3))
942 .build(),
943 ];
944
945 let components = RuntimeComponentsBuilder::for_tests()
947 .with_time_source(Some(SystemTimeSource::new()))
948 .build()
949 .unwrap();
950 let mut handles = Vec::new();
951 for setting in &settings {
952 for _ in 0..1000 {
953 let client = http_client.clone();
954 handles.push(tokio::spawn({
955 let setting = setting.clone();
956 let components = components.clone();
957 async move {
958 let _ = client.http_connector(&setting, &components);
959 }
960 }));
961 }
962 }
963 for handle in handles {
964 handle.await.unwrap();
965 }
966
967 assert_eq!(4, creation_count.load(Ordering::Relaxed));
969 }
970
971 #[tokio::test]
972 async fn hyper_io_error() {
973 let connector = TestConnection {
974 inner: HangupStream,
975 };
976 let adapter = Connector::builder().wrap_connector(connector).adapter;
977 let err = adapter
978 .call(HttpRequest::get("https://socket-hangup.com").unwrap())
979 .await
980 .expect_err("socket hangup");
981 assert!(err.is_io(), "unexpected error type: {:?}", err);
982 }
983
984 #[derive(Clone)]
986 struct HangupStream;
987
988 impl Connection for HangupStream {
989 fn connected(&self) -> Connected {
990 Connected::new()
991 }
992 }
993
994 impl Read for HangupStream {
995 fn poll_read(
996 self: Pin<&mut Self>,
997 _cx: &mut Context<'_>,
998 _buf: ReadBufCursor<'_>,
999 ) -> Poll<std::io::Result<()>> {
1000 Poll::Ready(Err(Error::new(
1001 ErrorKind::ConnectionReset,
1002 "connection reset",
1003 )))
1004 }
1005 }
1006
1007 impl Write for HangupStream {
1008 fn poll_write(
1009 self: Pin<&mut Self>,
1010 _cx: &mut Context<'_>,
1011 _buf: &[u8],
1012 ) -> Poll<Result<usize, Error>> {
1013 Poll::Pending
1014 }
1015
1016 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
1017 Poll::Pending
1018 }
1019
1020 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
1021 Poll::Pending
1022 }
1023 }
1024
1025 #[derive(Clone)]
1026 struct TestConnection<T> {
1027 inner: T,
1028 }
1029
1030 impl<T> tower::Service<Uri> for TestConnection<T>
1031 where
1032 T: Clone + Connection,
1033 {
1034 type Response = T;
1035 type Error = BoxError;
1036 type Future = std::future::Ready<Result<Self::Response, Self::Error>>;
1037
1038 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1039 Poll::Ready(Ok(()))
1040 }
1041
1042 fn call(&mut self, _req: Uri) -> Self::Future {
1043 std::future::ready(Ok(self.inner.clone()))
1044 }
1045 }
1046
1047 #[tokio::test]
1048 async fn http_connect_timeout_works() {
1049 let tcp_connector = NeverConnects::default();
1050 let connector_settings = HttpConnectorSettings::builder()
1051 .connect_timeout(Duration::from_secs(1))
1052 .build();
1053 let hyper = Connector::builder()
1054 .connector_settings(connector_settings)
1055 .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
1056 .wrap_connector(tcp_connector)
1057 .adapter;
1058 let now = tokio::time::Instant::now();
1059 tokio::time::pause();
1060 let resp = hyper
1061 .call(HttpRequest::get("https://static-uri.com").unwrap())
1062 .await
1063 .unwrap_err();
1064 assert!(
1065 resp.is_timeout(),
1066 "expected resp.is_timeout() to be true but it was false, resp == {:?}",
1067 resp
1068 );
1069 let message = DisplayErrorContext(&resp).to_string();
1070 let expected = "timeout: client error (Connect): HTTP connect timeout occurred after 1s";
1071 assert!(
1072 message.contains(expected),
1073 "expected '{message}' to contain '{expected}'"
1074 );
1075 assert_elapsed!(now, Duration::from_secs(1));
1076 }
1077
1078 #[tokio::test]
1079 async fn http_read_timeout_works() {
1080 let tcp_connector = crate::client::timeout::test::NeverReplies;
1081 let connector_settings = HttpConnectorSettings::builder()
1082 .connect_timeout(Duration::from_secs(1))
1083 .read_timeout(Duration::from_secs(2))
1084 .build();
1085 let hyper = Connector::builder()
1086 .connector_settings(connector_settings)
1087 .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
1088 .wrap_connector(tcp_connector)
1089 .adapter;
1090 let now = tokio::time::Instant::now();
1091 tokio::time::pause();
1092 let err = hyper
1093 .call(HttpRequest::get("https://fake-uri.com").unwrap())
1094 .await
1095 .unwrap_err();
1096 assert!(
1097 err.is_timeout(),
1098 "expected err.is_timeout() to be true but it was false, err == {err:?}",
1099 );
1100 let message = format!("{}", DisplayErrorContext(&err));
1101 let expected = "timeout: HTTP read timeout occurred after 2s";
1102 assert!(
1103 message.contains(expected),
1104 "expected '{message}' to contain '{expected}'"
1105 );
1106 assert_elapsed!(now, Duration::from_secs(2));
1107 }
1108
1109 #[cfg(not(windows))]
1110 #[tokio::test]
1111 async fn connection_refused_works() {
1112 use crate::client::dns::HyperUtilResolver;
1113 use aws_smithy_runtime_api::client::dns::{DnsFuture, ResolveDns};
1114 use std::net::{IpAddr, Ipv4Addr};
1115
1116 #[derive(Debug, Clone, Default)]
1117 struct TestResolver;
1118 impl ResolveDns for TestResolver {
1119 fn resolve_dns<'a>(&'a self, _name: &'a str) -> DnsFuture<'a> {
1120 let localhost_v4 = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
1121 DnsFuture::ready(Ok(vec![localhost_v4]))
1122 }
1123 }
1124
1125 let connector_settings = HttpConnectorSettings::builder()
1126 .connect_timeout(Duration::from_secs(20))
1127 .build();
1128
1129 let resolver = HyperUtilResolver {
1130 resolver: TestResolver,
1131 };
1132 let connector = Connector::builder().base_connector_with_resolver(resolver);
1133
1134 let hyper = Connector::builder()
1135 .connector_settings(connector_settings)
1136 .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
1137 .wrap_connector(connector)
1138 .adapter;
1139
1140 let resp = hyper
1141 .call(HttpRequest::get("http://static-uri:50227.com").unwrap())
1142 .await
1143 .unwrap_err();
1144 assert!(
1145 resp.is_io(),
1146 "expected resp.is_io() to be true but it was false, resp == {:?}",
1147 resp
1148 );
1149 let message = DisplayErrorContext(&resp).to_string();
1150 let expected = "Connection refused";
1151 assert!(
1152 message.contains(expected),
1153 "expected '{message}' to contain '{expected}'"
1154 );
1155 }
1156
1157 #[cfg(feature = "s2n-tls")]
1158 #[tokio::test]
1159 async fn s2n_tls_provider() {
1160 let client = Builder::new()
1162 .tls_provider(tls::Provider::S2nTls)
1163 .build_https();
1164 let connector_settings = HttpConnectorSettings::builder().build();
1165
1166 let runtime_components = RuntimeComponentsBuilder::for_tests()
1169 .with_time_source(Some(SystemTimeSource::new()))
1170 .build()
1171 .unwrap();
1172
1173 let connector = client.http_connector(&connector_settings, &runtime_components);
1174
1175 let error = connector
1180 .call(HttpRequest::get("notascheme://amazon.com").unwrap())
1181 .await
1182 .unwrap_err();
1183 let error = error.into_source();
1184 let s2n_error = error
1185 .source()
1186 .unwrap()
1187 .downcast_ref::<s2n_tls_hyper::error::Error>()
1188 .unwrap();
1189 assert!(matches!(
1190 s2n_error,
1191 s2n_tls_hyper::error::Error::InvalidScheme
1192 ));
1193 }
1194}