1use crate::hyper_legacy::timeout_middleware::HttpTimeoutError;
7use aws_smithy_async::future::timeout::TimedOutError;
8use aws_smithy_async::rt::sleep::{default_async_sleep, AsyncSleep, SharedAsyncSleep};
9use aws_smithy_runtime_api::box_error::BoxError;
10use aws_smithy_runtime_api::client::connection::CaptureSmithyConnection;
11use aws_smithy_runtime_api::client::connection::ConnectionMetadata;
12use aws_smithy_runtime_api::client::connector_metadata::ConnectorMetadata;
13use aws_smithy_runtime_api::client::http::{
14 HttpClient, HttpConnector, HttpConnectorFuture, HttpConnectorSettings, SharedHttpClient,
15 SharedHttpConnector,
16};
17use aws_smithy_runtime_api::client::orchestrator::{HttpRequest, HttpResponse};
18use aws_smithy_runtime_api::client::result::ConnectorError;
19use aws_smithy_runtime_api::client::runtime_components::{
20 RuntimeComponents, RuntimeComponentsBuilder,
21};
22use aws_smithy_runtime_api::shared::IntoShared;
23use aws_smithy_types::body::SdkBody;
24use aws_smithy_types::config_bag::ConfigBag;
25use aws_smithy_types::error::display::DisplayErrorContext;
26use aws_smithy_types::retry::ErrorKind;
27use h2_0_3::Reason;
28use hyper_0_14::client::connect::{capture_connection, CaptureConnection, Connection, HttpInfo};
29use std::borrow::Cow;
30use std::collections::HashMap;
31use std::error::Error;
32use std::fmt;
33use std::sync::RwLock;
34use std::time::Duration;
35use tokio::io::{AsyncRead, AsyncWrite};
36
37#[cfg(feature = "legacy-rustls-ring")]
38mod default_connector {
39 use aws_smithy_async::rt::sleep::SharedAsyncSleep;
40 use aws_smithy_runtime_api::client::http::HttpConnectorSettings;
41 use legacy_hyper_rustls as hyper_rustls;
42 use legacy_rustls as rustls;
43 use std::sync::LazyLock;
44
    /// Process-wide HTTPS connector produced by `default_tls`.
    ///
    /// Wrapped in a `LazyLock`, so `default_tls` runs at most once, on first access.
    /// `https()` hands out clones of this value.
    pub(crate) static HTTPS_NATIVE_ROOTS: LazyLock<
        hyper_rustls::HttpsConnector<hyper_0_14::client::HttpConnector>,
    > = LazyLock::new(default_tls);
50
    /// Builds the default HTTPS connector: a rustls client configuration that trusts the
    /// platform's native root certificates and uses an explicit cipher-suite list, wrapped in
    /// a `hyper_rustls` connector built with `https_or_http()`, `enable_http1()` and
    /// `enable_http2()`.
    ///
    /// # Panics
    ///
    /// Panics if the platform certificates cannot be loaded, if no certificate could be
    /// added to the root store, or if rustls rejects the protocol-version configuration.
    fn default_tls() -> hyper_rustls::HttpsConnector<hyper_0_14::client::HttpConnector> {
        use legacy_rustls::client::WantsTransparencyPolicyOrClientCert;
        use legacy_rustls::{ClientConfig, ConfigBuilder, WantsVerifier};
        use rustls_native_certs;
        // Local helper that fills a root store from the platform certificates and installs
        // it on the given config builder.
        fn with_native_roots(
            this: ConfigBuilder<ClientConfig, WantsVerifier>,
        ) -> ConfigBuilder<ClientConfig, WantsTransparencyPolicyOrClientCert> {
            let mut roots = rustls::RootCertStore::empty();
            let mut valid_count = 0;
            let mut invalid_count = 0;

            for cert in
                rustls_native_certs::load_native_certs().expect("could not load platform certs")
            {
                let cert = rustls::Certificate(cert.to_vec());
                // A certificate that fails to be added is logged and counted, not fatal.
                match roots.add(&cert) {
                    Ok(_) => valid_count += 1,
                    Err(err) => {
                        tracing::trace!("invalid cert der {:?}", cert.0);
                        tracing::debug!("certificate parsing failed: {:?}", err);
                        invalid_count += 1
                    }
                }
            }
            tracing::debug!(
                "with_native_roots processed {} valid and {} invalid certs",
                valid_count,
                invalid_count
            );
            // An empty trust store could not validate any server, so fail loudly here.
            assert!(!roots.is_empty(), "no CA certificates found");

            this.with_root_certificates(roots)
        }
        hyper_rustls::HttpsConnectorBuilder::new()
            .with_tls_config(
                with_native_roots(rustls::ClientConfig::builder()
                    .with_cipher_suites(&[
                        // TLS 1.3 suites
                        rustls::cipher_suite::TLS13_AES_256_GCM_SHA384,
                        rustls::cipher_suite::TLS13_AES_128_GCM_SHA256,
                        // TLS 1.2 suites
                        rustls::cipher_suite::TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
                        rustls::cipher_suite::TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
                        rustls::cipher_suite::TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
                        rustls::cipher_suite::TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
                        rustls::cipher_suite::TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256,
                    ])
                    .with_safe_default_kx_groups()
                    .with_safe_default_protocol_versions()
                    .expect("Error with the TLS configuration. Please file a bug report under https://github.com/smithy-lang/smithy-rs/issues."))
                    .with_no_client_auth()
            )
            .https_or_http()
            .enable_http1()
            .enable_http2()
            .build()
    }
110
111 pub(super) fn base(
112 settings: &HttpConnectorSettings,
113 sleep: Option<SharedAsyncSleep>,
114 ) -> super::HyperConnectorBuilder {
115 let mut hyper = super::HyperConnector::builder().connector_settings(settings.clone());
116 if let Some(sleep) = sleep {
117 hyper = hyper.sleep_impl(sleep);
118 }
119 hyper
120 }
121
    /// Returns a clone of the shared `HTTPS_NATIVE_ROOTS` connector.
    ///
    /// The first call forces the lazy initialization of that static (and therefore runs
    /// `default_tls`); later calls only clone the already-built connector.
    pub(super) fn https() -> hyper_rustls::HttpsConnector<hyper_0_14::client::HttpConnector> {
        HTTPS_NATIVE_ROOTS.clone()
    }
129}
130
131pub fn default_connector(
133 settings: &HttpConnectorSettings,
134 sleep: Option<SharedAsyncSleep>,
135) -> Option<SharedHttpConnector> {
136 #[cfg(feature = "legacy-rustls-ring")]
137 {
138 tracing::trace!(settings = ?settings, sleep = ?sleep, "creating a new default connector");
139 let hyper = default_connector::base(settings, sleep).build_https();
140 Some(SharedHttpConnector::new(hyper))
141 }
142 #[cfg(not(feature = "legacy-rustls-ring"))]
143 {
144 tracing::trace!(settings = ?settings, sleep = ?sleep, "no default connector available");
145 None
146 }
147}
148
149pub fn default_client() -> Option<SharedHttpClient> {
151 #[cfg(feature = "legacy-rustls-ring")]
152 {
153 tracing::trace!("creating a new default hyper 0.14.x client");
154 Some(HyperClientBuilder::new().build_https())
155 }
156 #[cfg(not(feature = "legacy-rustls-ring"))]
157 {
158 tracing::trace!("no default connector available");
159 None
160 }
161}
162
/// [`HttpConnector`] backed by a hyper 0.14 client.
///
/// Instances are created through [`HyperConnector::builder`]. The built connector wraps
/// the hyper client in the connect- and read-timeout layers from `timeout_middleware`.
#[derive(Debug)]
pub struct HyperConnector {
    // Type-erased adapter, so this struct does not need to be generic over the
    // underlying TCP connector type.
    adapter: Box<dyn HttpConnector>,
}
174
175impl HyperConnector {
176 pub fn builder() -> HyperConnectorBuilder {
178 Default::default()
179 }
180}
181
impl HttpConnector for HyperConnector {
    /// Forwards the request unchanged to the boxed adapter.
    fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
        self.adapter.call(request)
    }
}
187
/// Builder for [`HyperConnector`].
#[derive(Default, Debug)]
pub struct HyperConnectorBuilder {
    // Source of the connect/read timeouts; `None` means no timeouts are applied.
    connector_settings: Option<HttpConnectorSettings>,
    // Sleep implementation used to drive timeouts; `build` falls back to
    // `default_async_sleep()` when this is `None`.
    sleep_impl: Option<SharedAsyncSleep>,
    // Optional pre-configured hyper client builder; `build` uses hyper's default otherwise.
    client_builder: Option<hyper_0_14::client::Builder>,
}
195
196impl HyperConnectorBuilder {
197 pub fn build<C>(self, tcp_connector: C) -> HyperConnector
199 where
200 C: Clone + Send + Sync + 'static,
201 C: hyper_0_14::service::Service<http_02x::Uri>,
202 C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
203 C::Future: Unpin + Send + 'static,
204 C::Error: Into<BoxError>,
205 {
206 let client_builder = self.client_builder.unwrap_or_default();
207 let sleep_impl = self.sleep_impl.or_else(default_async_sleep);
208 let (connect_timeout, read_timeout) = self
209 .connector_settings
210 .map(|c| (c.connect_timeout(), c.read_timeout()))
211 .unwrap_or((None, None));
212
213 let connector = match connect_timeout {
214 Some(duration) => timeout_middleware::ConnectTimeout::new(
215 tcp_connector,
216 sleep_impl
217 .clone()
218 .expect("a sleep impl must be provided in order to have a connect timeout"),
219 duration,
220 ),
221 None => timeout_middleware::ConnectTimeout::no_timeout(tcp_connector),
222 };
223 let base = client_builder.build(connector);
224 let read_timeout = match read_timeout {
225 Some(duration) => timeout_middleware::HttpReadTimeout::new(
226 base,
227 sleep_impl.expect("a sleep impl must be provided in order to have a read timeout"),
228 duration,
229 ),
230 None => timeout_middleware::HttpReadTimeout::no_timeout(base),
231 };
232 HyperConnector {
233 adapter: Box::new(Adapter {
234 client: read_timeout,
235 }),
236 }
237 }
238
239 #[cfg(feature = "legacy-rustls-ring")]
241 pub fn build_https(self) -> HyperConnector {
242 self.build(default_connector::https())
243 }
244
245 pub fn sleep_impl(mut self, sleep_impl: impl AsyncSleep + 'static) -> Self {
250 self.sleep_impl = Some(sleep_impl.into_shared());
251 self
252 }
253
254 pub fn set_sleep_impl(&mut self, sleep_impl: Option<SharedAsyncSleep>) -> &mut Self {
259 self.sleep_impl = sleep_impl;
260 self
261 }
262
263 pub fn connector_settings(mut self, connector_settings: HttpConnectorSettings) -> Self {
265 self.connector_settings = Some(connector_settings);
266 self
267 }
268
269 pub fn set_connector_settings(
271 &mut self,
272 connector_settings: Option<HttpConnectorSettings>,
273 ) -> &mut Self {
274 self.connector_settings = connector_settings;
275 self
276 }
277
278 pub fn hyper_builder(mut self, hyper_builder: hyper_0_14::client::Builder) -> Self {
282 self.client_builder = Some(hyper_builder);
283 self
284 }
285
286 pub fn set_hyper_builder(
290 &mut self,
291 hyper_builder: Option<hyper_0_14::client::Builder>,
292 ) -> &mut Self {
293 self.client_builder = hyper_builder;
294 self
295 }
296}
297
/// Adapter from a hyper 0.14 client to the [`HttpConnector`] trait.
///
/// `C` is the underlying TCP connector type.
struct Adapter<C> {
    // hyper client over a connect-timeout-wrapped `C`, itself wrapped in the
    // read-timeout layer.
    client: timeout_middleware::HttpReadTimeout<
        hyper_0_14::Client<timeout_middleware::ConnectTimeout<C>, SdkBody>,
    >,
}
306
307impl<C> fmt::Debug for Adapter<C> {
308 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
309 f.debug_struct("Adapter")
310 .field("client", &"** hyper client **")
311 .finish()
312 }
313}
314
/// Builds a [`ConnectionMetadata`] from the connection captured by `capture_conn`.
///
/// Returns `None` when `connection_metadata()` holds no connection. Otherwise the result
/// carries the connection's proxied flag, its local and remote addresses (only when an
/// `HttpInfo` is present in the connection's extras), and a callback that poisons the
/// connection.
fn extract_smithy_connection(capture_conn: &CaptureConnection) -> Option<ConnectionMetadata> {
    // Owned handle that is moved into the poison closure below.
    let capture_conn = capture_conn.clone();
    // A second clone is inspected here; presumably this keeps `capture_conn` unborrowed
    // so it can be moved into the closure while `conn` is still in use.
    if let Some(conn) = capture_conn.clone().connection_metadata().as_ref() {
        let mut extensions = http_02x::Extensions::new();
        conn.get_extras(&mut extensions);
        let http_info = extensions.get::<HttpInfo>();
        let mut builder = ConnectionMetadata::builder()
            .proxied(conn.is_proxied())
            // The callback re-reads the metadata when invoked and only traces if no
            // connection is present at that point.
            .poison_fn(move || match capture_conn.connection_metadata().as_ref() {
                Some(conn) => conn.poison(),
                None => tracing::trace!("no connection existed to poison"),
            });

        // Both addresses are `None` when the extras contained no `HttpInfo`.
        builder
            .set_local_addr(http_info.map(|info| info.local_addr()))
            .set_remote_addr(http_info.map(|info| info.remote_addr()));

        let smithy_connection = builder.build();

        Some(smithy_connection)
    } else {
        None
    }
}
340
341impl<C> HttpConnector for Adapter<C>
342where
343 C: Clone + Send + Sync + 'static,
344 C: hyper_0_14::service::Service<http_02x::Uri>,
345 C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
346 C::Future: Unpin + Send + 'static,
347 C::Error: Into<BoxError>,
348{
349 fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
350 use hyper_0_14::service::Service;
351
352 let mut request = match request.try_into_http02x() {
353 Ok(request) => request,
354 Err(err) => {
355 return HttpConnectorFuture::ready(Err(ConnectorError::other(err.into(), None)));
356 }
357 };
358 let capture_connection = capture_connection(&mut request);
359 if let Some(capture_smithy_connection) =
360 request.extensions().get::<CaptureSmithyConnection>()
361 {
362 capture_smithy_connection
363 .set_connection_retriever(move || extract_smithy_connection(&capture_connection));
364 }
365 let mut client = self.client.clone();
366 let fut = client.call(request);
367 HttpConnectorFuture::new(async move {
368 let response = fut
369 .await
370 .map_err(downcast_error)?
371 .map(SdkBody::from_body_0_4);
372 match HttpResponse::try_from(response) {
373 Ok(response) => Ok(response),
374 Err(err) => Err(ConnectorError::other(err.into(), None)),
375 }
376 })
377 }
378}
379
380fn downcast_error(err: BoxError) -> ConnectorError {
382 if find_source::<TimedOutError>(err.as_ref()).is_some() {
384 return ConnectorError::timeout(err);
385 }
386 let err = match err.downcast::<ConnectorError>() {
388 Ok(connector_error) => return *connector_error,
389 Err(box_error) => box_error,
390 };
391 let err = match err.downcast::<hyper_0_14::Error>() {
394 Ok(hyper_error) => return to_connector_error(*hyper_error),
395 Err(box_error) => box_error,
396 };
397
398 ConnectorError::other(err, None)
400}
401
/// Classifies a hyper error as a [`ConnectorError`].
///
/// The checks run in order and the first match wins, so an error that matches several
/// categories is reported under the earliest one. An error matching none of them is
/// logged at `warn` level and returned as `ConnectorError::other` with no retry kind.
fn to_connector_error(err: hyper_0_14::Error) -> ConnectorError {
    // Timeouts: reported by hyper itself, or caused by this module's timeout middleware.
    if err.is_timeout() || find_source::<HttpTimeoutError>(&err).is_some() {
        return ConnectorError::timeout(err.into());
    }
    if err.is_user() {
        return ConnectorError::user(err.into());
    }
    // Closed or canceled, or anything with an `std::io::Error` in its source chain.
    if err.is_closed() || err.is_canceled() || find_source::<std::io::Error>(&err).is_some() {
        return ConnectorError::io(err.into());
    }
    // Incomplete messages stay in the `other` category but are tagged as transient.
    if err.is_incomplete_message() {
        return ConnectorError::other(err.into(), Some(ErrorKind::TransientError));
    }
    // HTTP/2: a GOAWAY, or a stream reset with reason REFUSED_STREAM, is treated as IO.
    if let Some(h2_err) = find_source::<h2_0_3::Error>(&err) {
        if h2_err.is_go_away()
            || (h2_err.is_reset() && h2_err.reason() == Some(Reason::REFUSED_STREAM))
        {
            return ConnectorError::io(err.into());
        }
    }

    tracing::warn!(err = %DisplayErrorContext(&err), "unrecognized error from Hyper. If this error should be retried, please file an issue.");
    ConnectorError::other(err.into(), None)
}
428
/// Searches `err` and its chain of `source()` errors for the first error of type `E`.
///
/// `err` itself is checked first. Returns `None` when no error in the chain is an `E`.
fn find_source<'a, E: Error + 'static>(err: &'a (dyn Error + 'static)) -> Option<&'a E> {
    // `(*link)` copies the `&'a dyn Error` out of the iterator's short-lived borrow, so
    // the returned source keeps the full `'a` lifetime.
    std::iter::successors(Some(err), |link| (*link).source())
        .find_map(|link| link.downcast_ref::<E>())
}
439
/// Key for `HyperClient`'s connector cache.
///
/// Holds the settings that distinguish one cached connector from another: the connect
/// and read timeouts.
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
struct CacheKey {
    connect_timeout: Option<Duration>,
    read_timeout: Option<Duration>,
}
445
446impl From<&HttpConnectorSettings> for CacheKey {
447 fn from(value: &HttpConnectorSettings) -> Self {
448 Self {
449 connect_timeout: value.connect_timeout(),
450 read_timeout: value.read_timeout(),
451 }
452 }
453}
454
/// [`HttpClient`] implementation that creates hyper-based connectors on demand and
/// caches one per distinct [`CacheKey`].
///
/// `F` is a factory producing the TCP connector used for each newly built connector.
struct HyperClient<F> {
    // Connectors built so far, keyed by their timeout settings.
    connector_cache: RwLock<HashMap<CacheKey, SharedHttpConnector>>,
    // hyper client builder cloned into every new connector.
    client_builder: hyper_0_14::client::Builder,
    // Called once per cache miss, and once more from `validate_base_client_config`.
    tcp_connector_fn: F,
}
460
461impl<F> fmt::Debug for HyperClient<F> {
462 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
463 f.debug_struct("HyperClient")
464 .field("connector_cache", &self.connector_cache)
465 .field("client_builder", &self.client_builder)
466 .finish()
467 }
468}
469
470impl<C, F> HttpClient for HyperClient<F>
471where
472 F: Fn() -> C + Send + Sync,
473 C: Clone + Send + Sync + 'static,
474 C: hyper_0_14::service::Service<http_02x::Uri>,
475 C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
476 C::Future: Unpin + Send + 'static,
477 C::Error: Into<BoxError>,
478{
479 fn http_connector(
480 &self,
481 settings: &HttpConnectorSettings,
482 components: &RuntimeComponents,
483 ) -> SharedHttpConnector {
484 let key = CacheKey::from(settings);
485 let mut connector = self.connector_cache.read().unwrap().get(&key).cloned();
486 if connector.is_none() {
487 let mut cache = self.connector_cache.write().unwrap();
488 if !cache.contains_key(&key) {
490 let mut builder = HyperConnector::builder()
491 .hyper_builder(self.client_builder.clone())
492 .connector_settings(settings.clone());
493 builder.set_sleep_impl(components.sleep_impl());
494
495 let start = components.time_source().map(|ts| ts.now());
496 let tcp_connector = (self.tcp_connector_fn)();
497 let end = components.time_source().map(|ts| ts.now());
498 if let (Some(start), Some(end)) = (start, end) {
499 if let Ok(elapsed) = end.duration_since(start) {
500 tracing::debug!("new TCP connector created in {:?}", elapsed);
501 }
502 }
503 let connector = SharedHttpConnector::new(builder.build(tcp_connector));
504 cache.insert(key.clone(), connector);
505 }
506 connector = cache.get(&key).cloned();
507 }
508
509 connector.expect("cache populated above")
510 }
511
512 fn validate_base_client_config(
513 &self,
514 _: &RuntimeComponentsBuilder,
515 _: &ConfigBag,
516 ) -> Result<(), BoxError> {
517 let _ = (self.tcp_connector_fn)();
523 Ok(())
524 }
525
526 fn connector_metadata(&self) -> Option<ConnectorMetadata> {
527 Some(ConnectorMetadata::new("hyper", Some(Cow::Borrowed("0.x"))))
528 }
529}
530
/// Builder for a hyper 0.14-backed [`SharedHttpClient`].
///
/// The resulting client creates its connectors on demand and caches them per
/// timeout configuration.
#[derive(Clone, Default, Debug)]
pub struct HyperClientBuilder {
    // Optional hyper client builder override; hyper's default is used when `None`.
    client_builder: Option<hyper_0_14::client::Builder>,
}
588
589impl HyperClientBuilder {
590 pub fn new() -> Self {
592 Self::default()
593 }
594
595 pub fn hyper_builder(mut self, hyper_builder: hyper_0_14::client::Builder) -> Self {
599 self.client_builder = Some(hyper_builder);
600 self
601 }
602
603 pub fn set_hyper_builder(
607 &mut self,
608 hyper_builder: Option<hyper_0_14::client::Builder>,
609 ) -> &mut Self {
610 self.client_builder = hyper_builder;
611 self
612 }
613
614 #[cfg(feature = "legacy-rustls-ring")]
619 pub fn build_https(self) -> SharedHttpClient {
620 self.build_with_fn(default_connector::https)
621 }
622
623 #[cfg_attr(
626 feature = "legacy-rustls-ring",
627 doc = "Use [`build_https`](HyperClientBuilder::build_https) if you don't want to provide a custom TCP connector."
628 )]
629 pub fn build<C>(self, tcp_connector: C) -> SharedHttpClient
630 where
631 C: Clone + Send + Sync + 'static,
632 C: hyper_0_14::service::Service<http_02x::Uri>,
633 C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
634 C::Future: Unpin + Send + 'static,
635 C::Error: Into<BoxError>,
636 {
637 self.build_with_fn(move || tcp_connector.clone())
638 }
639
640 fn build_with_fn<C, F>(self, tcp_connector_fn: F) -> SharedHttpClient
641 where
642 F: Fn() -> C + Send + Sync + 'static,
643 C: Clone + Send + Sync + 'static,
644 C: hyper_0_14::service::Service<http_02x::Uri>,
645 C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
646 C::Future: Unpin + Send + 'static,
647 C::Error: Into<BoxError>,
648 {
649 SharedHttpClient::new(HyperClient {
650 connector_cache: RwLock::new(HashMap::new()),
651 client_builder: self.client_builder.unwrap_or_default(),
652 tcp_connector_fn,
653 })
654 }
655}
656
657mod timeout_middleware {
658 use aws_smithy_async::future::timeout::{TimedOutError, Timeout};
659 use aws_smithy_async::rt::sleep::Sleep;
660 use aws_smithy_async::rt::sleep::{AsyncSleep, SharedAsyncSleep};
661 use aws_smithy_runtime_api::box_error::BoxError;
662 use pin_project_lite::pin_project;
663 use std::error::Error;
664 use std::fmt::Formatter;
665 use std::future::Future;
666 use std::pin::Pin;
667 use std::task::{Context, Poll};
668 use std::time::Duration;
669
    /// Error produced by the timeout middleware when a timeout elapses.
    #[derive(Debug)]
    pub(crate) struct HttpTimeoutError {
        // Label for what timed out; set to "HTTP connect" or "HTTP read" by the
        // services in this module.
        kind: &'static str,
        // The configured timeout that elapsed.
        duration: Duration,
    }
675
676 impl std::fmt::Display for HttpTimeoutError {
677 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
678 write!(
679 f,
680 "{} timeout occurred after {:?}",
681 self.kind, self.duration
682 )
683 }
684 }
685
    impl Error for HttpTimeoutError {
        /// Always reports a `TimedOutError` as the source.
        ///
        /// This is what lets a source-chain search for `TimedOutError` (as done by
        /// `downcast_error`) recognize this error as a timeout.
        fn source(&self) -> Option<&(dyn Error + 'static)> {
            Some(&TimedOutError)
        }
    }
694
    /// Service wrapper that applies an optional timeout to the inner connector's `call`.
    ///
    /// Timeouts are reported as `HttpTimeoutError` with the kind "HTTP connect".
    #[derive(Clone, Debug)]
    pub(super) struct ConnectTimeout<I> {
        inner: I,
        // Sleep implementation and duration; `None` disables the timeout.
        timeout: Option<(SharedAsyncSleep, Duration)>,
    }
704
705 impl<I> ConnectTimeout<I> {
706 pub(crate) fn new(inner: I, sleep: SharedAsyncSleep, timeout: Duration) -> Self {
710 Self {
711 inner,
712 timeout: Some((sleep, timeout)),
713 }
714 }
715
716 pub(crate) fn no_timeout(inner: I) -> Self {
717 Self {
718 inner,
719 timeout: None,
720 }
721 }
722 }
723
    /// Service wrapper that applies an optional timeout to the inner HTTP service's `call`.
    ///
    /// Timeouts are reported as `HttpTimeoutError` with the kind "HTTP read".
    #[derive(Clone, Debug)]
    pub(crate) struct HttpReadTimeout<I> {
        inner: I,
        // Sleep implementation and duration; `None` disables the timeout.
        timeout: Option<(SharedAsyncSleep, Duration)>,
    }
729
730 impl<I> HttpReadTimeout<I> {
731 pub(crate) fn new(inner: I, sleep: SharedAsyncSleep, timeout: Duration) -> Self {
735 Self {
736 inner,
737 timeout: Some((sleep, timeout)),
738 }
739 }
740
741 pub(crate) fn no_timeout(inner: I) -> Self {
742 Self {
743 inner,
744 timeout: None,
745 }
746 }
747 }
748
    pin_project! {
        /// Future returned by the timeout services in this module.
        ///
        /// Either races the inner future against a sleep (`Timeout`) or passes the inner
        /// future through unchanged (`NoTimeout`). In both cases the error type is
        /// converted to `BoxError`.
        #[project = MaybeTimeoutFutureProj]
        pub enum MaybeTimeoutFuture<F> {
            // A timeout is configured: `error_type` and `duration` are kept so an
            // `HttpTimeoutError` can be built if the sleep wins.
            Timeout {
                #[pin]
                timeout: Timeout<F, Sleep>,
                error_type: &'static str,
                duration: Duration,
            },
            // No timeout configured: the inner future is polled directly.
            NoTimeout {
                #[pin]
                future: F
            }
        }
    }
768
769 impl<F, T, E> Future for MaybeTimeoutFuture<F>
770 where
771 F: Future<Output = Result<T, E>>,
772 E: Into<BoxError>,
773 {
774 type Output = Result<T, BoxError>;
775
776 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
777 let (timeout_future, kind, &mut duration) = match self.project() {
778 MaybeTimeoutFutureProj::NoTimeout { future } => {
779 return future.poll(cx).map_err(|err| err.into());
780 }
781 MaybeTimeoutFutureProj::Timeout {
782 timeout,
783 error_type,
784 duration,
785 } => (timeout, error_type, duration),
786 };
787 match timeout_future.poll(cx) {
788 Poll::Ready(Ok(response)) => Poll::Ready(response.map_err(|err| err.into())),
789 Poll::Ready(Err(_timeout)) => {
790 Poll::Ready(Err(HttpTimeoutError { kind, duration }.into()))
791 }
792 Poll::Pending => Poll::Pending,
793 }
794 }
795 }
796
797 impl<I> hyper_0_14::service::Service<http_02x::Uri> for ConnectTimeout<I>
798 where
799 I: hyper_0_14::service::Service<http_02x::Uri>,
800 I::Error: Into<BoxError>,
801 {
802 type Response = I::Response;
803 type Error = BoxError;
804 type Future = MaybeTimeoutFuture<I::Future>;
805
806 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
807 self.inner.poll_ready(cx).map_err(|err| err.into())
808 }
809
810 fn call(&mut self, req: http_02x::Uri) -> Self::Future {
811 match &self.timeout {
812 Some((sleep, duration)) => {
813 let sleep = sleep.sleep(*duration);
814 MaybeTimeoutFuture::Timeout {
815 timeout: Timeout::new(self.inner.call(req), sleep),
816 error_type: "HTTP connect",
817 duration: *duration,
818 }
819 }
820 None => MaybeTimeoutFuture::NoTimeout {
821 future: self.inner.call(req),
822 },
823 }
824 }
825 }
826
827 impl<I, B> hyper_0_14::service::Service<http_02x::Request<B>> for HttpReadTimeout<I>
828 where
829 I: hyper_0_14::service::Service<http_02x::Request<B>, Error = hyper_0_14::Error>,
830 {
831 type Response = I::Response;
832 type Error = BoxError;
833 type Future = MaybeTimeoutFuture<I::Future>;
834
835 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
836 self.inner.poll_ready(cx).map_err(|err| err.into())
837 }
838
839 fn call(&mut self, req: http_02x::Request<B>) -> Self::Future {
840 match &self.timeout {
841 Some((sleep, duration)) => {
842 let sleep = sleep.sleep(*duration);
843 MaybeTimeoutFuture::Timeout {
844 timeout: Timeout::new(self.inner.call(req), sleep),
845 error_type: "HTTP read",
846 duration: *duration,
847 }
848 }
849 None => MaybeTimeoutFuture::NoTimeout {
850 future: self.inner.call(req),
851 },
852 }
853 }
854 }
855
856 #[cfg(test)]
857 pub(crate) mod test {
858 use crate::hyper_014::HyperConnector;
859 use aws_smithy_async::assert_elapsed;
860 use aws_smithy_async::future::never::Never;
861 use aws_smithy_async::rt::sleep::{SharedAsyncSleep, TokioSleep};
862 use aws_smithy_runtime_api::box_error::BoxError;
863 use aws_smithy_runtime_api::client::http::HttpConnectorSettings;
864 use aws_smithy_runtime_api::client::orchestrator::HttpRequest;
865 use aws_smithy_runtime_api::client::result::ConnectorError;
866 use aws_smithy_types::error::display::DisplayErrorContext;
867 use hyper_0_14::client::connect::{Connected, Connection};
868 use std::future::Future;
869 use std::pin::Pin;
870 use std::task::{Context, Poll};
871 use std::time::Duration;
872 use tokio::io::ReadBuf;
873 use tokio::io::{AsyncRead, AsyncWrite};
874 use tokio::net::TcpStream;
875
        // Compile-time check only: this function is never called, hence `allow(unused)`.
        // It fails to compile if `ConnectTimeout<T>` stops being `Send + Sync` for a
        // `Send + Sync` inner connector.
        #[allow(unused)]
        fn connect_timeout_is_correct<T: Send + Sync + Clone + 'static>() {
            is_send_sync::<super::ConnectTimeout<T>>();
        }
880
        // Helper for compile-time assertions: it only type-checks when `T: Send + Sync`.
        // Never invoked at runtime, hence `allow(unused)`.
        #[allow(unused)]
        fn is_send_sync<T: Send + Sync>() {}
883
        /// Test connector whose connection attempts never complete.
        ///
        /// `call` returns a future that awaits `Never::new()`, which is expected to stay
        /// pending forever, so the code after the await is marked unreachable.
        #[non_exhaustive]
        #[derive(Clone, Default, Debug)]
        pub(crate) struct NeverConnects;
        impl hyper_0_14::service::Service<http_02x::Uri> for NeverConnects {
            type Response = TcpStream;
            type Error = ConnectorError;
            type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

            // Always ready, so the only place a caller can wait is inside `call`.
            fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
                Poll::Ready(Ok(()))
            }

            fn call(&mut self, _uri: http_02x::Uri) -> Self::Future {
                Box::pin(async move {
                    Never::new().await;
                    unreachable!()
                })
            }
        }
906
        /// Test connector that connects immediately but yields an `EmptyStream`, whose
        /// reads and writes never make progress.
        #[derive(Clone, Debug, Default)]
        struct NeverReplies;
        impl hyper_0_14::service::Service<http_02x::Uri> for NeverReplies {
            type Response = EmptyStream;
            type Error = BoxError;
            type Future = std::future::Ready<Result<Self::Response, Self::Error>>;

            fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
                Poll::Ready(Ok(()))
            }

            // The "connection" is available immediately.
            fn call(&mut self, _req: http_02x::Uri) -> Self::Future {
                std::future::ready(Ok(EmptyStream))
            }
        }
923
        /// Test IO stream on which every read, write, flush and shutdown returns
        /// `Poll::Pending`.
        #[non_exhaustive]
        #[derive(Debug, Default)]
        struct EmptyStream;
        impl AsyncRead for EmptyStream {
            // Never produces data and never reports EOF.
            fn poll_read(
                self: Pin<&mut Self>,
                _cx: &mut Context<'_>,
                _buf: &mut ReadBuf<'_>,
            ) -> Poll<std::io::Result<()>> {
                Poll::Pending
            }
        }
        impl AsyncWrite for EmptyStream {
            // Never accepts any bytes.
            fn poll_write(
                self: Pin<&mut Self>,
                _cx: &mut Context<'_>,
                _buf: &[u8],
            ) -> Poll<Result<usize, std::io::Error>> {
                Poll::Pending
            }

            fn poll_flush(
                self: Pin<&mut Self>,
                _cx: &mut Context<'_>,
            ) -> Poll<Result<(), std::io::Error>> {
                Poll::Pending
            }

            fn poll_shutdown(
                self: Pin<&mut Self>,
                _cx: &mut Context<'_>,
            ) -> Poll<Result<(), std::io::Error>> {
                Poll::Pending
            }
        }
        impl Connection for EmptyStream {
            // Default connection info.
            fn connected(&self) -> Connected {
                Connected::new()
            }
        }
965
        /// A connector that never connects must produce a timeout error after the
        /// configured connect timeout (1s), with the expected message.
        #[tokio::test]
        async fn http_connect_timeout_works() {
            let tcp_connector = NeverConnects::default();
            let connector_settings = HttpConnectorSettings::builder()
                .connect_timeout(Duration::from_secs(1))
                .build();
            let hyper = HyperConnector::builder()
                .connector_settings(connector_settings)
                .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
                .build(tcp_connector)
                .adapter;
            let now = tokio::time::Instant::now();
            // Pause tokio's clock so the timeout does not have to be waited out in
            // real time.
            tokio::time::pause();
            let resp = hyper
                .call(HttpRequest::get("https://static-uri.com").unwrap())
                .await
                .unwrap_err();
            assert!(
                resp.is_timeout(),
                "expected resp.is_timeout() to be true but it was false, resp == {:?}",
                resp
            );
            let message = DisplayErrorContext(&resp).to_string();
            let expected =
                "timeout: error trying to connect: HTTP connect timeout occurred after 1s";
            assert!(
                message.contains(expected),
                "expected '{message}' to contain '{expected}'"
            );
            // The elapsed time on tokio's clock must match the connect timeout.
            assert_elapsed!(now, Duration::from_secs(1));
        }
997
        /// A connection that is established but never replies must produce a timeout
        /// error after the read timeout (2s), not the shorter connect timeout (1s).
        #[tokio::test]
        async fn http_read_timeout_works() {
            let tcp_connector = NeverReplies;
            let connector_settings = HttpConnectorSettings::builder()
                .connect_timeout(Duration::from_secs(1))
                .read_timeout(Duration::from_secs(2))
                .build();
            let hyper = HyperConnector::builder()
                .connector_settings(connector_settings)
                .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
                .build(tcp_connector)
                .adapter;
            let now = tokio::time::Instant::now();
            // Pause tokio's clock so the timeout does not have to be waited out in
            // real time.
            tokio::time::pause();
            let err = hyper
                .call(HttpRequest::get("https://fake-uri.com").unwrap())
                .await
                .unwrap_err();
            assert!(
                err.is_timeout(),
                "expected err.is_timeout() to be true but it was false, err == {err:?}",
            );
            let message = format!("{}", DisplayErrorContext(&err));
            let expected = "timeout: HTTP read timeout occurred after 2s";
            assert!(
                message.contains(expected),
                "expected '{message}' to contain '{expected}'"
            );
            // The elapsed time on tokio's clock must match the read timeout.
            assert_elapsed!(now, Duration::from_secs(2));
        }
1028 }
1029}
1030
1031#[cfg(test)]
1032mod test {
1033 use crate::hyper_legacy::timeout_middleware::test::NeverConnects;
1034 use crate::hyper_legacy::{HyperClientBuilder, HyperConnector};
1035 use aws_smithy_async::time::SystemTimeSource;
1036 use aws_smithy_runtime_api::box_error::BoxError;
1037 use aws_smithy_runtime_api::client::http::{HttpClient, HttpConnectorSettings};
1038 use aws_smithy_runtime_api::client::orchestrator::HttpRequest;
1039 use aws_smithy_runtime_api::client::runtime_components::RuntimeComponentsBuilder;
1040 use hyper_0_14::client::connect::{Connected, Connection};
1041 use std::io::{Error, ErrorKind};
1042 use std::pin::Pin;
1043 use std::sync::atomic::{AtomicU32, Ordering};
1044 use std::sync::Arc;
1045 use std::task::{Context, Poll};
1046 use std::time::Duration;
1047 use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
1048
    /// Requesting connectors concurrently for four distinct timeout configurations must
    /// invoke the TCP connector factory exactly four times, once per configuration.
    #[tokio::test]
    async fn connector_selection() {
        // Counts how many times the connector factory is invoked.
        let creation_count = Arc::new(AtomicU32::new(0));
        let http_client = HyperClientBuilder::new().build_with_fn({
            let count = creation_count.clone();
            move || {
                count.fetch_add(1, Ordering::Relaxed);
                NeverConnects::default()
            }
        });

        // Four settings that differ in their (connect timeout, read timeout) pair.
        let settings = [
            HttpConnectorSettings::builder()
                .connect_timeout(Duration::from_secs(3))
                .build(),
            HttpConnectorSettings::builder()
                .read_timeout(Duration::from_secs(3))
                .build(),
            HttpConnectorSettings::builder()
                .connect_timeout(Duration::from_secs(3))
                .read_timeout(Duration::from_secs(3))
                .build(),
            HttpConnectorSettings::builder()
                .connect_timeout(Duration::from_secs(5))
                .read_timeout(Duration::from_secs(3))
                .build(),
        ];

        // A time source is supplied so the creation-timing branch of `http_connector`
        // is exercised as well.
        let components = RuntimeComponentsBuilder::for_tests()
            .with_time_source(Some(SystemTimeSource::new()))
            .build()
            .unwrap();
        // Request each configuration 1000 times from separate tasks.
        let mut handles = Vec::new();
        for setting in &settings {
            for _ in 0..1000 {
                let client = http_client.clone();
                handles.push(tokio::spawn({
                    let setting = setting.clone();
                    let components = components.clone();
                    async move {
                        let _ = client.http_connector(&setting, &components);
                    }
                }));
            }
        }
        for handle in handles {
            handle.await.unwrap();
        }

        // One connector per distinct setting; every other request must hit the cache.
        assert_eq!(4, creation_count.load(Ordering::Relaxed));
    }
1104
    /// A connection whose reads fail with `ConnectionReset` must surface as an IO
    /// connector error.
    #[tokio::test]
    async fn hyper_io_error() {
        let connector = TestConnection {
            inner: HangupStream,
        };
        // No settings are configured, so no timeouts (and no sleep impl) are involved.
        let adapter = HyperConnector::builder().build(connector).adapter;
        let err = adapter
            .call(HttpRequest::get("https://socket-hangup.com").unwrap())
            .await
            .expect_err("socket hangup");
        assert!(err.is_io(), "{:?}", err);
    }
1117
    /// Test IO stream that fails every read with `ConnectionReset`; writes, flushes and
    /// shutdowns stay pending.
    #[derive(Clone)]
    struct HangupStream;

    impl Connection for HangupStream {
        // Default connection info.
        fn connected(&self) -> Connected {
            Connected::new()
        }
    }

    impl AsyncRead for HangupStream {
        // Every read attempt fails immediately with a connection-reset error.
        fn poll_read(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: &mut ReadBuf<'_>,
        ) -> Poll<std::io::Result<()>> {
            Poll::Ready(Err(Error::new(
                ErrorKind::ConnectionReset,
                "connection reset",
            )))
        }
    }

    impl AsyncWrite for HangupStream {
        // Never accepts any bytes.
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: &[u8],
        ) -> Poll<Result<usize, Error>> {
            Poll::Pending
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
            Poll::Pending
        }

        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
            Poll::Pending
        }
    }
1158
    /// Test connector that "connects" instantly by handing out a clone of `inner` as
    /// the connection.
    #[derive(Clone)]
    struct TestConnection<T> {
        inner: T,
    }

    impl<T> hyper_0_14::service::Service<http_02x::Uri> for TestConnection<T>
    where
        T: Clone + Connection,
    {
        type Response = T;
        type Error = BoxError;
        type Future = std::future::Ready<Result<Self::Response, Self::Error>>;

        fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            Poll::Ready(Ok(()))
        }

        // The requested URI is ignored; every call yields a clone of `inner`.
        fn call(&mut self, _req: http_02x::Uri) -> Self::Future {
            std::future::ready(Ok(self.inner.clone()))
        }
    }
1180}