1 - | /*
|
2 - | * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
3 - | * SPDX-License-Identifier: Apache-2.0
|
4 - | */
|
5 - |
|
6 - | use aws_smithy_async::future::timeout::TimedOutError;
|
7 - | use aws_smithy_async::rt::sleep::{default_async_sleep, AsyncSleep, SharedAsyncSleep};
|
8 - | use aws_smithy_runtime::client::http::connection_poisoning::CaptureSmithyConnection;
|
9 - | use aws_smithy_runtime_api::box_error::BoxError;
|
10 - | use aws_smithy_runtime_api::client::connection::ConnectionMetadata;
|
11 - | use aws_smithy_runtime_api::client::connector_metadata::ConnectorMetadata;
|
12 - | use aws_smithy_runtime_api::client::dns::ResolveDns;
|
13 - | use aws_smithy_runtime_api::client::http::{
|
14 - | HttpClient, HttpConnector, HttpConnectorFuture, HttpConnectorSettings, SharedHttpClient,
|
15 - | SharedHttpConnector,
|
16 - | };
|
17 - | use aws_smithy_runtime_api::client::orchestrator::{HttpRequest, HttpResponse};
|
18 - | use aws_smithy_runtime_api::client::result::ConnectorError;
|
19 - | use aws_smithy_runtime_api::client::runtime_components::{
|
20 - | RuntimeComponents, RuntimeComponentsBuilder,
|
21 - | };
|
22 - | use aws_smithy_runtime_api::shared::IntoShared;
|
23 - | use aws_smithy_types::body::SdkBody;
|
24 - | use aws_smithy_types::config_bag::ConfigBag;
|
25 - | use aws_smithy_types::error::display::DisplayErrorContext;
|
26 - | use aws_smithy_types::retry::ErrorKind;
|
27 - | use client::connect::Connection;
|
28 - | use h2::Reason;
|
29 - | use http::{Extensions, Uri};
|
30 - | use hyper::rt::{Read, Write};
|
31 - | use hyper_util::client::legacy as client;
|
32 - | use hyper_util::client::legacy::connect::dns::Name;
|
33 - | use hyper_util::client::legacy::connect::{
|
34 - | capture_connection, CaptureConnection, Connect, HttpInfo,
|
35 - | };
|
36 - | use hyper_util::rt::TokioExecutor;
|
37 - | use rustls::crypto::CryptoProvider;
|
38 - | use std::borrow::Cow;
|
39 - | use std::collections::HashMap;
|
40 - | use std::error::Error;
|
41 - | use std::future::Future;
|
42 - | use std::net::SocketAddr;
|
43 - | use std::pin::Pin;
|
44 - | use std::sync::RwLock;
|
45 - | use std::task::{Context, Poll};
|
46 - | use std::time::Duration;
|
47 - | use std::{fmt, vec};
|
48 - |
|
49 - | #[derive(Debug, Eq, PartialEq, Clone, Copy)]
|
50 - | #[non_exhaustive]
|
51 - | pub enum CryptoMode {
|
52 - | #[cfg(feature = "crypto-ring")]
|
53 - | Ring,
|
54 - | #[cfg(feature = "crypto-aws-lc")]
|
55 - | AwsLc,
|
56 - | #[cfg(feature = "crypto-aws-lc-fips")]
|
57 - | AwsLcFips,
|
58 - | }
|
59 - |
|
60 - | impl CryptoMode {
|
61 - | fn provider(self) -> CryptoProvider {
|
62 - | match self {
|
63 - | #[cfg(feature = "crypto-aws-lc")]
|
64 - | CryptoMode::AwsLc => rustls::crypto::aws_lc_rs::default_provider(),
|
65 - |
|
66 - | #[cfg(feature = "crypto-ring")]
|
67 - | CryptoMode::Ring => rustls::crypto::ring::default_provider(),
|
68 - |
|
69 - | #[cfg(feature = "crypto-aws-lc-fips")]
|
70 - | CryptoMode::AwsLcFips => {
|
71 - | let provider = rustls::crypto::default_fips_provider();
|
72 - | assert!(
|
73 - | provider.fips(),
|
74 - | "FIPS was requested but the provider did not support FIPS"
|
75 - | );
|
76 - | provider
|
77 - | }
|
78 - | }
|
79 - | }
|
80 - | }
|
81 - |
|
82 - | /// A bridge that allows our `ResolveDns` trait to work with Hyper's `Resolver` interface (based on tower)
|
83 - | #[derive(Clone)]
|
84 - | struct HyperUtilResolver<R> {
|
85 - | resolver: R,
|
86 - | }
|
87 - |
|
88 - | impl<R: ResolveDns + Clone + 'static> tower::Service<Name> for HyperUtilResolver<R> {
|
89 - | type Response = vec::IntoIter<SocketAddr>;
|
90 - | type Error = Box<dyn Error + Send + Sync>;
|
91 - | type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
|
92 - |
|
93 - | fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
94 - | Poll::Ready(Ok(()))
|
95 - | }
|
96 - |
|
97 - | fn call(&mut self, req: Name) -> Self::Future {
|
98 - | let resolver = self.resolver.clone();
|
99 - | Box::pin(async move {
|
100 - | let dns_entries = resolver.resolve_dns(req.as_str()).await?;
|
101 - | Ok(dns_entries
|
102 - | .into_iter()
|
103 - | .map(|ip_addr| SocketAddr::new(ip_addr, 0))
|
104 - | .collect::<Vec<_>>()
|
105 - | .into_iter())
|
106 - | })
|
107 - | }
|
108 - | }
|
109 - |
|
110 - | #[allow(unused_imports)]
|
111 - | mod cached_connectors {
|
112 - | use client::connect::HttpConnector;
|
113 - | use hyper_util::client::legacy as client;
|
114 - | use hyper_util::client::legacy::connect::dns::GaiResolver;
|
115 - |
|
116 - | use crate::hyper_1_0::build_connector::make_tls;
|
117 - | use crate::hyper_1_0::{CryptoMode, Inner};
|
118 - |
|
119 - | #[cfg(feature = "crypto-ring")]
|
120 - | pub(crate) static HTTPS_NATIVE_ROOTS_RING: once_cell::sync::Lazy<
|
121 - | hyper_rustls::HttpsConnector<HttpConnector>,
|
122 - | > = once_cell::sync::Lazy::new(|| make_tls(GaiResolver::new(), CryptoMode::Ring.provider()));
|
123 - |
|
124 - | #[cfg(feature = "crypto-aws-lc")]
|
125 - | pub(crate) static HTTPS_NATIVE_ROOTS_AWS_LC: once_cell::sync::Lazy<
|
126 - | hyper_rustls::HttpsConnector<HttpConnector>,
|
127 - | > = once_cell::sync::Lazy::new(|| make_tls(GaiResolver::new(), CryptoMode::AwsLc.provider()));
|
128 - |
|
129 - | #[cfg(feature = "crypto-aws-lc-fips")]
|
130 - | pub(crate) static HTTPS_NATIVE_ROOTS_AWS_LC_FIPS: once_cell::sync::Lazy<
|
131 - | hyper_rustls::HttpsConnector<HttpConnector>,
|
132 - | > = once_cell::sync::Lazy::new(|| {
|
133 - | make_tls(GaiResolver::new(), CryptoMode::AwsLcFips.provider())
|
134 - | });
|
135 - |
|
136 - | pub(super) fn cached_https(mode: Inner) -> hyper_rustls::HttpsConnector<HttpConnector> {
|
137 - | match mode {
|
138 - | #[cfg(feature = "crypto-ring")]
|
139 - | Inner::Standard(CryptoMode::Ring) => HTTPS_NATIVE_ROOTS_RING.clone(),
|
140 - | #[cfg(feature = "crypto-aws-lc")]
|
141 - | Inner::Standard(CryptoMode::AwsLc) => HTTPS_NATIVE_ROOTS_AWS_LC.clone(),
|
142 - | #[cfg(feature = "crypto-aws-lc-fips")]
|
143 - | Inner::Standard(CryptoMode::AwsLcFips) => HTTPS_NATIVE_ROOTS_AWS_LC_FIPS.clone(),
|
144 - | #[allow(unreachable_patterns)]
|
145 - | Inner::Standard(_) => unreachable!("unexpected mode"),
|
146 - | Inner::Custom(provider) => make_tls(GaiResolver::new(), provider),
|
147 - | }
|
148 - | }
|
149 - | }
|
150 - |
|
151 - | mod build_connector {
|
152 - | use crate::hyper_1_0::{HyperUtilResolver, Inner};
|
153 - | use aws_smithy_runtime_api::client::dns::ResolveDns;
|
154 - | use client::connect::HttpConnector;
|
155 - | use hyper_util::client::legacy as client;
|
156 - | use rustls::crypto::CryptoProvider;
|
157 - | use std::sync::Arc;
|
158 - |
|
159 - | fn restrict_ciphers(base: CryptoProvider) -> CryptoProvider {
|
160 - | let suites = &[
|
161 - | rustls::CipherSuite::TLS13_AES_256_GCM_SHA384,
|
162 - | rustls::CipherSuite::TLS13_AES_128_GCM_SHA256,
|
163 - | // TLS1.2 suites
|
164 - | rustls::CipherSuite::TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
|
165 - | rustls::CipherSuite::TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
|
166 - | rustls::CipherSuite::TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
|
167 - | rustls::CipherSuite::TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
|
168 - | rustls::CipherSuite::TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256,
|
169 - | ];
|
170 - | let supported_suites = suites
|
171 - | .iter()
|
172 - | .flat_map(|suite| {
|
173 - | base.cipher_suites
|
174 - | .iter()
|
175 - | .find(|s| &s.suite() == suite)
|
176 - | .cloned()
|
177 - | })
|
178 - | .collect::<Vec<_>>();
|
179 - | CryptoProvider {
|
180 - | cipher_suites: supported_suites,
|
181 - | ..base
|
182 - | }
|
183 - | }
|
184 - |
|
185 - | pub(crate) fn make_tls<R>(
|
186 - | resolver: R,
|
187 - | crypto_provider: CryptoProvider,
|
188 - | ) -> hyper_rustls::HttpsConnector<HttpConnector<R>> {
|
189 - | use hyper_rustls::ConfigBuilderExt;
|
190 - | let mut base_connector = HttpConnector::new_with_resolver(resolver);
|
191 - | base_connector.enforce_http(false);
|
192 - | hyper_rustls::HttpsConnectorBuilder::new()
|
193 - | .with_tls_config(
|
194 - | rustls::ClientConfig::builder_with_provider(Arc::new(restrict_ciphers(crypto_provider)))
|
195 - | .with_safe_default_protocol_versions()
|
196 - | .expect("Error with the TLS configuration. Please file a bug report under https://github.com/smithy-lang/smithy-rs/issues.")
|
197 - | .with_native_roots().expect("error with TLS configuration.")
|
198 - | .with_no_client_auth()
|
199 - | )
|
200 - | .https_or_http()
|
201 - | .enable_http1()
|
202 - | .enable_http2()
|
203 - | .wrap_connector(base_connector)
|
204 - | }
|
205 - |
|
206 - | pub(super) fn https_with_resolver<R: ResolveDns>(
|
207 - | crypto_provider: Inner,
|
208 - | resolver: R,
|
209 - | ) -> hyper_rustls::HttpsConnector<HttpConnector<HyperUtilResolver<R>>> {
|
210 - | make_tls(HyperUtilResolver { resolver }, crypto_provider.provider())
|
211 - | }
|
212 - | }
|
213 - |
|
214 - | /// [`HttpConnector`] that uses [`hyper`] to make HTTP requests.
|
215 - | ///
|
216 - | /// This connector also implements socket connect and read timeouts.
|
217 - | ///
|
218 - | /// This shouldn't be used directly in most cases.
|
219 - | /// See the docs on [`HyperClientBuilder`] for examples of how
|
220 - | /// to customize the Hyper client.
|
221 - | #[derive(Debug)]
|
222 - | pub struct HyperConnector {
|
223 - | adapter: Box<dyn HttpConnector>,
|
224 - | }
|
225 - |
|
226 - | impl HyperConnector {
|
227 - | /// Builder for a Hyper connector.
|
228 - | pub fn builder() -> HyperConnectorBuilder {
|
229 - | Default::default()
|
230 - | }
|
231 - | }
|
232 - |
|
233 - | impl HttpConnector for HyperConnector {
|
234 - | fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
|
235 - | self.adapter.call(request)
|
236 - | }
|
237 - | }
|
238 - |
|
239 - | /// Builder for [`HyperConnector`].
|
240 - | #[derive(Default, Debug)]
|
241 - | pub struct HyperConnectorBuilder<Crypto = CryptoUnset> {
|
242 - | connector_settings: Option<HttpConnectorSettings>,
|
243 - | sleep_impl: Option<SharedAsyncSleep>,
|
244 - | client_builder: Option<hyper_util::client::legacy::Builder>,
|
245 - | #[allow(unused)]
|
246 - | crypto: Crypto,
|
247 - | }
|
248 - |
|
249 - | #[derive(Default)]
|
250 - | #[non_exhaustive]
|
251 - | pub struct CryptoUnset {}
|
252 - |
|
253 - | pub struct CryptoProviderSelected {
|
254 - | crypto_provider: Inner,
|
255 - | }
|
256 - |
|
257 - | #[derive(Clone)]
|
258 - | enum Inner {
|
259 - | Standard(CryptoMode),
|
260 - | #[allow(dead_code)]
|
261 - | Custom(CryptoProvider),
|
262 - | }
|
263 - |
|
264 - | impl Inner {
|
265 - | fn provider(&self) -> CryptoProvider {
|
266 - | match self {
|
267 - | Inner::Standard(mode) => mode.provider(),
|
268 - | Inner::Custom(provider) => provider.clone(),
|
269 - | }
|
270 - | }
|
271 - | }
|
272 - |
|
273 - | #[cfg(any(feature = "crypto-aws-lc", feature = "crypto-ring"))]
|
274 - | impl HyperConnectorBuilder<CryptoProviderSelected> {
|
275 - | pub fn build_from_resolver<R: ResolveDns + Clone + 'static>(
|
276 - | self,
|
277 - | resolver: R,
|
278 - | ) -> HyperConnector {
|
279 - | let connector =
|
280 - | build_connector::https_with_resolver(self.crypto.crypto_provider.clone(), resolver);
|
281 - | self.build(connector)
|
282 - | }
|
283 - | }
|
284 - |
|
285 - | impl<Any> HyperConnectorBuilder<Any> {
|
286 - | /// Create a [`HyperConnector`] from this builder and a given connector.
|
287 - | pub(crate) fn build<C>(self, tcp_connector: C) -> HyperConnector
|
288 - | where
|
289 - | C: Send + Sync + 'static,
|
290 - | C: Clone,
|
291 - | C: tower::Service<Uri>,
|
292 - | C::Response: Read + Write + Connection + Send + Sync + Unpin,
|
293 - | C: Connect,
|
294 - | C::Future: Unpin + Send + 'static,
|
295 - | C::Error: Into<BoxError>,
|
296 - | {
|
297 - | let client_builder =
|
298 - | self.client_builder
|
299 - | .unwrap_or(hyper_util::client::legacy::Builder::new(
|
300 - | TokioExecutor::new(),
|
301 - | ));
|
302 - | let sleep_impl = self.sleep_impl.or_else(default_async_sleep);
|
303 - | let (connect_timeout, read_timeout) = self
|
304 - | .connector_settings
|
305 - | .map(|c| (c.connect_timeout(), c.read_timeout()))
|
306 - | .unwrap_or((None, None));
|
307 - |
|
308 - | let connector = match connect_timeout {
|
309 - | Some(duration) => timeout_middleware::ConnectTimeout::new(
|
310 - | tcp_connector,
|
311 - | sleep_impl
|
312 - | .clone()
|
313 - | .expect("a sleep impl must be provided in order to have a connect timeout"),
|
314 - | duration,
|
315 - | ),
|
316 - | None => timeout_middleware::ConnectTimeout::no_timeout(tcp_connector),
|
317 - | };
|
318 - | let base = client_builder.build(connector);
|
319 - | let read_timeout = match read_timeout {
|
320 - | Some(duration) => timeout_middleware::HttpReadTimeout::new(
|
321 - | base,
|
322 - | sleep_impl.expect("a sleep impl must be provided in order to have a read timeout"),
|
323 - | duration,
|
324 - | ),
|
325 - | None => timeout_middleware::HttpReadTimeout::no_timeout(base),
|
326 - | };
|
327 - | HyperConnector {
|
328 - | adapter: Box::new(Adapter {
|
329 - | client: read_timeout,
|
330 - | }),
|
331 - | }
|
332 - | }
|
333 - |
|
334 - | /// Set the async sleep implementation used for timeouts
|
335 - | ///
|
336 - | /// Calling this is only necessary for testing or to use something other than
|
337 - | /// [`default_async_sleep`].
|
338 - | pub fn sleep_impl(mut self, sleep_impl: impl AsyncSleep + 'static) -> Self {
|
339 - | self.sleep_impl = Some(sleep_impl.into_shared());
|
340 - | self
|
341 - | }
|
342 - |
|
343 - | /// Set the async sleep implementation used for timeouts
|
344 - | ///
|
345 - | /// Calling this is only necessary for testing or to use something other than
|
346 - | /// [`default_async_sleep`].
|
347 - | pub fn set_sleep_impl(&mut self, sleep_impl: Option<SharedAsyncSleep>) -> &mut Self {
|
348 - | self.sleep_impl = sleep_impl;
|
349 - | self
|
350 - | }
|
351 - |
|
352 - | /// Configure the HTTP settings for the `HyperAdapter`
|
353 - | pub fn connector_settings(mut self, connector_settings: HttpConnectorSettings) -> Self {
|
354 - | self.connector_settings = Some(connector_settings);
|
355 - | self
|
356 - | }
|
357 - |
|
358 - | /// Configure the HTTP settings for the `HyperAdapter`
|
359 - | pub fn set_connector_settings(
|
360 - | &mut self,
|
361 - | connector_settings: Option<HttpConnectorSettings>,
|
362 - | ) -> &mut Self {
|
363 - | self.connector_settings = connector_settings;
|
364 - | self
|
365 - | }
|
366 - |
|
367 - | /// Override the Hyper client [`Builder`](hyper_util::client::legacy::Builder) used to construct this client.
|
368 - | ///
|
369 - | /// This enables changing settings like forcing HTTP2 and modifying other default client behavior.
|
370 - | pub(crate) fn hyper_builder(
|
371 - | mut self,
|
372 - | hyper_builder: hyper_util::client::legacy::Builder,
|
373 - | ) -> Self {
|
374 - | self.set_hyper_builder(Some(hyper_builder));
|
375 - | self
|
376 - | }
|
377 - |
|
378 - | /// Override the Hyper client [`Builder`](hyper_util::client::legacy::Builder) used to construct this client.
|
379 - | ///
|
380 - | /// This enables changing settings like forcing HTTP2 and modifying other default client behavior.
|
381 - | pub(crate) fn set_hyper_builder(
|
382 - | &mut self,
|
383 - | hyper_builder: Option<hyper_util::client::legacy::Builder>,
|
384 - | ) -> &mut Self {
|
385 - | self.client_builder = hyper_builder;
|
386 - | self
|
387 - | }
|
388 - | }
|
389 - |
|
390 - | /// Adapter to use a Hyper 1.0-based Client as an `HttpConnector`
|
391 - | ///
|
392 - | /// This adapter also enables TCP `CONNECT` and HTTP `READ` timeouts via [`HyperConnector::builder`].
|
393 - | struct Adapter<C> {
|
394 - | client: timeout_middleware::HttpReadTimeout<
|
395 - | hyper_util::client::legacy::Client<timeout_middleware::ConnectTimeout<C>, SdkBody>,
|
396 - | >,
|
397 - | }
|
398 - |
|
399 - | impl<C> fmt::Debug for Adapter<C> {
|
400 - | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
401 - | f.debug_struct("Adapter")
|
402 - | .field("client", &"** hyper client **")
|
403 - | .finish()
|
404 - | }
|
405 - | }
|
406 - |
|
407 - | /// Extract a smithy connection from a hyper CaptureConnection
|
408 - | fn extract_smithy_connection(capture_conn: &CaptureConnection) -> Option<ConnectionMetadata> {
|
409 - | let capture_conn = capture_conn.clone();
|
410 - | if let Some(conn) = capture_conn.clone().connection_metadata().as_ref() {
|
411 - | let mut extensions = Extensions::new();
|
412 - | conn.get_extras(&mut extensions);
|
413 - | let http_info = extensions.get::<HttpInfo>();
|
414 - | let mut builder = ConnectionMetadata::builder()
|
415 - | .proxied(conn.is_proxied())
|
416 - | .poison_fn(move || match capture_conn.connection_metadata().as_ref() {
|
417 - | Some(conn) => conn.poison(),
|
418 - | None => tracing::trace!("no connection existed to poison"),
|
419 - | });
|
420 - |
|
421 - | builder
|
422 - | .set_local_addr(http_info.map(|info| info.local_addr()))
|
423 - | .set_remote_addr(http_info.map(|info| info.remote_addr()));
|
424 - |
|
425 - | let smithy_connection = builder.build();
|
426 - |
|
427 - | Some(smithy_connection)
|
428 - | } else {
|
429 - | None
|
430 - | }
|
431 - | }
|
432 - |
|
433 - | impl<C> HttpConnector for Adapter<C>
|
434 - | where
|
435 - | C: Clone + Send + Sync + 'static,
|
436 - | C: tower::Service<Uri>,
|
437 - | C::Response: Connection + Read + Write + Unpin + 'static,
|
438 - | timeout_middleware::ConnectTimeout<C>: Connect,
|
439 - | C::Future: Unpin + Send + 'static,
|
440 - | C::Error: Into<BoxError>,
|
441 - | {
|
442 - | fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
|
443 - | let mut request = match request.try_into_http1x() {
|
444 - | Ok(request) => request,
|
445 - | Err(err) => {
|
446 - | return HttpConnectorFuture::ready(Err(ConnectorError::user(err.into())));
|
447 - | }
|
448 - | };
|
449 - | let capture_connection = capture_connection(&mut request);
|
450 - | if let Some(capture_smithy_connection) =
|
451 - | request.extensions().get::<CaptureSmithyConnection>()
|
452 - | {
|
453 - | capture_smithy_connection
|
454 - | .set_connection_retriever(move || extract_smithy_connection(&capture_connection));
|
455 - | }
|
456 - | let mut client = self.client.clone();
|
457 - | use tower::Service;
|
458 - | let fut = client.call(request);
|
459 - | HttpConnectorFuture::new(async move {
|
460 - | let response = fut
|
461 - | .await
|
462 - | .map_err(downcast_error)?
|
463 - | .map(SdkBody::from_body_1_x);
|
464 - | match HttpResponse::try_from(response) {
|
465 - | Ok(response) => Ok(response),
|
466 - | Err(err) => Err(ConnectorError::other(err.into(), None)),
|
467 - | }
|
468 - | })
|
469 - | }
|
470 - | }
|
471 - |
|
472 - | /// Downcast errors coming out of hyper into an appropriate `ConnectorError`
|
473 - | fn downcast_error(err: BoxError) -> ConnectorError {
|
474 - | // is a `TimedOutError` (from aws_smithy_async::timeout) in the chain? if it is, this is a timeout
|
475 - | if find_source::<TimedOutError>(err.as_ref()).is_some() {
|
476 - | return ConnectorError::timeout(err);
|
477 - | }
|
478 - | // is the top of chain error actually already a `ConnectorError`? return that directly
|
479 - | let err = match err.downcast::<ConnectorError>() {
|
480 - | Ok(connector_error) => return *connector_error,
|
481 - | Err(box_error) => box_error,
|
482 - | };
|
483 - | // generally, the top of chain will probably be a hyper error. Go through a set of hyper specific
|
484 - | // error classifications
|
485 - | let err = match find_source::<hyper::Error>(err.as_ref()) {
|
486 - | Some(hyper_error) => return to_connector_error(hyper_error)(err),
|
487 - | None => err,
|
488 - | };
|
489 - |
|
490 - | // otherwise, we have no idea!
|
491 - | ConnectorError::other(err, None)
|
492 - | }
|
493 - |
|
494 - | /// Convert a [`hyper::Error`] into a [`ConnectorError`]
|
495 - | fn to_connector_error(err: &hyper::Error) -> fn(BoxError) -> ConnectorError {
|
496 - | if err.is_timeout() || find_source::<timeout_middleware::HttpTimeoutError>(err).is_some() {
|
497 - | return ConnectorError::timeout;
|
498 - | }
|
499 - | if err.is_user() {
|
500 - | return ConnectorError::user;
|
501 - | }
|
502 - | if err.is_closed() || err.is_canceled() || find_source::<std::io::Error>(err).is_some() {
|
503 - | return ConnectorError::io;
|
504 - | }
|
505 - | // We sometimes receive this from S3: hyper::Error(IncompleteMessage)
|
506 - | if err.is_incomplete_message() {
|
507 - | return |err: BoxError| ConnectorError::other(err, Some(ErrorKind::TransientError));
|
508 - | }
|
509 - |
|
510 - | if let Some(h2_err) = find_source::<h2::Error>(err) {
|
511 - | if h2_err.is_go_away()
|
512 - | || (h2_err.is_reset() && h2_err.reason() == Some(Reason::REFUSED_STREAM))
|
513 - | {
|
514 - | return ConnectorError::io;
|
515 - | }
|
516 - | }
|
517 - |
|
518 - | tracing::warn!(err = %DisplayErrorContext(&err), "unrecognized error from Hyper. If this error should be retried, please file an issue.");
|
519 - | |err: BoxError| ConnectorError::other(err, None)
|
520 - | }
|
521 - |
|
522 - | fn find_source<'a, E: Error + 'static>(err: &'a (dyn Error + 'static)) -> Option<&'a E> {
|
523 - | let mut next = Some(err);
|
524 - | while let Some(err) = next {
|
525 - | if let Some(matching_err) = err.downcast_ref::<E>() {
|
526 - | return Some(matching_err);
|
527 - | }
|
528 - | next = err.source();
|
529 - | }
|
530 - | None
|
531 - | }
|
532 - |
|
533 - | // TODO(https://github.com/awslabs/aws-sdk-rust/issues/1090): CacheKey must also include ptr equality to any
|
534 - | // runtime components that are used—sleep_impl as a base (unless we prohibit overriding sleep impl)
|
535 - | // If we decide to put a DnsResolver in RuntimeComponents, then we'll need to handle that as well.
|
536 - | #[derive(Clone, Debug, Eq, PartialEq, Hash)]
|
537 - | struct CacheKey {
|
538 - | connect_timeout: Option<Duration>,
|
539 - | read_timeout: Option<Duration>,
|
540 - | }
|
541 - |
|
542 - | impl From<&HttpConnectorSettings> for CacheKey {
|
543 - | fn from(value: &HttpConnectorSettings) -> Self {
|
544 - | Self {
|
545 - | connect_timeout: value.connect_timeout(),
|
546 - | read_timeout: value.read_timeout(),
|
547 - | }
|
548 - | }
|
549 - | }
|
550 - |
|
551 - | struct HyperClient<F> {
|
552 - | connector_cache: RwLock<HashMap<CacheKey, SharedHttpConnector>>,
|
553 - | client_builder: hyper_util::client::legacy::Builder,
|
554 - | tcp_connector_fn: F,
|
555 - | }
|
556 - |
|
557 - | impl<F> fmt::Debug for HyperClient<F> {
|
558 - | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
559 - | f.debug_struct("HyperClient")
|
560 - | .field("connector_cache", &self.connector_cache)
|
561 - | .field("client_builder", &self.client_builder)
|
562 - | .finish()
|
563 - | }
|
564 - | }
|
565 - |
|
566 - | impl<C, F> HttpClient for HyperClient<F>
|
567 - | where
|
568 - | F: Fn() -> C + Send + Sync,
|
569 - | C: Clone + Send + Sync + 'static,
|
570 - | C: tower::Service<Uri>,
|
571 - | C::Response: Connection + Read + Write + Send + Sync + Unpin + 'static,
|
572 - | C::Future: Unpin + Send + 'static,
|
573 - | C::Error: Into<BoxError>,
|
574 - | {
|
575 - | fn http_connector(
|
576 - | &self,
|
577 - | settings: &HttpConnectorSettings,
|
578 - | components: &RuntimeComponents,
|
579 - | ) -> SharedHttpConnector {
|
580 - | let key = CacheKey::from(settings);
|
581 - | let mut connector = self.connector_cache.read().unwrap().get(&key).cloned();
|
582 - | if connector.is_none() {
|
583 - | let mut cache = self.connector_cache.write().unwrap();
|
584 - | // Short-circuit if another thread already wrote a connector to the cache for this key
|
585 - | if !cache.contains_key(&key) {
|
586 - | let mut builder = HyperConnector::builder()
|
587 - | .hyper_builder(self.client_builder.clone())
|
588 - | .connector_settings(settings.clone());
|
589 - | builder.set_sleep_impl(components.sleep_impl());
|
590 - |
|
591 - | let start = components.time_source().map(|ts| ts.now());
|
592 - | let tcp_connector = (self.tcp_connector_fn)();
|
593 - | let end = components.time_source().map(|ts| ts.now());
|
594 - | if let (Some(start), Some(end)) = (start, end) {
|
595 - | if let Ok(elapsed) = end.duration_since(start) {
|
596 - | tracing::debug!("new TCP connector created in {:?}", elapsed);
|
597 - | }
|
598 - | }
|
599 - | let connector = SharedHttpConnector::new(builder.build(tcp_connector));
|
600 - | cache.insert(key.clone(), connector);
|
601 - | }
|
602 - | connector = cache.get(&key).cloned();
|
603 - | }
|
604 - |
|
605 - | connector.expect("cache populated above")
|
606 - | }
|
607 - |
|
608 - | fn validate_base_client_config(
|
609 - | &self,
|
610 - | _: &RuntimeComponentsBuilder,
|
611 - | _: &ConfigBag,
|
612 - | ) -> Result<(), BoxError> {
|
613 - | // Initialize the TCP connector at this point so that native certs load
|
614 - | // at client initialization time instead of upon first request. We do it
|
615 - | // here rather than at construction so that it won't run if this is not
|
616 - | // the selected HTTP client for the base config (for example, if this was
|
617 - | // the default HTTP client, and it was overridden by a later plugin).
|
618 - | let _ = (self.tcp_connector_fn)();
|
619 - | Ok(())
|
620 - | }
|
621 - |
|
622 - | fn connector_metadata(&self) -> Option<ConnectorMetadata> {
|
623 - | Some(ConnectorMetadata::new("hyper", Some(Cow::Borrowed("1.x"))))
|
624 - | }
|
625 - | }
|
626 - |
|
627 - | /// Builder for a hyper-backed [`HttpClient`] implementation.
|
628 - | ///
|
629 - | /// This builder can be used to customize the underlying TCP connector used, as well as
|
630 - | /// hyper client configuration.
|
631 - | ///
|
632 - | /// # Examples
|
633 - | ///
|
634 - | /// Construct a Hyper client with the RusTLS TLS implementation.
|
635 - | /// This can be useful when you want to share a Hyper connector between multiple
|
636 - | /// generated Smithy clients.
|
637 - | #[derive(Clone, Default, Debug)]
|
638 - | pub struct HyperClientBuilder<Crypto = CryptoUnset> {
|
639 - | client_builder: Option<hyper_util::client::legacy::Builder>,
|
640 - | crypto_provider: Crypto,
|
641 - | }
|
642 - |
|
643 - | impl HyperClientBuilder<CryptoProviderSelected> {
|
644 - | /// Create a hyper client using RusTLS for TLS
|
645 - | ///
|
646 - | /// The trusted certificates will be loaded later when this becomes the selected
|
647 - | /// HTTP client for a Smithy client.
|
648 - | pub fn build_https(self) -> SharedHttpClient {
|
649 - | let crypto = self.crypto_provider.crypto_provider;
|
650 - | build_with_fn(self.client_builder, move || {
|
651 - | cached_connectors::cached_https(crypto.clone())
|
652 - | })
|
653 - | }
|
654 - |
|
655 - | /// Create a hyper client using a custom DNS resolver
|
656 - | pub fn build_with_resolver(
|
657 - | self,
|
658 - | resolver: impl ResolveDns + Clone + 'static,
|
659 - | ) -> SharedHttpClient {
|
660 - | build_with_fn(self.client_builder, move || {
|
661 - | build_connector::https_with_resolver(
|
662 - | self.crypto_provider.crypto_provider.clone(),
|
663 - | resolver.clone(),
|
664 - | )
|
665 - | })
|
666 - | }
|
667 - | }
|
668 - |
|
669 - | impl HyperClientBuilder<CryptoUnset> {
|
670 - | /// Creates a new builder.
|
671 - | pub fn new() -> Self {
|
672 - | Self::default()
|
673 - | }
|
674 - |
|
675 - | pub fn crypto_mode(self, provider: CryptoMode) -> HyperClientBuilder<CryptoProviderSelected> {
|
676 - | HyperClientBuilder {
|
677 - | client_builder: self.client_builder,
|
678 - | crypto_provider: CryptoProviderSelected {
|
679 - | crypto_provider: Inner::Standard(provider),
|
680 - | },
|
681 - | }
|
682 - | }
|
683 - |
|
684 - | /// This interface will be broken in the future
|
685 - | ///
|
686 - | /// This exposes `CryptoProvider` from `rustls` directly and this API has no stability guarantee.
|
687 - | #[cfg(crypto_unstable)]
|
688 - | pub fn crypto_provider_unstable(
|
689 - | self,
|
690 - | provider: CryptoProvider,
|
691 - | ) -> HyperClientBuilder<CryptoProviderSelected> {
|
692 - | HyperClientBuilder {
|
693 - | client_builder: self.client_builder,
|
694 - | crypto_provider: CryptoProviderSelected {
|
695 - | crypto_provider: Inner::Custom(provider),
|
696 - | },
|
697 - | }
|
698 - | }
|
699 - | }
|
700 - |
|
701 - | fn build_with_fn<C, F>(
|
702 - | client_builder: Option<hyper_util::client::legacy::Builder>,
|
703 - | tcp_connector_fn: F,
|
704 - | ) -> SharedHttpClient
|
705 - | where
|
706 - | F: Fn() -> C + Send + Sync + 'static,
|
707 - | C: Clone + Send + Sync + 'static,
|
708 - | C: tower::Service<Uri>,
|
709 - | C::Response: Connection + Read + Write + Send + Sync + Unpin + 'static,
|
710 - | C::Future: Unpin + Send + 'static,
|
711 - | C::Error: Into<BoxError>,
|
712 - | C: Connect,
|
713 - | {
|
714 - | SharedHttpClient::new(HyperClient {
|
715 - | connector_cache: RwLock::new(HashMap::new()),
|
716 - | client_builder: client_builder
|
717 - | .unwrap_or_else(|| hyper_util::client::legacy::Builder::new(TokioExecutor::new())),
|
718 - | tcp_connector_fn,
|
719 - | })
|
720 - | }
|
721 - |
|
722 - | mod timeout_middleware {
|
723 - | use std::error::Error;
|
724 - | use std::fmt::Formatter;
|
725 - | use std::future::Future;
|
726 - | use std::pin::Pin;
|
727 - | use std::task::{Context, Poll};
|
728 - | use std::time::Duration;
|
729 - |
|
730 - | use http::Uri;
|
731 - | use pin_project_lite::pin_project;
|
732 - |
|
733 - | use aws_smithy_async::future::timeout::{TimedOutError, Timeout};
|
734 - | use aws_smithy_async::rt::sleep::Sleep;
|
735 - | use aws_smithy_async::rt::sleep::{AsyncSleep, SharedAsyncSleep};
|
736 - | use aws_smithy_runtime_api::box_error::BoxError;
|
737 - |
|
738 - | #[derive(Debug)]
|
739 - | pub(crate) struct HttpTimeoutError {
|
740 - | kind: &'static str,
|
741 - | duration: Duration,
|
742 - | }
|
743 - |
|
744 - | impl std::fmt::Display for HttpTimeoutError {
|
745 - | fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
746 - | write!(
|
747 - | f,
|
748 - | "{} timeout occurred after {:?}",
|
749 - | self.kind, self.duration
|
750 - | )
|
751 - | }
|
752 - | }
|
753 - |
|
754 - | impl Error for HttpTimeoutError {
|
755 - | // We implement the `source` function as returning a `TimedOutError` because when `downcast_error`
|
756 - | // or `find_source` is called with an `HttpTimeoutError` (or another error wrapping an `HttpTimeoutError`)
|
757 - | // this method will be checked to determine if it's a timeout-related error.
|
758 - | fn source(&self) -> Option<&(dyn Error + 'static)> {
|
759 - | Some(&TimedOutError)
|
760 - | }
|
761 - | }
|
762 - |
|
763 - | /// Timeout wrapper that will timeout on the initial TCP connection
|
764 - | ///
|
765 - | /// # Stability
|
766 - | /// This interface is unstable.
|
767 - | #[derive(Clone, Debug)]
|
768 - | pub(super) struct ConnectTimeout<I> {
|
769 - | inner: I,
|
770 - | timeout: Option<(SharedAsyncSleep, Duration)>,
|
771 - | }
|
772 - |
|
773 - | impl<I> ConnectTimeout<I> {
|
774 - | /// Create a new `ConnectTimeout` around `inner`.
|
775 - | ///
|
776 - | /// Typically, `I` will implement [`hyper_util::client::legacy::connect::Connect`].
|
777 - | pub(crate) fn new(inner: I, sleep: SharedAsyncSleep, timeout: Duration) -> Self {
|
778 - | Self {
|
779 - | inner,
|
780 - | timeout: Some((sleep, timeout)),
|
781 - | }
|
782 - | }
|
783 - |
|
784 - | pub(crate) fn no_timeout(inner: I) -> Self {
|
785 - | Self {
|
786 - | inner,
|
787 - | timeout: None,
|
788 - | }
|
789 - | }
|
790 - | }
|
791 - |
|
792 - | #[derive(Clone, Debug)]
|
793 - | pub(crate) struct HttpReadTimeout<I> {
|
794 - | inner: I,
|
795 - | timeout: Option<(SharedAsyncSleep, Duration)>,
|
796 - | }
|
797 - |
|
798 - | impl<I> HttpReadTimeout<I> {
|
799 - | /// Create a new `HttpReadTimeout` around `inner`.
|
800 - | ///
|
801 - | /// Typically, `I` will implement [`tower::Service<http::Request<SdkBody>>`].
|
802 - | pub(crate) fn new(inner: I, sleep: SharedAsyncSleep, timeout: Duration) -> Self {
|
803 - | Self {
|
804 - | inner,
|
805 - | timeout: Some((sleep, timeout)),
|
806 - | }
|
807 - | }
|
808 - |
|
809 - | pub(crate) fn no_timeout(inner: I) -> Self {
|
810 - | Self {
|
811 - | inner,
|
812 - | timeout: None,
|
813 - | }
|
814 - | }
|
815 - | }
|
816 - |
|
817 - | pin_project! {
|
818 - | /// Timeout future for Tower services
|
819 - | ///
|
820 - | /// Timeout future to handle timing out, mapping errors, and the possibility of not timing out
|
821 - | /// without incurring an additional allocation for each timeout layer.
|
822 - | #[project = MaybeTimeoutFutureProj]
|
823 - | pub enum MaybeTimeoutFuture<F> {
|
824 - | Timeout {
|
825 - | #[pin]
|
826 - | timeout: Timeout<F, Sleep>,
|
827 - | error_type: &'static str,
|
828 - | duration: Duration,
|
829 - | },
|
830 - | NoTimeout {
|
831 - | #[pin]
|
832 - | future: F
|
833 - | }
|
834 - | }
|
835 - | }
|
836 - |
|
837 - | impl<F, T, E> Future for MaybeTimeoutFuture<F>
|
838 - | where
|
839 - | F: Future<Output = Result<T, E>>,
|
840 - | E: Into<BoxError>,
|
841 - | {
|
842 - | type Output = Result<T, BoxError>;
|
843 - |
|
844 - | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
845 - | let (timeout_future, kind, &mut duration) = match self.project() {
|
846 - | MaybeTimeoutFutureProj::NoTimeout { future } => {
|
847 - | return future.poll(cx).map_err(|err| err.into());
|
848 - | }
|
849 - | MaybeTimeoutFutureProj::Timeout {
|
850 - | timeout,
|
851 - | error_type,
|
852 - | duration,
|
853 - | } => (timeout, error_type, duration),
|
854 - | };
|
855 - | match timeout_future.poll(cx) {
|
856 - | Poll::Ready(Ok(response)) => Poll::Ready(response.map_err(|err| err.into())),
|
857 - | Poll::Ready(Err(_timeout)) => {
|
858 - | Poll::Ready(Err(HttpTimeoutError { kind, duration }.into()))
|
859 - | }
|
860 - | Poll::Pending => Poll::Pending,
|
861 - | }
|
862 - | }
|
863 - | }
|
864 - |
|
865 - | impl<I> tower::Service<Uri> for ConnectTimeout<I>
|
866 - | where
|
867 - | I: tower::Service<Uri>,
|
868 - | I::Error: Into<BoxError>,
|
869 - | {
|
870 - | type Response = I::Response;
|
871 - | type Error = BoxError;
|
872 - | type Future = MaybeTimeoutFuture<I::Future>;
|
873 - |
|
874 - | fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
875 - | self.inner.poll_ready(cx).map_err(|err| err.into())
|
876 - | }
|
877 - |
|
878 - | fn call(&mut self, req: Uri) -> Self::Future {
|
879 - | match &self.timeout {
|
880 - | Some((sleep, duration)) => {
|
881 - | let sleep = sleep.sleep(*duration);
|
882 - | MaybeTimeoutFuture::Timeout {
|
883 - | timeout: Timeout::new(self.inner.call(req), sleep),
|
884 - | error_type: "HTTP connect",
|
885 - | duration: *duration,
|
886 - | }
|
887 - | }
|
888 - | None => MaybeTimeoutFuture::NoTimeout {
|
889 - | future: self.inner.call(req),
|
890 - | },
|
891 - | }
|
892 - | }
|
893 - | }
|
894 - |
|
895 - | impl<I, B> tower::Service<http::Request<B>> for HttpReadTimeout<I>
|
896 - | where
|
897 - | I: tower::Service<http::Request<B>>,
|
898 - | I::Error: Send + Sync + Error + 'static,
|
899 - | {
|
900 - | type Response = I::Response;
|
901 - | type Error = BoxError;
|
902 - | type Future = MaybeTimeoutFuture<I::Future>;
|
903 - |
|
904 - | fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
905 - | self.inner.poll_ready(cx).map_err(|err| err.into())
|
906 - | }
|
907 - |
|
908 - | fn call(&mut self, req: http::Request<B>) -> Self::Future {
|
909 - | match &self.timeout {
|
910 - | Some((sleep, duration)) => {
|
911 - | let sleep = sleep.sleep(*duration);
|
912 - | MaybeTimeoutFuture::Timeout {
|
913 - | timeout: Timeout::new(self.inner.call(req), sleep),
|
914 - | error_type: "HTTP read",
|
915 - | duration: *duration,
|
916 - | }
|
917 - | }
|
918 - | None => MaybeTimeoutFuture::NoTimeout {
|
919 - | future: self.inner.call(req),
|
920 - | },
|
921 - | }
|
922 - | }
|
923 - | }
|
924 - |
|
925 - | #[cfg(test)]
|
926 - | pub(crate) mod test {
|
927 - | use std::time::Duration;
|
928 - |
|
929 - | use hyper::rt::ReadBufCursor;
|
930 - | use hyper_util::client::legacy::connect::Connected;
|
931 - | use hyper_util::rt::TokioIo;
|
932 - | use tokio::net::TcpStream;
|
933 - |
|
934 - | use aws_smithy_async::assert_elapsed;
|
935 - | use aws_smithy_async::future::never::Never;
|
936 - | use aws_smithy_async::rt::sleep::{SharedAsyncSleep, TokioSleep};
|
937 - | use aws_smithy_types::error::display::DisplayErrorContext;
|
938 - |
|
939 - | use super::super::*;
|
940 - |
|
941 - | #[allow(unused)]
|
942 - | fn connect_timeout_is_correct<T: Send + Sync + Clone + 'static>() {
|
943 - | is_send_sync::<super::ConnectTimeout<T>>();
|
944 - | }
|
945 - |
|
946 - | #[allow(unused)]
|
947 - | fn is_send_sync<T: Send + Sync>() {}
|
948 - |
|
949 - | /// A service that will never return whatever it is you want
|
950 - | ///
|
951 - | /// Returned futures will return Pending forever
|
952 - | #[non_exhaustive]
|
953 - | #[derive(Clone, Default, Debug)]
|
954 - | pub(crate) struct NeverConnects;
|
955 - | impl tower::Service<Uri> for NeverConnects {
|
956 - | type Response = TokioIo<TcpStream>;
|
957 - | type Error = ConnectorError;
|
958 - | type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
|
959 - |
|
960 - | fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
961 - | Poll::Ready(Ok(()))
|
962 - | }
|
963 - |
|
964 - | fn call(&mut self, _uri: Uri) -> Self::Future {
|
965 - | Box::pin(async move {
|
966 - | Never::new().await;
|
967 - | unreachable!()
|
968 - | })
|
969 - | }
|
970 - | }
|
971 - |
|
972 - | /// A service that will connect but never send any data
|
973 - | #[derive(Clone, Debug, Default)]
|
974 - | struct NeverReplies;
|
975 - | impl tower::Service<Uri> for NeverReplies {
|
976 - | type Response = EmptyStream;
|
977 - | type Error = BoxError;
|
978 - | type Future = std::future::Ready<Result<Self::Response, Self::Error>>;
|
979 - |
|
980 - | fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
981 - | Poll::Ready(Ok(()))
|
982 - | }
|
983 - |
|
984 - | fn call(&mut self, _req: Uri) -> Self::Future {
|
985 - | std::future::ready(Ok(EmptyStream))
|
986 - | }
|
987 - | }
|
988 - |
|
989 - | /// A stream that will never return or accept any data
|
990 - | #[non_exhaustive]
|
991 - | #[derive(Debug, Default)]
|
992 - | struct EmptyStream;
|
993 - | impl Read for EmptyStream {
|
994 - | fn poll_read(
|
995 - | self: Pin<&mut Self>,
|
996 - | _cx: &mut Context<'_>,
|
997 - | _buf: ReadBufCursor<'_>,
|
998 - | ) -> Poll<Result<(), std::io::Error>> {
|
999 - | Poll::Pending
|
1000 - | }
|
1001 - | }
|
1002 - | impl Write for EmptyStream {
|
1003 - | fn poll_write(
|
1004 - | self: Pin<&mut Self>,
|
1005 - | _cx: &mut Context<'_>,
|
1006 - | _buf: &[u8],
|
1007 - | ) -> Poll<Result<usize, std::io::Error>> {
|
1008 - | Poll::Pending
|
1009 - | }
|
1010 - |
|
1011 - | fn poll_flush(
|
1012 - | self: Pin<&mut Self>,
|
1013 - | _cx: &mut Context<'_>,
|
1014 - | ) -> Poll<Result<(), std::io::Error>> {
|
1015 - | Poll::Pending
|
1016 - | }
|
1017 - |
|
1018 - | fn poll_shutdown(
|
1019 - | self: Pin<&mut Self>,
|
1020 - | _cx: &mut Context<'_>,
|
1021 - | ) -> Poll<Result<(), std::io::Error>> {
|
1022 - | Poll::Pending
|
1023 - | }
|
1024 - | }
|
1025 - | impl Connection for EmptyStream {
|
1026 - | fn connected(&self) -> Connected {
|
1027 - | Connected::new()
|
1028 - | }
|
1029 - | }
|
1030 - |
|
1031 - | #[tokio::test]
|
1032 - | async fn http_connect_timeout_works() {
|
1033 - | let tcp_connector = NeverConnects::default();
|
1034 - | let connector_settings = HttpConnectorSettings::builder()
|
1035 - | .connect_timeout(Duration::from_secs(1))
|
1036 - | .build();
|
1037 - | let hyper = HyperConnector::builder()
|
1038 - | .connector_settings(connector_settings)
|
1039 - | .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
|
1040 - | .build(tcp_connector)
|
1041 - | .adapter;
|
1042 - | let now = tokio::time::Instant::now();
|
1043 - | tokio::time::pause();
|
1044 - | let resp = hyper
|
1045 - | .call(HttpRequest::get("https://static-uri.com").unwrap())
|
1046 - | .await
|
1047 - | .unwrap_err();
|
1048 - | assert!(
|
1049 - | resp.is_timeout(),
|
1050 - | "expected resp.is_timeout() to be true but it was false, resp == {:?}",
|
1051 - | resp
|
1052 - | );
|
1053 - | let message = DisplayErrorContext(&resp).to_string();
|
1054 - | let expected =
|
1055 - | "timeout: client error (Connect): HTTP connect timeout occurred after 1s";
|
1056 - | assert!(
|
1057 - | message.contains(expected),
|
1058 - | "expected '{message}' to contain '{expected}'"
|
1059 - | );
|
1060 - | assert_elapsed!(now, Duration::from_secs(1));
|
1061 - | }
|
1062 - |
|
1063 - | #[tokio::test]
|
1064 - | async fn http_read_timeout_works() {
|
1065 - | let tcp_connector = NeverReplies;
|
1066 - | let connector_settings = HttpConnectorSettings::builder()
|
1067 - | .connect_timeout(Duration::from_secs(1))
|
1068 - | .read_timeout(Duration::from_secs(2))
|
1069 - | .build();
|
1070 - | let hyper = HyperConnector::builder()
|
1071 - | .connector_settings(connector_settings)
|
1072 - | .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
|
1073 - | .build(tcp_connector)
|
1074 - | .adapter;
|
1075 - | let now = tokio::time::Instant::now();
|
1076 - | tokio::time::pause();
|
1077 - | let err = hyper
|
1078 - | .call(HttpRequest::get("https://fake-uri.com").unwrap())
|
1079 - | .await
|
1080 - | .unwrap_err();
|
1081 - | assert!(
|
1082 - | err.is_timeout(),
|
1083 - | "expected err.is_timeout() to be true but it was false, err == {err:?}",
|
1084 - | );
|
1085 - | let message = format!("{}", DisplayErrorContext(&err));
|
1086 - | let expected = "timeout: HTTP read timeout occurred after 2s";
|
1087 - | assert!(
|
1088 - | message.contains(expected),
|
1089 - | "expected '{message}' to contain '{expected}'"
|
1090 - | );
|
1091 - | assert_elapsed!(now, Duration::from_secs(2));
|
1092 - | }
|
1093 - | }
|
1094 - | }
|
1095 - |
|
1096 - | #[cfg(test)]
|
1097 - | mod test {
|
1098 - | use std::io::{Error, ErrorKind};
|
1099 - | use std::pin::Pin;
|
1100 - | use std::sync::atomic::{AtomicU32, Ordering};
|
1101 - | use std::sync::Arc;
|
1102 - | use std::task::{Context, Poll};
|
1103 - |
|
1104 - | use http::Uri;
|
1105 - | use hyper::rt::ReadBufCursor;
|
1106 - | use hyper_util::client::legacy::connect::Connected;
|
1107 - |
|
1108 - | use aws_smithy_async::time::SystemTimeSource;
|
1109 - | use aws_smithy_runtime_api::client::runtime_components::RuntimeComponentsBuilder;
|
1110 - |
|
1111 - | use crate::hyper_1_0::timeout_middleware::test::NeverConnects;
|
1112 - |
|
1113 - | use super::*;
|
1114 - |
|
1115 - | #[tokio::test]
|
1116 - | async fn connector_selection() {
|
1117 - | // Create a client that increments a count every time it creates a new HyperConnector
|
1118 - | let creation_count = Arc::new(AtomicU32::new(0));
|
1119 - | let http_client = build_with_fn(None, {
|
1120 - | let count = creation_count.clone();
|
1121 - | move || {
|
1122 - | count.fetch_add(1, Ordering::Relaxed);
|
1123 - | NeverConnects
|
1124 - | }
|
1125 - | });
|
1126 - |
|
1127 - | // This configuration should result in 4 separate connectors with different timeout settings
|
1128 - | let settings = [
|
1129 - | HttpConnectorSettings::builder()
|
1130 - | .connect_timeout(Duration::from_secs(3))
|
1131 - | .build(),
|
1132 - | HttpConnectorSettings::builder()
|
1133 - | .read_timeout(Duration::from_secs(3))
|
1134 - | .build(),
|
1135 - | HttpConnectorSettings::builder()
|
1136 - | .connect_timeout(Duration::from_secs(3))
|
1137 - | .read_timeout(Duration::from_secs(3))
|
1138 - | .build(),
|
1139 - | HttpConnectorSettings::builder()
|
1140 - | .connect_timeout(Duration::from_secs(5))
|
1141 - | .read_timeout(Duration::from_secs(3))
|
1142 - | .build(),
|
1143 - | ];
|
1144 - |
|
1145 - | // Kick off thousands of parallel tasks that will try to create a connector
|
1146 - | let components = RuntimeComponentsBuilder::for_tests()
|
1147 - | .with_time_source(Some(SystemTimeSource::new()))
|
1148 - | .build()
|
1149 - | .unwrap();
|
1150 - | let mut handles = Vec::new();
|
1151 - | for setting in &settings {
|
1152 - | for _ in 0..1000 {
|
1153 - | let client = http_client.clone();
|
1154 - | handles.push(tokio::spawn({
|
1155 - | let setting = setting.clone();
|
1156 - | let components = components.clone();
|
1157 - | async move {
|
1158 - | let _ = client.http_connector(&setting, &components);
|
1159 - | }
|
1160 - | }));
|
1161 - | }
|
1162 - | }
|
1163 - | for handle in handles {
|
1164 - | handle.await.unwrap();
|
1165 - | }
|
1166 - |
|
1167 - | // Verify only 4 connectors were created amidst the chaos
|
1168 - | assert_eq!(4, creation_count.load(Ordering::Relaxed));
|
1169 - | }
|
1170 - |
|
1171 - | #[tokio::test]
|
1172 - | async fn hyper_io_error() {
|
1173 - | let connector = TestConnection {
|
1174 - | inner: HangupStream,
|
1175 - | };
|
1176 - | let adapter = HyperConnector::builder().build(connector).adapter;
|
1177 - | let err = adapter
|
1178 - | .call(HttpRequest::get("https://socket-hangup.com").unwrap())
|
1179 - | .await
|
1180 - | .expect_err("socket hangup");
|
1181 - | assert!(err.is_io(), "unexpected error type: {:?}", err);
|
1182 - | }
|
1183 - |
|
1184 - | // ---- machinery to make a Hyper connector that responds with an IO Error
|
1185 - | #[derive(Clone)]
|
1186 - | struct HangupStream;
|
1187 - |
|
1188 - | impl Connection for HangupStream {
|
1189 - | fn connected(&self) -> Connected {
|
1190 - | Connected::new()
|
1191 - | }
|
1192 - | }
|
1193 - |
|
1194 - | impl Read for HangupStream {
|
1195 - | fn poll_read(
|
1196 - | self: Pin<&mut Self>,
|
1197 - | _cx: &mut Context<'_>,
|
1198 - | _buf: ReadBufCursor<'_>,
|
1199 - | ) -> Poll<std::io::Result<()>> {
|
1200 - | Poll::Ready(Err(Error::new(
|
1201 - | ErrorKind::ConnectionReset,
|
1202 - | "connection reset",
|
1203 - | )))
|
1204 - | }
|
1205 - | }
|
1206 - |
|
1207 - | impl Write for HangupStream {
|
1208 - | fn poll_write(
|
1209 - | self: Pin<&mut Self>,
|
1210 - | _cx: &mut Context<'_>,
|
1211 - | _buf: &[u8],
|
1212 - | ) -> Poll<Result<usize, Error>> {
|
1213 - | Poll::Pending
|
1214 - | }
|
1215 - |
|
1216 - | fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
|
1217 - | Poll::Pending
|
1218 - | }
|
1219 - |
|
1220 - | fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
|
1221 - | Poll::Pending
|
1222 - | }
|
1223 - | }
|
1224 - |
|
1225 - | #[derive(Clone)]
|
1226 - | struct TestConnection<T> {
|
1227 - | inner: T,
|
1228 - | }
|
1229 - |
|
1230 - | impl<T> tower::Service<Uri> for TestConnection<T>
|
1231 - | where
|
1232 - | T: Clone + Connection,
|
1233 - | {
|
1234 - | type Response = T;
|
1235 - | type Error = BoxError;
|
1236 - | type Future = std::future::Ready<Result<Self::Response, Self::Error>>;
|
1237 - |
|
1238 - | fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
1239 - | Poll::Ready(Ok(()))
|
1240 - | }
|
1241 - |
|
1242 - | fn call(&mut self, _req: Uri) -> Self::Future {
|
1243 - | std::future::ready(Ok(self.inner.clone()))
|
1244 - | }
|
1245 - | }
|
1246 - | }
|