1use crate::hyper_legacy::timeout_middleware::HttpTimeoutError;
7use aws_smithy_async::future::timeout::TimedOutError;
8use aws_smithy_async::rt::sleep::{default_async_sleep, AsyncSleep, SharedAsyncSleep};
9use aws_smithy_runtime_api::box_error::BoxError;
10use aws_smithy_runtime_api::client::connection::CaptureSmithyConnection;
11use aws_smithy_runtime_api::client::connection::ConnectionMetadata;
12use aws_smithy_runtime_api::client::connector_metadata::ConnectorMetadata;
13use aws_smithy_runtime_api::client::http::{
14 HttpClient, HttpConnector, HttpConnectorFuture, HttpConnectorSettings, SharedHttpClient,
15 SharedHttpConnector,
16};
17use aws_smithy_runtime_api::client::orchestrator::{HttpRequest, HttpResponse};
18use aws_smithy_runtime_api::client::result::ConnectorError;
19use aws_smithy_runtime_api::client::runtime_components::{
20 RuntimeComponents, RuntimeComponentsBuilder,
21};
22use aws_smithy_runtime_api::shared::IntoShared;
23use aws_smithy_types::body::SdkBody;
24use aws_smithy_types::config_bag::ConfigBag;
25use aws_smithy_types::error::display::DisplayErrorContext;
26use aws_smithy_types::retry::ErrorKind;
27use h2::Reason;
28use hyper_0_14::client::connect::{capture_connection, CaptureConnection, Connection, HttpInfo};
29use std::borrow::Cow;
30use std::collections::HashMap;
31use std::error::Error;
32use std::fmt;
33use std::sync::RwLock;
34use std::time::Duration;
35use tokio::io::{AsyncRead, AsyncWrite};
36
37#[cfg(feature = "legacy-rustls-ring")]
38mod default_connector {
39 use aws_smithy_async::rt::sleep::SharedAsyncSleep;
40 use aws_smithy_runtime_api::client::http::HttpConnectorSettings;
41 use legacy_hyper_rustls as hyper_rustls;
42 use legacy_rustls as rustls;
43 use std::sync::LazyLock;
44
45 pub(crate) static HTTPS_NATIVE_ROOTS: LazyLock<
48 hyper_rustls::HttpsConnector<hyper_0_14::client::HttpConnector>,
49 > = LazyLock::new(default_tls);
50
51 fn default_tls() -> hyper_rustls::HttpsConnector<hyper_0_14::client::HttpConnector> {
52 use hyper_rustls::ConfigBuilderExt;
53 hyper_rustls::HttpsConnectorBuilder::new()
54 .with_tls_config(
55 rustls::ClientConfig::builder()
56 .with_cipher_suites(&[
57 rustls::cipher_suite::TLS13_AES_256_GCM_SHA384,
59 rustls::cipher_suite::TLS13_AES_128_GCM_SHA256,
60 rustls::cipher_suite::TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
62 rustls::cipher_suite::TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
63 rustls::cipher_suite::TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
64 rustls::cipher_suite::TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
65 rustls::cipher_suite::TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256,
66 ])
67 .with_safe_default_kx_groups()
68 .with_safe_default_protocol_versions()
69 .expect("Error with the TLS configuration. Please file a bug report under https://github.com/smithy-lang/smithy-rs/issues.")
70 .with_native_roots()
71 .with_no_client_auth()
72 )
73 .https_or_http()
74 .enable_http1()
75 .enable_http2()
76 .build()
77 }
78
79 pub(super) fn base(
80 settings: &HttpConnectorSettings,
81 sleep: Option<SharedAsyncSleep>,
82 ) -> super::HyperConnectorBuilder {
83 let mut hyper = super::HyperConnector::builder().connector_settings(settings.clone());
84 if let Some(sleep) = sleep {
85 hyper = hyper.sleep_impl(sleep);
86 }
87 hyper
88 }
89
90 pub(super) fn https() -> hyper_rustls::HttpsConnector<hyper_0_14::client::HttpConnector> {
95 HTTPS_NATIVE_ROOTS.clone()
96 }
97}
98
99pub fn default_connector(
101 settings: &HttpConnectorSettings,
102 sleep: Option<SharedAsyncSleep>,
103) -> Option<SharedHttpConnector> {
104 #[cfg(feature = "legacy-rustls-ring")]
105 {
106 tracing::trace!(settings = ?settings, sleep = ?sleep, "creating a new default connector");
107 let hyper = default_connector::base(settings, sleep).build_https();
108 Some(SharedHttpConnector::new(hyper))
109 }
110 #[cfg(not(feature = "legacy-rustls-ring"))]
111 {
112 tracing::trace!(settings = ?settings, sleep = ?sleep, "no default connector available");
113 None
114 }
115}
116
117pub fn default_client() -> Option<SharedHttpClient> {
119 #[cfg(feature = "legacy-rustls-ring")]
120 {
121 tracing::trace!("creating a new default hyper 0.14.x client");
122 Some(HyperClientBuilder::new().build_https())
123 }
124 #[cfg(not(feature = "legacy-rustls-ring"))]
125 {
126 tracing::trace!("no default connector available");
127 None
128 }
129}
130
131#[derive(Debug)]
139pub struct HyperConnector {
140 adapter: Box<dyn HttpConnector>,
141}
142
143impl HyperConnector {
144 pub fn builder() -> HyperConnectorBuilder {
146 Default::default()
147 }
148}
149
150impl HttpConnector for HyperConnector {
151 fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
152 self.adapter.call(request)
153 }
154}
155
156#[derive(Default, Debug)]
158pub struct HyperConnectorBuilder {
159 connector_settings: Option<HttpConnectorSettings>,
160 sleep_impl: Option<SharedAsyncSleep>,
161 client_builder: Option<hyper_0_14::client::Builder>,
162}
163
164impl HyperConnectorBuilder {
165 pub fn build<C>(self, tcp_connector: C) -> HyperConnector
167 where
168 C: Clone + Send + Sync + 'static,
169 C: hyper_0_14::service::Service<http_02x::Uri>,
170 C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
171 C::Future: Unpin + Send + 'static,
172 C::Error: Into<BoxError>,
173 {
174 let client_builder = self.client_builder.unwrap_or_default();
175 let sleep_impl = self.sleep_impl.or_else(default_async_sleep);
176 let (connect_timeout, read_timeout) = self
177 .connector_settings
178 .map(|c| (c.connect_timeout(), c.read_timeout()))
179 .unwrap_or((None, None));
180
181 let connector = match connect_timeout {
182 Some(duration) => timeout_middleware::ConnectTimeout::new(
183 tcp_connector,
184 sleep_impl
185 .clone()
186 .expect("a sleep impl must be provided in order to have a connect timeout"),
187 duration,
188 ),
189 None => timeout_middleware::ConnectTimeout::no_timeout(tcp_connector),
190 };
191 let base = client_builder.build(connector);
192 let read_timeout = match read_timeout {
193 Some(duration) => timeout_middleware::HttpReadTimeout::new(
194 base,
195 sleep_impl.expect("a sleep impl must be provided in order to have a read timeout"),
196 duration,
197 ),
198 None => timeout_middleware::HttpReadTimeout::no_timeout(base),
199 };
200 HyperConnector {
201 adapter: Box::new(Adapter {
202 client: read_timeout,
203 }),
204 }
205 }
206
207 #[cfg(feature = "legacy-rustls-ring")]
209 pub fn build_https(self) -> HyperConnector {
210 self.build(default_connector::https())
211 }
212
213 pub fn sleep_impl(mut self, sleep_impl: impl AsyncSleep + 'static) -> Self {
218 self.sleep_impl = Some(sleep_impl.into_shared());
219 self
220 }
221
222 pub fn set_sleep_impl(&mut self, sleep_impl: Option<SharedAsyncSleep>) -> &mut Self {
227 self.sleep_impl = sleep_impl;
228 self
229 }
230
231 pub fn connector_settings(mut self, connector_settings: HttpConnectorSettings) -> Self {
233 self.connector_settings = Some(connector_settings);
234 self
235 }
236
237 pub fn set_connector_settings(
239 &mut self,
240 connector_settings: Option<HttpConnectorSettings>,
241 ) -> &mut Self {
242 self.connector_settings = connector_settings;
243 self
244 }
245
246 pub fn hyper_builder(mut self, hyper_builder: hyper_0_14::client::Builder) -> Self {
250 self.client_builder = Some(hyper_builder);
251 self
252 }
253
254 pub fn set_hyper_builder(
258 &mut self,
259 hyper_builder: Option<hyper_0_14::client::Builder>,
260 ) -> &mut Self {
261 self.client_builder = hyper_builder;
262 self
263 }
264}
265
266struct Adapter<C> {
270 client: timeout_middleware::HttpReadTimeout<
271 hyper_0_14::Client<timeout_middleware::ConnectTimeout<C>, SdkBody>,
272 >,
273}
274
275impl<C> fmt::Debug for Adapter<C> {
276 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
277 f.debug_struct("Adapter")
278 .field("client", &"** hyper client **")
279 .finish()
280 }
281}
282
283fn extract_smithy_connection(capture_conn: &CaptureConnection) -> Option<ConnectionMetadata> {
285 let capture_conn = capture_conn.clone();
286 if let Some(conn) = capture_conn.clone().connection_metadata().as_ref() {
287 let mut extensions = http_02x::Extensions::new();
288 conn.get_extras(&mut extensions);
289 let http_info = extensions.get::<HttpInfo>();
290 let mut builder = ConnectionMetadata::builder()
291 .proxied(conn.is_proxied())
292 .poison_fn(move || match capture_conn.connection_metadata().as_ref() {
293 Some(conn) => conn.poison(),
294 None => tracing::trace!("no connection existed to poison"),
295 });
296
297 builder
298 .set_local_addr(http_info.map(|info| info.local_addr()))
299 .set_remote_addr(http_info.map(|info| info.remote_addr()));
300
301 let smithy_connection = builder.build();
302
303 Some(smithy_connection)
304 } else {
305 None
306 }
307}
308
309impl<C> HttpConnector for Adapter<C>
310where
311 C: Clone + Send + Sync + 'static,
312 C: hyper_0_14::service::Service<http_02x::Uri>,
313 C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
314 C::Future: Unpin + Send + 'static,
315 C::Error: Into<BoxError>,
316{
317 fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
318 use hyper_0_14::service::Service;
319
320 let mut request = match request.try_into_http02x() {
321 Ok(request) => request,
322 Err(err) => {
323 return HttpConnectorFuture::ready(Err(ConnectorError::other(err.into(), None)));
324 }
325 };
326 let capture_connection = capture_connection(&mut request);
327 if let Some(capture_smithy_connection) =
328 request.extensions().get::<CaptureSmithyConnection>()
329 {
330 capture_smithy_connection
331 .set_connection_retriever(move || extract_smithy_connection(&capture_connection));
332 }
333 let mut client = self.client.clone();
334 let fut = client.call(request);
335 HttpConnectorFuture::new(async move {
336 let response = fut
337 .await
338 .map_err(downcast_error)?
339 .map(SdkBody::from_body_0_4);
340 match HttpResponse::try_from(response) {
341 Ok(response) => Ok(response),
342 Err(err) => Err(ConnectorError::other(err.into(), None)),
343 }
344 })
345 }
346}
347
348fn downcast_error(err: BoxError) -> ConnectorError {
350 if find_source::<TimedOutError>(err.as_ref()).is_some() {
352 return ConnectorError::timeout(err);
353 }
354 let err = match err.downcast::<ConnectorError>() {
356 Ok(connector_error) => return *connector_error,
357 Err(box_error) => box_error,
358 };
359 let err = match err.downcast::<hyper_0_14::Error>() {
362 Ok(hyper_error) => return to_connector_error(*hyper_error),
363 Err(box_error) => box_error,
364 };
365
366 ConnectorError::other(err, None)
368}
369
370fn to_connector_error(err: hyper_0_14::Error) -> ConnectorError {
372 if err.is_timeout() || find_source::<HttpTimeoutError>(&err).is_some() {
373 return ConnectorError::timeout(err.into());
374 }
375 if err.is_user() {
376 return ConnectorError::user(err.into());
377 }
378 if err.is_closed() || err.is_canceled() || find_source::<std::io::Error>(&err).is_some() {
379 return ConnectorError::io(err.into());
380 }
381 if err.is_incomplete_message() {
383 return ConnectorError::other(err.into(), Some(ErrorKind::TransientError));
384 }
385 if let Some(h2_err) = find_source::<h2::Error>(&err) {
386 if h2_err.is_go_away()
387 || (h2_err.is_reset() && h2_err.reason() == Some(Reason::REFUSED_STREAM))
388 {
389 return ConnectorError::io(err.into());
390 }
391 }
392
393 tracing::warn!(err = %DisplayErrorContext(&err), "unrecognized error from Hyper. If this error should be retried, please file an issue.");
394 ConnectorError::other(err.into(), None)
395}
396
397fn find_source<'a, E: Error + 'static>(err: &'a (dyn Error + 'static)) -> Option<&'a E> {
398 let mut next = Some(err);
399 while let Some(err) = next {
400 if let Some(matching_err) = err.downcast_ref::<E>() {
401 return Some(matching_err);
402 }
403 next = err.source();
404 }
405 None
406}
407
408#[derive(Clone, Debug, Eq, PartialEq, Hash)]
409struct CacheKey {
410 connect_timeout: Option<Duration>,
411 read_timeout: Option<Duration>,
412}
413
414impl From<&HttpConnectorSettings> for CacheKey {
415 fn from(value: &HttpConnectorSettings) -> Self {
416 Self {
417 connect_timeout: value.connect_timeout(),
418 read_timeout: value.read_timeout(),
419 }
420 }
421}
422
423struct HyperClient<F> {
424 connector_cache: RwLock<HashMap<CacheKey, SharedHttpConnector>>,
425 client_builder: hyper_0_14::client::Builder,
426 tcp_connector_fn: F,
427}
428
429impl<F> fmt::Debug for HyperClient<F> {
430 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
431 f.debug_struct("HyperClient")
432 .field("connector_cache", &self.connector_cache)
433 .field("client_builder", &self.client_builder)
434 .finish()
435 }
436}
437
438impl<C, F> HttpClient for HyperClient<F>
439where
440 F: Fn() -> C + Send + Sync,
441 C: Clone + Send + Sync + 'static,
442 C: hyper_0_14::service::Service<http_02x::Uri>,
443 C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
444 C::Future: Unpin + Send + 'static,
445 C::Error: Into<BoxError>,
446{
447 fn http_connector(
448 &self,
449 settings: &HttpConnectorSettings,
450 components: &RuntimeComponents,
451 ) -> SharedHttpConnector {
452 let key = CacheKey::from(settings);
453 let mut connector = self.connector_cache.read().unwrap().get(&key).cloned();
454 if connector.is_none() {
455 let mut cache = self.connector_cache.write().unwrap();
456 if !cache.contains_key(&key) {
458 let mut builder = HyperConnector::builder()
459 .hyper_builder(self.client_builder.clone())
460 .connector_settings(settings.clone());
461 builder.set_sleep_impl(components.sleep_impl());
462
463 let start = components.time_source().map(|ts| ts.now());
464 let tcp_connector = (self.tcp_connector_fn)();
465 let end = components.time_source().map(|ts| ts.now());
466 if let (Some(start), Some(end)) = (start, end) {
467 if let Ok(elapsed) = end.duration_since(start) {
468 tracing::debug!("new TCP connector created in {:?}", elapsed);
469 }
470 }
471 let connector = SharedHttpConnector::new(builder.build(tcp_connector));
472 cache.insert(key.clone(), connector);
473 }
474 connector = cache.get(&key).cloned();
475 }
476
477 connector.expect("cache populated above")
478 }
479
480 fn validate_base_client_config(
481 &self,
482 _: &RuntimeComponentsBuilder,
483 _: &ConfigBag,
484 ) -> Result<(), BoxError> {
485 let _ = (self.tcp_connector_fn)();
491 Ok(())
492 }
493
494 fn connector_metadata(&self) -> Option<ConnectorMetadata> {
495 Some(ConnectorMetadata::new("hyper", Some(Cow::Borrowed("0.x"))))
496 }
497}
498
499#[derive(Clone, Default, Debug)]
553pub struct HyperClientBuilder {
554 client_builder: Option<hyper_0_14::client::Builder>,
555}
556
557impl HyperClientBuilder {
558 pub fn new() -> Self {
560 Self::default()
561 }
562
563 pub fn hyper_builder(mut self, hyper_builder: hyper_0_14::client::Builder) -> Self {
567 self.client_builder = Some(hyper_builder);
568 self
569 }
570
571 pub fn set_hyper_builder(
575 &mut self,
576 hyper_builder: Option<hyper_0_14::client::Builder>,
577 ) -> &mut Self {
578 self.client_builder = hyper_builder;
579 self
580 }
581
582 #[cfg(feature = "legacy-rustls-ring")]
587 pub fn build_https(self) -> SharedHttpClient {
588 self.build_with_fn(default_connector::https)
589 }
590
591 #[cfg_attr(
594 feature = "legacy-rustls-ring",
595 doc = "Use [`build_https`](HyperClientBuilder::build_https) if you don't want to provide a custom TCP connector."
596 )]
597 pub fn build<C>(self, tcp_connector: C) -> SharedHttpClient
598 where
599 C: Clone + Send + Sync + 'static,
600 C: hyper_0_14::service::Service<http_02x::Uri>,
601 C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
602 C::Future: Unpin + Send + 'static,
603 C::Error: Into<BoxError>,
604 {
605 self.build_with_fn(move || tcp_connector.clone())
606 }
607
608 fn build_with_fn<C, F>(self, tcp_connector_fn: F) -> SharedHttpClient
609 where
610 F: Fn() -> C + Send + Sync + 'static,
611 C: Clone + Send + Sync + 'static,
612 C: hyper_0_14::service::Service<http_02x::Uri>,
613 C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
614 C::Future: Unpin + Send + 'static,
615 C::Error: Into<BoxError>,
616 {
617 SharedHttpClient::new(HyperClient {
618 connector_cache: RwLock::new(HashMap::new()),
619 client_builder: self.client_builder.unwrap_or_default(),
620 tcp_connector_fn,
621 })
622 }
623}
624
625mod timeout_middleware {
626 use aws_smithy_async::future::timeout::{TimedOutError, Timeout};
627 use aws_smithy_async::rt::sleep::Sleep;
628 use aws_smithy_async::rt::sleep::{AsyncSleep, SharedAsyncSleep};
629 use aws_smithy_runtime_api::box_error::BoxError;
630 use pin_project_lite::pin_project;
631 use std::error::Error;
632 use std::fmt::Formatter;
633 use std::future::Future;
634 use std::pin::Pin;
635 use std::task::{Context, Poll};
636 use std::time::Duration;
637
638 #[derive(Debug)]
639 pub(crate) struct HttpTimeoutError {
640 kind: &'static str,
641 duration: Duration,
642 }
643
644 impl std::fmt::Display for HttpTimeoutError {
645 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
646 write!(
647 f,
648 "{} timeout occurred after {:?}",
649 self.kind, self.duration
650 )
651 }
652 }
653
654 impl Error for HttpTimeoutError {
655 fn source(&self) -> Option<&(dyn Error + 'static)> {
659 Some(&TimedOutError)
660 }
661 }
662
663 #[derive(Clone, Debug)]
668 pub(super) struct ConnectTimeout<I> {
669 inner: I,
670 timeout: Option<(SharedAsyncSleep, Duration)>,
671 }
672
673 impl<I> ConnectTimeout<I> {
674 pub(crate) fn new(inner: I, sleep: SharedAsyncSleep, timeout: Duration) -> Self {
678 Self {
679 inner,
680 timeout: Some((sleep, timeout)),
681 }
682 }
683
684 pub(crate) fn no_timeout(inner: I) -> Self {
685 Self {
686 inner,
687 timeout: None,
688 }
689 }
690 }
691
692 #[derive(Clone, Debug)]
693 pub(crate) struct HttpReadTimeout<I> {
694 inner: I,
695 timeout: Option<(SharedAsyncSleep, Duration)>,
696 }
697
698 impl<I> HttpReadTimeout<I> {
699 pub(crate) fn new(inner: I, sleep: SharedAsyncSleep, timeout: Duration) -> Self {
703 Self {
704 inner,
705 timeout: Some((sleep, timeout)),
706 }
707 }
708
709 pub(crate) fn no_timeout(inner: I) -> Self {
710 Self {
711 inner,
712 timeout: None,
713 }
714 }
715 }
716
717 pin_project! {
718 #[project = MaybeTimeoutFutureProj]
723 pub enum MaybeTimeoutFuture<F> {
724 Timeout {
725 #[pin]
726 timeout: Timeout<F, Sleep>,
727 error_type: &'static str,
728 duration: Duration,
729 },
730 NoTimeout {
731 #[pin]
732 future: F
733 }
734 }
735 }
736
737 impl<F, T, E> Future for MaybeTimeoutFuture<F>
738 where
739 F: Future<Output = Result<T, E>>,
740 E: Into<BoxError>,
741 {
742 type Output = Result<T, BoxError>;
743
744 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
745 let (timeout_future, kind, &mut duration) = match self.project() {
746 MaybeTimeoutFutureProj::NoTimeout { future } => {
747 return future.poll(cx).map_err(|err| err.into());
748 }
749 MaybeTimeoutFutureProj::Timeout {
750 timeout,
751 error_type,
752 duration,
753 } => (timeout, error_type, duration),
754 };
755 match timeout_future.poll(cx) {
756 Poll::Ready(Ok(response)) => Poll::Ready(response.map_err(|err| err.into())),
757 Poll::Ready(Err(_timeout)) => {
758 Poll::Ready(Err(HttpTimeoutError { kind, duration }.into()))
759 }
760 Poll::Pending => Poll::Pending,
761 }
762 }
763 }
764
765 impl<I> hyper_0_14::service::Service<http_02x::Uri> for ConnectTimeout<I>
766 where
767 I: hyper_0_14::service::Service<http_02x::Uri>,
768 I::Error: Into<BoxError>,
769 {
770 type Response = I::Response;
771 type Error = BoxError;
772 type Future = MaybeTimeoutFuture<I::Future>;
773
774 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
775 self.inner.poll_ready(cx).map_err(|err| err.into())
776 }
777
778 fn call(&mut self, req: http_02x::Uri) -> Self::Future {
779 match &self.timeout {
780 Some((sleep, duration)) => {
781 let sleep = sleep.sleep(*duration);
782 MaybeTimeoutFuture::Timeout {
783 timeout: Timeout::new(self.inner.call(req), sleep),
784 error_type: "HTTP connect",
785 duration: *duration,
786 }
787 }
788 None => MaybeTimeoutFuture::NoTimeout {
789 future: self.inner.call(req),
790 },
791 }
792 }
793 }
794
795 impl<I, B> hyper_0_14::service::Service<http_02x::Request<B>> for HttpReadTimeout<I>
796 where
797 I: hyper_0_14::service::Service<http_02x::Request<B>, Error = hyper_0_14::Error>,
798 {
799 type Response = I::Response;
800 type Error = BoxError;
801 type Future = MaybeTimeoutFuture<I::Future>;
802
803 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
804 self.inner.poll_ready(cx).map_err(|err| err.into())
805 }
806
807 fn call(&mut self, req: http_02x::Request<B>) -> Self::Future {
808 match &self.timeout {
809 Some((sleep, duration)) => {
810 let sleep = sleep.sleep(*duration);
811 MaybeTimeoutFuture::Timeout {
812 timeout: Timeout::new(self.inner.call(req), sleep),
813 error_type: "HTTP read",
814 duration: *duration,
815 }
816 }
817 None => MaybeTimeoutFuture::NoTimeout {
818 future: self.inner.call(req),
819 },
820 }
821 }
822 }
823
824 #[cfg(test)]
825 pub(crate) mod test {
826 use crate::hyper_014::HyperConnector;
827 use aws_smithy_async::assert_elapsed;
828 use aws_smithy_async::future::never::Never;
829 use aws_smithy_async::rt::sleep::{SharedAsyncSleep, TokioSleep};
830 use aws_smithy_runtime_api::box_error::BoxError;
831 use aws_smithy_runtime_api::client::http::HttpConnectorSettings;
832 use aws_smithy_runtime_api::client::orchestrator::HttpRequest;
833 use aws_smithy_runtime_api::client::result::ConnectorError;
834 use aws_smithy_types::error::display::DisplayErrorContext;
835 use hyper_0_14::client::connect::{Connected, Connection};
836 use std::future::Future;
837 use std::pin::Pin;
838 use std::task::{Context, Poll};
839 use std::time::Duration;
840 use tokio::io::ReadBuf;
841 use tokio::io::{AsyncRead, AsyncWrite};
842 use tokio::net::TcpStream;
843
844 #[allow(unused)]
845 fn connect_timeout_is_correct<T: Send + Sync + Clone + 'static>() {
846 is_send_sync::<super::ConnectTimeout<T>>();
847 }
848
849 #[allow(unused)]
850 fn is_send_sync<T: Send + Sync>() {}
851
852 #[non_exhaustive]
856 #[derive(Clone, Default, Debug)]
857 pub(crate) struct NeverConnects;
858 impl hyper_0_14::service::Service<http_02x::Uri> for NeverConnects {
859 type Response = TcpStream;
860 type Error = ConnectorError;
861 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
862
863 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
864 Poll::Ready(Ok(()))
865 }
866
867 fn call(&mut self, _uri: http_02x::Uri) -> Self::Future {
868 Box::pin(async move {
869 Never::new().await;
870 unreachable!()
871 })
872 }
873 }
874
875 #[derive(Clone, Debug, Default)]
877 struct NeverReplies;
878 impl hyper_0_14::service::Service<http_02x::Uri> for NeverReplies {
879 type Response = EmptyStream;
880 type Error = BoxError;
881 type Future = std::future::Ready<Result<Self::Response, Self::Error>>;
882
883 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
884 Poll::Ready(Ok(()))
885 }
886
887 fn call(&mut self, _req: http_02x::Uri) -> Self::Future {
888 std::future::ready(Ok(EmptyStream))
889 }
890 }
891
892 #[non_exhaustive]
894 #[derive(Debug, Default)]
895 struct EmptyStream;
896 impl AsyncRead for EmptyStream {
897 fn poll_read(
898 self: Pin<&mut Self>,
899 _cx: &mut Context<'_>,
900 _buf: &mut ReadBuf<'_>,
901 ) -> Poll<std::io::Result<()>> {
902 Poll::Pending
903 }
904 }
905 impl AsyncWrite for EmptyStream {
906 fn poll_write(
907 self: Pin<&mut Self>,
908 _cx: &mut Context<'_>,
909 _buf: &[u8],
910 ) -> Poll<Result<usize, std::io::Error>> {
911 Poll::Pending
912 }
913
914 fn poll_flush(
915 self: Pin<&mut Self>,
916 _cx: &mut Context<'_>,
917 ) -> Poll<Result<(), std::io::Error>> {
918 Poll::Pending
919 }
920
921 fn poll_shutdown(
922 self: Pin<&mut Self>,
923 _cx: &mut Context<'_>,
924 ) -> Poll<Result<(), std::io::Error>> {
925 Poll::Pending
926 }
927 }
928 impl Connection for EmptyStream {
929 fn connected(&self) -> Connected {
930 Connected::new()
931 }
932 }
933
934 #[tokio::test]
935 async fn http_connect_timeout_works() {
936 let tcp_connector = NeverConnects::default();
937 let connector_settings = HttpConnectorSettings::builder()
938 .connect_timeout(Duration::from_secs(1))
939 .build();
940 let hyper = HyperConnector::builder()
941 .connector_settings(connector_settings)
942 .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
943 .build(tcp_connector)
944 .adapter;
945 let now = tokio::time::Instant::now();
946 tokio::time::pause();
947 let resp = hyper
948 .call(HttpRequest::get("https://static-uri.com").unwrap())
949 .await
950 .unwrap_err();
951 assert!(
952 resp.is_timeout(),
953 "expected resp.is_timeout() to be true but it was false, resp == {:?}",
954 resp
955 );
956 let message = DisplayErrorContext(&resp).to_string();
957 let expected =
958 "timeout: error trying to connect: HTTP connect timeout occurred after 1s";
959 assert!(
960 message.contains(expected),
961 "expected '{message}' to contain '{expected}'"
962 );
963 assert_elapsed!(now, Duration::from_secs(1));
964 }
965
966 #[tokio::test]
967 async fn http_read_timeout_works() {
968 let tcp_connector = NeverReplies;
969 let connector_settings = HttpConnectorSettings::builder()
970 .connect_timeout(Duration::from_secs(1))
971 .read_timeout(Duration::from_secs(2))
972 .build();
973 let hyper = HyperConnector::builder()
974 .connector_settings(connector_settings)
975 .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
976 .build(tcp_connector)
977 .adapter;
978 let now = tokio::time::Instant::now();
979 tokio::time::pause();
980 let err = hyper
981 .call(HttpRequest::get("https://fake-uri.com").unwrap())
982 .await
983 .unwrap_err();
984 assert!(
985 err.is_timeout(),
986 "expected err.is_timeout() to be true but it was false, err == {err:?}",
987 );
988 let message = format!("{}", DisplayErrorContext(&err));
989 let expected = "timeout: HTTP read timeout occurred after 2s";
990 assert!(
991 message.contains(expected),
992 "expected '{message}' to contain '{expected}'"
993 );
994 assert_elapsed!(now, Duration::from_secs(2));
995 }
996 }
997}
998
999#[cfg(test)]
1000mod test {
1001 use crate::hyper_legacy::timeout_middleware::test::NeverConnects;
1002 use crate::hyper_legacy::{HyperClientBuilder, HyperConnector};
1003 use aws_smithy_async::time::SystemTimeSource;
1004 use aws_smithy_runtime_api::box_error::BoxError;
1005 use aws_smithy_runtime_api::client::http::{HttpClient, HttpConnectorSettings};
1006 use aws_smithy_runtime_api::client::orchestrator::HttpRequest;
1007 use aws_smithy_runtime_api::client::runtime_components::RuntimeComponentsBuilder;
1008 use hyper_0_14::client::connect::{Connected, Connection};
1009 use std::io::{Error, ErrorKind};
1010 use std::pin::Pin;
1011 use std::sync::atomic::{AtomicU32, Ordering};
1012 use std::sync::Arc;
1013 use std::task::{Context, Poll};
1014 use std::time::Duration;
1015 use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
1016
1017 #[tokio::test]
1018 async fn connector_selection() {
1019 let creation_count = Arc::new(AtomicU32::new(0));
1021 let http_client = HyperClientBuilder::new().build_with_fn({
1022 let count = creation_count.clone();
1023 move || {
1024 count.fetch_add(1, Ordering::Relaxed);
1025 NeverConnects::default()
1026 }
1027 });
1028
1029 let settings = [
1031 HttpConnectorSettings::builder()
1032 .connect_timeout(Duration::from_secs(3))
1033 .build(),
1034 HttpConnectorSettings::builder()
1035 .read_timeout(Duration::from_secs(3))
1036 .build(),
1037 HttpConnectorSettings::builder()
1038 .connect_timeout(Duration::from_secs(3))
1039 .read_timeout(Duration::from_secs(3))
1040 .build(),
1041 HttpConnectorSettings::builder()
1042 .connect_timeout(Duration::from_secs(5))
1043 .read_timeout(Duration::from_secs(3))
1044 .build(),
1045 ];
1046
1047 let components = RuntimeComponentsBuilder::for_tests()
1049 .with_time_source(Some(SystemTimeSource::new()))
1050 .build()
1051 .unwrap();
1052 let mut handles = Vec::new();
1053 for setting in &settings {
1054 for _ in 0..1000 {
1055 let client = http_client.clone();
1056 handles.push(tokio::spawn({
1057 let setting = setting.clone();
1058 let components = components.clone();
1059 async move {
1060 let _ = client.http_connector(&setting, &components);
1061 }
1062 }));
1063 }
1064 }
1065 for handle in handles {
1066 handle.await.unwrap();
1067 }
1068
1069 assert_eq!(4, creation_count.load(Ordering::Relaxed));
1071 }
1072
1073 #[tokio::test]
1074 async fn hyper_io_error() {
1075 let connector = TestConnection {
1076 inner: HangupStream,
1077 };
1078 let adapter = HyperConnector::builder().build(connector).adapter;
1079 let err = adapter
1080 .call(HttpRequest::get("https://socket-hangup.com").unwrap())
1081 .await
1082 .expect_err("socket hangup");
1083 assert!(err.is_io(), "{:?}", err);
1084 }
1085
1086 #[derive(Clone)]
1088 struct HangupStream;
1089
1090 impl Connection for HangupStream {
1091 fn connected(&self) -> Connected {
1092 Connected::new()
1093 }
1094 }
1095
1096 impl AsyncRead for HangupStream {
1097 fn poll_read(
1098 self: Pin<&mut Self>,
1099 _cx: &mut Context<'_>,
1100 _buf: &mut ReadBuf<'_>,
1101 ) -> Poll<std::io::Result<()>> {
1102 Poll::Ready(Err(Error::new(
1103 ErrorKind::ConnectionReset,
1104 "connection reset",
1105 )))
1106 }
1107 }
1108
1109 impl AsyncWrite for HangupStream {
1110 fn poll_write(
1111 self: Pin<&mut Self>,
1112 _cx: &mut Context<'_>,
1113 _buf: &[u8],
1114 ) -> Poll<Result<usize, Error>> {
1115 Poll::Pending
1116 }
1117
1118 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
1119 Poll::Pending
1120 }
1121
1122 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
1123 Poll::Pending
1124 }
1125 }
1126
1127 #[derive(Clone)]
1128 struct TestConnection<T> {
1129 inner: T,
1130 }
1131
1132 impl<T> hyper_0_14::service::Service<http_02x::Uri> for TestConnection<T>
1133 where
1134 T: Clone + Connection,
1135 {
1136 type Response = T;
1137 type Error = BoxError;
1138 type Future = std::future::Ready<Result<Self::Response, Self::Error>>;
1139
1140 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1141 Poll::Ready(Ok(()))
1142 }
1143
1144 fn call(&mut self, _req: http_02x::Uri) -> Self::Future {
1145 std::future::ready(Ok(self.inner.clone()))
1146 }
1147 }
1148}