aws_smithy_runtime/client/
defaults.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6//! Runtime plugins that provide defaults for clients.
7//!
8//! Note: these are the absolute base-level defaults. They may not be the defaults
9//! for _your_ client, since many things can change these defaults on the way to
10//! code generating and constructing a full client.
11
12use crate::client::http::body::content_length_enforcement::EnforceContentLengthRuntimePlugin;
13use crate::client::identity::IdentityCache;
14use crate::client::retries::strategy::standard::TokenBucketProvider;
15use crate::client::retries::strategy::StandardRetryStrategy;
16use crate::client::retries::RetryPartition;
17use aws_smithy_async::rt::sleep::default_async_sleep;
18use aws_smithy_async::time::{SharedTimeSource, SystemTimeSource, TimeSource};
19use aws_smithy_runtime_api::box_error::BoxError;
20use aws_smithy_runtime_api::client::behavior_version::BehaviorVersion;
21use aws_smithy_runtime_api::client::http::SharedHttpClient;
22use aws_smithy_runtime_api::client::runtime_components::{
23    RuntimeComponentsBuilder, SharedConfigValidator,
24};
25use aws_smithy_runtime_api::client::runtime_plugin::{
26    Order, SharedRuntimePlugin, StaticRuntimePlugin,
27};
28use aws_smithy_runtime_api::client::stalled_stream_protection::StalledStreamProtectionConfig;
29use aws_smithy_runtime_api::shared::IntoShared;
30use aws_smithy_types::config_bag::{ConfigBag, FrozenLayer, Layer};
31use aws_smithy_types::retry::RetryConfig;
32use aws_smithy_types::timeout::TimeoutConfig;
33use std::borrow::Cow;
34use std::time::Duration;
35
36fn default_plugin<CompFn>(name: &'static str, components_fn: CompFn) -> StaticRuntimePlugin
37where
38    CompFn: FnOnce(RuntimeComponentsBuilder) -> RuntimeComponentsBuilder,
39{
40    StaticRuntimePlugin::new()
41        .with_order(Order::Defaults)
42        .with_runtime_components((components_fn)(RuntimeComponentsBuilder::new(name)))
43}
44
45fn layer<LayerFn>(name: &'static str, layer_fn: LayerFn) -> FrozenLayer
46where
47    LayerFn: FnOnce(&mut Layer),
48{
49    let mut layer = Layer::new(name);
50    (layer_fn)(&mut layer);
51    layer.freeze()
52}
53
54/// Runtime plugin that provides a default connector.
55#[deprecated(
56    since = "1.8.0",
57    note = "This function wasn't intended to be public, and didn't take the behavior major version as an argument, so it couldn't be evolved over time."
58)]
59pub fn default_http_client_plugin() -> Option<SharedRuntimePlugin> {
60    #[allow(deprecated)]
61    default_http_client_plugin_v2(BehaviorVersion::v2024_03_28())
62}
63
/// Runtime plugin that provides a default HTTPS connector.
///
/// Which client is used depends on `behavior_version` and on the cargo features
/// compiled into this crate:
///
/// - At or after `v2025_01_17`: the `default-https-client` stack is used when that
///   feature is enabled; otherwise the legacy hyper 0.14 client is used when
///   `connector-hyper-0-14-x` is enabled.
/// - Before `v2025_01_17`: only the legacy hyper 0.14 client is considered, and only
///   when `connector-hyper-0-14-x` is enabled.
///
/// Returns `None` when no client was selected for the given combination (for example,
/// when neither feature is enabled), or when the selected constructor itself yields `None`.
pub fn default_http_client_plugin_v2(
    behavior_version: BehaviorVersion,
) -> Option<SharedRuntimePlugin> {
    // Stays `None` unless one of the feature-gated blocks below assigns a client.
    // NOTE(review): the leading underscore presumably silences unused-variable lints when
    // every gated block is compiled out — confirm.
    let mut _default: Option<SharedHttpClient> = None;

    #[allow(deprecated)]
    if behavior_version.is_at_least(BehaviorVersion::v2025_01_17()) {
        // the latest https stack takes precedence if the config flag
        // is enabled otherwise try to fall back to the legacy connector
        // if that feature flag is available.
        #[cfg(all(
            feature = "connector-hyper-0-14-x",
            not(feature = "default-https-client")
        ))]
        #[allow(deprecated)]
        {
            _default = crate::client::http::hyper_014::default_client();
        }

        // takes precedence over legacy connector if enabled
        #[cfg(feature = "default-https-client")]
        {
            // The behavior version is forwarded to the client options.
            let opts = crate::client::http::DefaultClientOptions::default()
                .with_behavior_version(behavior_version);
            _default = crate::client::http::default_https_client(opts);
        }
    } else {
        // fallback to legacy hyper client for given behavior version
        #[cfg(feature = "connector-hyper-0-14-x")]
        #[allow(deprecated)]
        {
            _default = crate::client::http::hyper_014::default_client();
        }
    }

    // Only produce a plugin when a client was actually selected above.
    _default.map(|default| {
        default_plugin("default_http_client_plugin", |components| {
            components.with_http_client(Some(default))
        })
        .into_shared()
    })
}
107
108/// Runtime plugin that provides a default async sleep implementation.
109pub fn default_sleep_impl_plugin() -> Option<SharedRuntimePlugin> {
110    default_async_sleep().map(|default| {
111        default_plugin("default_sleep_impl_plugin", |components| {
112            components.with_sleep_impl(Some(default))
113        })
114        .into_shared()
115    })
116}
117
118/// Runtime plugin that provides a default time source.
119pub fn default_time_source_plugin() -> Option<SharedRuntimePlugin> {
120    Some(
121        default_plugin("default_time_source_plugin", |components| {
122            components.with_time_source(Some(SystemTimeSource::new()))
123        })
124        .into_shared(),
125    )
126}
127
128/// Runtime plugin that sets the default retry strategy, config (disabled), and partition.
129#[deprecated = "Use default_retry_config_plugin_v2 to get a TokenBucket that respects the user provided TimeSource."]
130pub fn default_retry_config_plugin(
131    default_partition_name: impl Into<Cow<'static, str>>,
132) -> Option<SharedRuntimePlugin> {
133    let retry_partition = RetryPartition::new(default_partition_name);
134    Some(
135        default_plugin("default_retry_config_plugin", |components| {
136            components
137                .with_retry_strategy(Some(StandardRetryStrategy::new()))
138                .with_config_validator(SharedConfigValidator::base_client_config_fn(
139                    validate_retry_config,
140                ))
141                .with_interceptor(TokenBucketProvider::new(
142                    retry_partition.clone(),
143                    SharedTimeSource::default(), // Replicates previous behavior
144                ))
145        })
146        .with_config(layer("default_retry_config", |layer| {
147            layer.store_put(RetryConfig::disabled());
148            layer.store_put(retry_partition);
149        }))
150        .into_shared(),
151    )
152}
153
154/// Runtime plugin that sets the default retry strategy, config (disabled), and partition.
155pub fn default_retry_config_plugin_v2(
156    default_plugin_params: &DefaultPluginParams,
157) -> Option<SharedRuntimePlugin> {
158    let retry_partition = RetryPartition::new(
159        default_plugin_params
160            .retry_partition_name()
161            .clone()
162            .expect("retry_partition_name is required"),
163    );
164    Some(
165        default_plugin("default_retry_config_plugin", |components| {
166            components
167                .with_retry_strategy(Some(StandardRetryStrategy::new()))
168                .with_config_validator(SharedConfigValidator::base_client_config_fn(
169                    validate_retry_config,
170                ))
171                .with_interceptor(TokenBucketProvider::new(
172                    retry_partition.clone(),
173                    default_plugin_params
174                        .time_source
175                        .clone()
176                        .unwrap_or_default(),
177                ))
178        })
179        .with_config(layer("default_retry_config", |layer| {
180            layer.store_put(RetryConfig::disabled());
181            layer.store_put(retry_partition);
182        }))
183        .into_shared(),
184    )
185}
186
187fn validate_retry_config(
188    components: &RuntimeComponentsBuilder,
189    cfg: &ConfigBag,
190) -> Result<(), BoxError> {
191    if let Some(retry_config) = cfg.load::<RetryConfig>() {
192        if retry_config.has_retry() && components.sleep_impl().is_none() {
193            Err("An async sleep implementation is required for retry to work. Please provide a `sleep_impl` on \
194                 the config, or disable timeouts.".into())
195        } else {
196            Ok(())
197        }
198    } else {
199        Err(
200            "The default retry config was removed, and no other config was put in its place."
201                .into(),
202        )
203    }
204}
205
206/// Runtime plugin that sets the default timeout config (no timeouts).
207pub fn default_timeout_config_plugin() -> Option<SharedRuntimePlugin> {
208    Some(
209        default_plugin("default_timeout_config_plugin", |components| {
210            components.with_config_validator(SharedConfigValidator::base_client_config_fn(
211                validate_timeout_config,
212            ))
213        })
214        .with_config(layer("default_timeout_config", |layer| {
215            layer.store_put(TimeoutConfig::disabled());
216        }))
217        .into_shared(),
218    )
219}
220
221fn validate_timeout_config(
222    components: &RuntimeComponentsBuilder,
223    cfg: &ConfigBag,
224) -> Result<(), BoxError> {
225    if let Some(timeout_config) = cfg.load::<TimeoutConfig>() {
226        if timeout_config.has_timeouts() && components.sleep_impl().is_none() {
227            Err("An async sleep implementation is required for timeouts to work. Please provide a `sleep_impl` on \
228                 the config, or disable timeouts.".into())
229        } else {
230            Ok(())
231        }
232    } else {
233        Err(
234            "The default timeout config was removed, and no other config was put in its place."
235                .into(),
236        )
237    }
238}
239
240/// Runtime plugin that registers the default identity cache implementation.
241pub fn default_identity_cache_plugin() -> Option<SharedRuntimePlugin> {
242    Some(
243        default_plugin("default_identity_cache_plugin", |components| {
244            components.with_identity_cache(Some(IdentityCache::lazy().build()))
245        })
246        .into_shared(),
247    )
248}
249
250/// Runtime plugin that sets the default stalled stream protection config.
251///
252/// By default, when throughput falls below 1/Bs for more than 5 seconds, the
253/// stream is cancelled.
254#[deprecated(
255    since = "1.2.0",
256    note = "This function wasn't intended to be public, and didn't take the behavior major version as an argument, so it couldn't be evolved over time."
257)]
258pub fn default_stalled_stream_protection_config_plugin() -> Option<SharedRuntimePlugin> {
259    #[allow(deprecated)]
260    default_stalled_stream_protection_config_plugin_v2(BehaviorVersion::v2023_11_09())
261}
262fn default_stalled_stream_protection_config_plugin_v2(
263    behavior_version: BehaviorVersion,
264) -> Option<SharedRuntimePlugin> {
265    Some(
266        default_plugin(
267            "default_stalled_stream_protection_config_plugin",
268            |components| {
269                components.with_config_validator(SharedConfigValidator::base_client_config_fn(
270                    validate_stalled_stream_protection_config,
271                ))
272            },
273        )
274        .with_config(layer("default_stalled_stream_protection_config", |layer| {
275            let mut config =
276                StalledStreamProtectionConfig::enabled().grace_period(Duration::from_secs(5));
277            // Before v2024_03_28, upload streams did not have stalled stream protection by default
278            #[allow(deprecated)]
279            if !behavior_version.is_at_least(BehaviorVersion::v2024_03_28()) {
280                config = config.upload_enabled(false);
281            }
282            layer.store_put(config.build());
283        }))
284        .into_shared(),
285    )
286}
287
288fn enforce_content_length_runtime_plugin() -> Option<SharedRuntimePlugin> {
289    Some(EnforceContentLengthRuntimePlugin::new().into_shared())
290}
291
292fn validate_stalled_stream_protection_config(
293    components: &RuntimeComponentsBuilder,
294    cfg: &ConfigBag,
295) -> Result<(), BoxError> {
296    if let Some(stalled_stream_protection_config) = cfg.load::<StalledStreamProtectionConfig>() {
297        if stalled_stream_protection_config.is_enabled() {
298            if components.sleep_impl().is_none() {
299                return Err(
300                    "An async sleep implementation is required for stalled stream protection to work. \
301                     Please provide a `sleep_impl` on the config, or disable stalled stream protection.".into());
302            }
303
304            if components.time_source().is_none() {
305                return Err(
306                    "A time source is required for stalled stream protection to work.\
307                     Please provide a `time_source` on the config, or disable stalled stream protection.".into());
308            }
309        }
310
311        Ok(())
312    } else {
313        Err(
314            "The default stalled stream protection config was removed, and no other config was put in its place."
315                .into(),
316        )
317    }
318}
319
/// Arguments for the [`default_plugins`] method.
///
/// This is a struct to enable adding new parameters in the future without breaking the API.
#[non_exhaustive]
#[derive(Debug, Default)]
pub struct DefaultPluginParams {
    // Name used to build the default `RetryPartition`. `default_retry_config_plugin_v2`
    // panics when this is `None`.
    retry_partition_name: Option<Cow<'static, str>>,
    // Behavior version used to pick defaults. `default_plugins` falls back to
    // `BehaviorVersion::latest()` when this is `None`.
    behavior_version: Option<BehaviorVersion>,
    // Time source handed to the retry token bucket provider. The default
    // `SharedTimeSource` is used when this is `None`.
    time_source: Option<SharedTimeSource>,
}
330
331impl DefaultPluginParams {
332    /// Creates a new [`DefaultPluginParams`].
333    pub fn new() -> Self {
334        Default::default()
335    }
336
337    /// Sets the retry partition name.
338    pub fn with_retry_partition_name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
339        self.retry_partition_name = Some(name.into());
340        self
341    }
342
343    /// Gets the retry partition name.
344    pub fn retry_partition_name(&self) -> &Option<Cow<'static, str>> {
345        &self.retry_partition_name
346    }
347
348    /// Sets the behavior major version.
349    pub fn with_behavior_version(mut self, version: BehaviorVersion) -> Self {
350        self.behavior_version = Some(version);
351        self
352    }
353
354    /// Gets the behavior major version.
355    pub fn behavior_version(&self) -> &Option<BehaviorVersion> {
356        &self.behavior_version
357    }
358
359    /// Sets the time_source.
360    pub fn with_time_source(mut self, time_source: impl TimeSource + 'static) -> Self {
361        self.time_source = Some(SharedTimeSource::new(time_source));
362        self
363    }
364
365    /// Gets the time_source.
366    pub fn time_source(&self) -> &Option<SharedTimeSource> {
367        &self.time_source
368    }
369}
370
371/// All default plugins.
372pub fn default_plugins(
373    params: DefaultPluginParams,
374) -> impl IntoIterator<Item = SharedRuntimePlugin> {
375    let behavior_version = params
376        .behavior_version
377        .unwrap_or_else(BehaviorVersion::latest);
378
379    [
380        default_http_client_plugin_v2(behavior_version),
381        default_identity_cache_plugin(),
382        default_retry_config_plugin_v2(&params),
383        default_sleep_impl_plugin(),
384        default_time_source_plugin(),
385        default_timeout_config_plugin(),
386        enforce_content_length_runtime_plugin(),
387        default_stalled_stream_protection_config_plugin_v2(behavior_version),
388    ]
389    .into_iter()
390    .flatten()
391    .collect::<Vec<SharedRuntimePlugin>>()
392}
393
#[cfg(test)]
mod tests {
    use super::*;
    use aws_smithy_runtime_api::client::runtime_plugin::RuntimePlugins;

    /// Plugin params for `version` with a placeholder retry partition name.
    fn test_plugin_params(version: BehaviorVersion) -> DefaultPluginParams {
        DefaultPluginParams::new()
            .with_retry_partition_name("dontcare")
            .with_behavior_version(version)
    }

    /// Applies `plugins` as client plugins to a base config bag and returns the bag.
    fn config_for(plugins: impl IntoIterator<Item = SharedRuntimePlugin>) -> ConfigBag {
        let mut bag = ConfigBag::base();
        RuntimePlugins::new()
            .with_client_plugins(plugins)
            .apply_client_configuration(&mut bag)
            .unwrap();
        bag
    }

    /// Reports whether upload protection is on in the stalled stream config stored in `bag`.
    fn upload_protection_enabled(bag: &ConfigBag) -> bool {
        bag.load::<StalledStreamProtectionConfig>()
            .unwrap()
            .upload_enabled()
    }

    #[test]
    #[allow(deprecated)]
    fn v2024_03_28_stalled_stream_protection_difference() {
        let latest = config_for(default_plugins(test_plugin_params(
            BehaviorVersion::latest(),
        )));
        let v2023 = config_for(default_plugins(test_plugin_params(
            BehaviorVersion::v2023_11_09(),
        )));

        assert!(
            upload_protection_enabled(&latest),
            "stalled stream protection on uploads MUST be enabled after v2024_03_28"
        );
        assert!(
            !upload_protection_enabled(&v2023),
            "stalled stream protection on uploads MUST NOT be enabled before v2024_03_28"
        );
    }
}