aws_smithy_runtime/client/identity/cache/
lazy.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6use crate::expiring_cache::ExpiringCache;
7use aws_smithy_async::future::timeout::Timeout;
8use aws_smithy_async::rt::sleep::{AsyncSleep, SharedAsyncSleep};
9use aws_smithy_async::time::{SharedTimeSource, TimeSource};
10use aws_smithy_runtime_api::box_error::BoxError;
11use aws_smithy_runtime_api::client::identity::{
12    Identity, IdentityCachePartition, IdentityFuture, ResolveCachedIdentity, ResolveIdentity,
13    SharedIdentityCache, SharedIdentityResolver,
14};
15use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
16use aws_smithy_runtime_api::shared::IntoShared;
17use aws_smithy_types::config_bag::ConfigBag;
18use aws_smithy_types::DateTime;
19use std::collections::HashMap;
20use std::fmt;
21use std::sync::RwLock;
22use std::time::Duration;
23use tracing::Instrument;
24
25const DEFAULT_LOAD_TIMEOUT: Duration = Duration::from_secs(5);
26const DEFAULT_EXPIRATION: Duration = Duration::from_secs(15 * 60);
27const DEFAULT_BUFFER_TIME: Duration = Duration::from_secs(10);
28const DEFAULT_BUFFER_TIME_JITTER_FRACTION: fn() -> f64 = || fastrand::f64() * 0.5;
29const DEFAULT_MAX_PARTITIONS: usize = 64;
30
31/// Builder for lazy identity caching.
32#[derive(Default, Debug)]
33pub struct LazyCacheBuilder {
34    time_source: Option<SharedTimeSource>,
35    sleep_impl: Option<SharedAsyncSleep>,
36    load_timeout: Option<Duration>,
37    buffer_time: Option<Duration>,
38    buffer_time_jitter_fraction: Option<fn() -> f64>,
39    default_expiration: Option<Duration>,
40    max_partitions: Option<usize>,
41}
42
43impl LazyCacheBuilder {
44    /// Create a new builder.
45    pub fn new() -> Self {
46        Default::default()
47    }
48
49    /// Set the time source for this cache.
50    pub fn time_source(mut self, time_source: impl TimeSource + 'static) -> Self {
51        self.set_time_source(time_source.into_shared());
52        self
53    }
54    /// Set the time source for this cache.
55    pub fn set_time_source(&mut self, time_source: SharedTimeSource) -> &mut Self {
56        self.time_source = Some(time_source.into_shared());
57        self
58    }
59
60    /// Set the async sleep implementation for this cache.
61    pub fn sleep_impl(mut self, sleep_impl: impl AsyncSleep + 'static) -> Self {
62        self.set_sleep_impl(sleep_impl.into_shared());
63        self
64    }
65    /// Set the async sleep implementation for this cache.
66    pub fn set_sleep_impl(&mut self, sleep_impl: SharedAsyncSleep) -> &mut Self {
67        self.sleep_impl = Some(sleep_impl);
68        self
69    }
70
71    /// Timeout for identity resolution.
72    ///
73    /// Defaults to 5 seconds.
74    pub fn load_timeout(mut self, timeout: Duration) -> Self {
75        self.set_load_timeout(Some(timeout));
76        self
77    }
78
79    /// Timeout for identity resolution.
80    ///
81    /// Defaults to 5 seconds.
82    pub fn set_load_timeout(&mut self, timeout: Option<Duration>) -> &mut Self {
83        self.load_timeout = timeout;
84        self
85    }
86
87    /// Amount of time before the actual identity expiration time where the identity is considered expired.
88    ///
89    /// For example, if the identity are expiring in 15 minutes, and the buffer time is 10 seconds,
90    /// then any requests made after 14 minutes and 50 seconds will load a new identity.
91    ///
92    /// Note: random jitter value between [0.0, 0.5] is multiplied to this buffer time.
93    ///
94    /// Defaults to 10 seconds.
95    pub fn buffer_time(mut self, buffer_time: Duration) -> Self {
96        self.set_buffer_time(Some(buffer_time));
97        self
98    }
99
100    /// Amount of time before the actual identity expiration time where the identity is considered expired.
101    ///
102    /// For example, if the identity are expiring in 15 minutes, and the buffer time is 10 seconds,
103    /// then any requests made after 14 minutes and 50 seconds will load a new identity.
104    ///
105    /// Note: random jitter value between [0.0, 0.5] is multiplied to this buffer time.
106    ///
107    /// Defaults to 10 seconds.
108    pub fn set_buffer_time(&mut self, buffer_time: Option<Duration>) -> &mut Self {
109        self.buffer_time = buffer_time;
110        self
111    }
112
113    /// A random percentage by which buffer time is jittered for randomization.
114    ///
115    /// For example, if the identity is expiring in 15 minutes, the buffer time is 10 seconds,
116    /// and buffer time jitter fraction is 0.2, then buffer time is adjusted to 8 seconds.
117    /// Therefore, any requests made after 14 minutes and 52 seconds will load a new identity.
118    ///
119    /// Defaults to a randomly generated value between 0.0 and 0.5. This setter is for testing only.
120    #[allow(unused)]
121    #[cfg(test)]
122    fn buffer_time_jitter_fraction(mut self, buffer_time_jitter_fraction: fn() -> f64) -> Self {
123        self.set_buffer_time_jitter_fraction(Some(buffer_time_jitter_fraction));
124        self
125    }
126
127    /// A random percentage by which buffer time is jittered for randomization.
128    ///
129    /// For example, if the identity is expiring in 15 minutes, the buffer time is 10 seconds,
130    /// and buffer time jitter fraction is 0.2, then buffer time is adjusted to 8 seconds.
131    /// Therefore, any requests made after 14 minutes and 52 seconds will load a new identity.
132    ///
133    /// Defaults to a randomly generated value between 0.0 and 0.5. This setter is for testing only.
134    #[allow(unused)]
135    #[cfg(test)]
136    fn set_buffer_time_jitter_fraction(
137        &mut self,
138        buffer_time_jitter_fraction: Option<fn() -> f64>,
139    ) -> &mut Self {
140        self.buffer_time_jitter_fraction = buffer_time_jitter_fraction;
141        self
142    }
143
144    /// Default expiration time to set on an identity if it doesn't have an expiration time.
145    ///
146    /// This is only used if the resolved identity doesn't have an expiration time set.
147    /// This must be at least 15 minutes.
148    ///
149    /// Defaults to 15 minutes.
150    pub fn default_expiration(mut self, duration: Duration) -> Self {
151        self.set_default_expiration(Some(duration));
152        self
153    }
154
155    /// Default expiration time to set on an identity if it doesn't have an expiration time.
156    ///
157    /// This is only used if the resolved identity doesn't have an expiration time set.
158    /// This must be at least 15 minutes.
159    ///
160    /// Defaults to 15 minutes.
161    pub fn set_default_expiration(&mut self, duration: Option<Duration>) -> &mut Self {
162        self.default_expiration = duration;
163        self
164    }
165
166    /// Maximum number of identity cache partitions before eviction occurs.
167    ///
168    /// A normally functioning application should not have more than 5-10
169    /// credential providers active at any given time. This limit acts as
170    /// a safety net against memory leaks.
171    ///
172    /// Defaults to 64.
173    ///
174    /// # Panics
175    ///
176    /// Panics if `max` is 0.
177    pub fn max_partitions(mut self, max: usize) -> Self {
178        self.set_max_partitions(Some(max));
179        self
180    }
181
182    /// Maximum number of identity cache partitions before eviction occurs.
183    ///
184    /// A normally functioning application should not have more than 5-10
185    /// credential providers active at any given time. This limit acts as
186    /// a safety net against memory leaks.
187    ///
188    /// Defaults to 64.
189    ///
190    /// # Panics
191    ///
192    /// Panics if `max` is `Some(0)`.
193    pub fn set_max_partitions(&mut self, max: Option<usize>) -> &mut Self {
194        if let Some(0) = max {
195            panic!("max_partitions must be greater than 0");
196        }
197        self.max_partitions = max;
198        self
199    }
200
201    /// Builds a [`SharedIdentityCache`] from this builder.
202    ///
203    /// # Panics
204    ///
205    /// This builder will panic if required fields are not given, or if given values are not valid.
206    pub fn build(self) -> SharedIdentityCache {
207        let default_expiration = self.default_expiration.unwrap_or(DEFAULT_EXPIRATION);
208        assert!(
209            default_expiration >= DEFAULT_EXPIRATION,
210            "default_expiration must be at least 15 minutes"
211        );
212        LazyCache::new(
213            self.load_timeout.unwrap_or(DEFAULT_LOAD_TIMEOUT),
214            self.buffer_time.unwrap_or(DEFAULT_BUFFER_TIME),
215            self.buffer_time_jitter_fraction
216                .unwrap_or(DEFAULT_BUFFER_TIME_JITTER_FRACTION),
217            default_expiration,
218            self.max_partitions.unwrap_or(DEFAULT_MAX_PARTITIONS),
219        )
220        .into_shared()
221    }
222}
223
224#[derive(Debug)]
225struct CachePartitions {
226    partitions: RwLock<HashMap<IdentityCachePartition, ExpiringCache<Identity, BoxError>>>,
227    buffer_time: Duration,
228    max_partitions: usize,
229}
230
231impl CachePartitions {
232    fn new(buffer_time: Duration, max_partitions: usize) -> Self {
233        Self {
234            partitions: RwLock::new(HashMap::new()),
235            buffer_time,
236            max_partitions,
237        }
238    }
239
240    fn partition(&self, key: IdentityCachePartition) -> ExpiringCache<Identity, BoxError> {
241        // Fast path: read lock for cache hits
242        if let Some(partition) = self.partitions.read().unwrap().get(&key).cloned() {
243            return partition;
244        }
245        // Slow path: write lock for cache misses
246        let mut partitions = self.partitions.write().unwrap();
247        // Another thread may have inserted while we waited for the write lock
248        if let Some(partition) = partitions.get(&key).cloned() {
249            return partition;
250        }
251        // Evict an arbitrary entry if at capacity. Eviction order doesn't matter
252        // because a normally functioning application should not have more than
253        // 5-10 credential providers active at any given time, well under the cap.
254        if partitions.len() >= self.max_partitions {
255            if let Some(&evict_key) = partitions.keys().next() {
256                partitions.remove(&evict_key);
257            }
258        }
259        let partition = ExpiringCache::new(self.buffer_time);
260        partitions.insert(key, partition.clone());
261        tracing::debug!(
262            partition_count = partitions.len(),
263            "identity cache partition created"
264        );
265        partition
266    }
267}
268
269#[derive(Debug)]
270struct LazyCache {
271    partitions: CachePartitions,
272    load_timeout: Duration,
273    buffer_time: Duration,
274    buffer_time_jitter_fraction: fn() -> f64,
275    default_expiration: Duration,
276}
277
278impl LazyCache {
279    fn new(
280        load_timeout: Duration,
281        buffer_time: Duration,
282        buffer_time_jitter_fraction: fn() -> f64,
283        default_expiration: Duration,
284        max_partitions: usize,
285    ) -> Self {
286        Self {
287            partitions: CachePartitions::new(buffer_time, max_partitions),
288            load_timeout,
289            buffer_time,
290            buffer_time_jitter_fraction,
291            default_expiration,
292        }
293    }
294}
295
296macro_rules! required_err {
297    ($thing:literal, $how:literal) => {
298        BoxError::from(concat!(
299            "Lazy identity caching requires ",
300            $thing,
301            " to be configured. ",
302            $how,
303            " If this isn't possible, then disable identity caching by calling ",
304            "the `identity_cache` method on config with `IdentityCache::no_cache()`",
305        ))
306    };
307}
308macro_rules! validate_components {
309    ($components:ident) => {
310        let _ = $components.time_source().ok_or_else(|| {
311            required_err!(
312                "a time source",
313                "Set a time source using the `time_source` method on config."
314            )
315        })?;
316        let _ = $components.sleep_impl().ok_or_else(|| {
317            required_err!(
318                "an async sleep implementation",
319                "Set a sleep impl using the `sleep_impl` method on config."
320            )
321        })?;
322    };
323}
324
325impl ResolveCachedIdentity for LazyCache {
326    fn validate_base_client_config(
327        &self,
328        runtime_components: &aws_smithy_runtime_api::client::runtime_components::RuntimeComponentsBuilder,
329        _cfg: &ConfigBag,
330    ) -> Result<(), BoxError> {
331        validate_components!(runtime_components);
332        Ok(())
333    }
334
335    fn validate_final_config(
336        &self,
337        runtime_components: &RuntimeComponents,
338        _cfg: &ConfigBag,
339    ) -> Result<(), BoxError> {
340        validate_components!(runtime_components);
341        Ok(())
342    }
343
344    fn resolve_cached_identity<'a>(
345        &'a self,
346        resolver: SharedIdentityResolver,
347        runtime_components: &'a RuntimeComponents,
348        config_bag: &'a ConfigBag,
349    ) -> IdentityFuture<'a> {
350        let (time_source, sleep_impl) = (
351            runtime_components.time_source().expect("validated"),
352            runtime_components.sleep_impl().expect("validated"),
353        );
354
355        let now = time_source.now();
356        let timeout_future = sleep_impl.sleep(self.load_timeout);
357        let load_timeout = self.load_timeout;
358        let partition = resolver.cache_partition();
359        let cache = self.partitions.partition(partition);
360        let default_expiration = self.default_expiration;
361
362        IdentityFuture::new(async move {
363            // Attempt to get cached identity, or clear the cache if they're expired
364            if let Some(identity) = cache.yield_or_clear_if_expired(now).await {
365                tracing::debug!(
366                    buffer_time=?self.buffer_time,
367                    cached_expiration=?identity.expiration(),
368                    now=?now,
369                    "loaded identity from cache"
370                );
371                Ok(identity)
372            } else {
373                // If we didn't get identity from the cache, then we need to try and load.
374                // There may be other threads also loading simultaneously, but this is OK
375                // since the futures are not eagerly executed, and the cache will only run one
376                // of them.
377                let start_time = time_source.now();
378                let result = cache
379                    .get_or_load(|| {
380                        let span = tracing::debug_span!("lazy_load_identity");
381                        async move {
382                            let fut = Timeout::new(
383                                resolver.resolve_identity(runtime_components, config_bag),
384                                timeout_future,
385                            );
386                            let identity = match fut.await {
387                                Ok(result) => result?,
388                                Err(_err) => match resolver.fallback_on_interrupt() {
389                                    Some(identity) => identity,
390                                    None => {
391                                        return Err(BoxError::from(TimedOutError(load_timeout)))
392                                    }
393                                },
394                            };
395                            // If the identity don't have an expiration time, then create a default one
396                            let expiration =
397                                identity.expiration().unwrap_or(now + default_expiration);
398
399                            let jitter = self
400                                .buffer_time
401                                .mul_f64((self.buffer_time_jitter_fraction)());
402
403                            // Logging for cache miss should be emitted here as opposed to after the call to
404                            // `cache.get_or_load` above. In the case of multiple threads concurrently executing
405                            // `cache.get_or_load`, logging inside `cache.get_or_load` ensures that it is emitted
406                            // only once for the first thread that succeeds in populating a cache value.
407                            let printable = DateTime::from(expiration);
408                            tracing::debug!(
409                                new_expiration=%printable,
410                                valid_for=?expiration.duration_since(time_source.now()).unwrap_or_default(),
411                                partition=?partition,
412                                "identity cache miss occurred; added new identity (took {:?})",
413                                time_source.now().duration_since(start_time).unwrap_or_default()
414                            );
415
416                            Ok((identity, expiration + jitter))
417                        }
418                        // Only instrument the the actual load future so that no span
419                        // is opened if the cache decides not to execute it.
420                        .instrument(span)
421                    })
422                    .await;
423                tracing::debug!("loaded identity");
424                result
425            }
426        })
427    }
428}
429
430#[derive(Debug)]
431struct TimedOutError(Duration);
432
433impl std::error::Error for TimedOutError {}
434
435impl fmt::Display for TimedOutError {
436    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
437        write!(f, "identity resolver timed out after {:?}", self.0)
438    }
439}
440
441#[cfg(all(test, feature = "client", feature = "http-auth"))]
442mod tests {
443    use super::*;
444    use aws_smithy_async::rt::sleep::TokioSleep;
445    use aws_smithy_async::test_util::{instant_time_and_sleep, ManualTimeSource};
446    use aws_smithy_async::time::TimeSource;
447    use aws_smithy_runtime_api::client::identity::http::Token;
448    use aws_smithy_runtime_api::client::runtime_components::RuntimeComponentsBuilder;
449    use std::sync::atomic::{AtomicUsize, Ordering};
450    use std::sync::{Arc, Mutex};
451    use std::time::{Duration, SystemTime, UNIX_EPOCH};
452    use tracing::info;
453
454    const BUFFER_TIME_NO_JITTER: fn() -> f64 = || 0_f64;
455
456    struct ResolverFn<F>(F);
457    impl<F> fmt::Debug for ResolverFn<F> {
458        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
459            f.write_str("ResolverFn")
460        }
461    }
462    impl<F> ResolveIdentity for ResolverFn<F>
463    where
464        F: Fn() -> IdentityFuture<'static> + Send + Sync,
465    {
466        fn resolve_identity<'a>(
467            &'a self,
468            _: &'a RuntimeComponents,
469            _config_bag: &'a ConfigBag,
470        ) -> IdentityFuture<'a> {
471            (self.0)()
472        }
473    }
474
475    fn resolver_fn<F>(f: F) -> SharedIdentityResolver
476    where
477        F: Fn() -> IdentityFuture<'static> + Send + Sync + 'static,
478    {
479        SharedIdentityResolver::new(ResolverFn(f))
480    }
481
482    fn test_cache(
483        buffer_time_jitter_fraction: fn() -> f64,
484        load_list: Vec<Result<Identity, BoxError>>,
485    ) -> (LazyCache, SharedIdentityResolver) {
486        #[derive(Debug)]
487        struct Resolver(Mutex<Vec<Result<Identity, BoxError>>>);
488        impl ResolveIdentity for Resolver {
489            fn resolve_identity<'a>(
490                &'a self,
491                _: &'a RuntimeComponents,
492                _config_bag: &'a ConfigBag,
493            ) -> IdentityFuture<'a> {
494                let mut list = self.0.lock().unwrap();
495                if list.len() > 0 {
496                    let next = list.remove(0);
497                    info!("refreshing the identity to {:?}", next);
498                    IdentityFuture::ready(next)
499                } else {
500                    drop(list);
501                    panic!("no more identities")
502                }
503            }
504        }
505
506        let identity_resolver = SharedIdentityResolver::new(Resolver(Mutex::new(load_list)));
507        let cache = LazyCache::new(
508            DEFAULT_LOAD_TIMEOUT,
509            DEFAULT_BUFFER_TIME,
510            buffer_time_jitter_fraction,
511            DEFAULT_EXPIRATION,
512            DEFAULT_MAX_PARTITIONS,
513        );
514        (cache, identity_resolver)
515    }
516
517    fn epoch_secs(secs: u64) -> SystemTime {
518        SystemTime::UNIX_EPOCH + Duration::from_secs(secs)
519    }
520
521    fn test_identity(expired_secs: u64) -> Identity {
522        let expiration = Some(epoch_secs(expired_secs));
523        Identity::new(Token::new("test", expiration), expiration)
524    }
525
526    async fn expect_identity(
527        expired_secs: u64,
528        cache: &LazyCache,
529        components: &RuntimeComponents,
530        resolver: SharedIdentityResolver,
531    ) {
532        let config_bag = ConfigBag::base();
533        let identity = cache
534            .resolve_cached_identity(resolver, components, &config_bag)
535            .await
536            .expect("expected identity");
537        assert_eq!(Some(epoch_secs(expired_secs)), identity.expiration());
538    }
539
540    #[tokio::test]
541    async fn initial_populate_test_identity() {
542        let time = ManualTimeSource::new(UNIX_EPOCH);
543        let components = RuntimeComponentsBuilder::for_tests()
544            .with_time_source(Some(time.clone()))
545            .with_sleep_impl(Some(TokioSleep::new()))
546            .build()
547            .unwrap();
548        let config_bag = ConfigBag::base();
549        let resolver = SharedIdentityResolver::new(resolver_fn(|| {
550            info!("refreshing the test_identity");
551            IdentityFuture::ready(Ok(test_identity(1000)))
552        }));
553        let cache = LazyCache::new(
554            DEFAULT_LOAD_TIMEOUT,
555            DEFAULT_BUFFER_TIME,
556            BUFFER_TIME_NO_JITTER,
557            DEFAULT_EXPIRATION,
558            DEFAULT_MAX_PARTITIONS,
559        );
560        assert_eq!(
561            epoch_secs(1000),
562            cache
563                .resolve_cached_identity(resolver, &components, &config_bag)
564                .await
565                .unwrap()
566                .expiration()
567                .unwrap()
568        );
569    }
570
571    #[tokio::test]
572    async fn reload_expired_test_identity() {
573        let time = ManualTimeSource::new(epoch_secs(100));
574        let components = RuntimeComponentsBuilder::for_tests()
575            .with_time_source(Some(time.clone()))
576            .with_sleep_impl(Some(TokioSleep::new()))
577            .build()
578            .unwrap();
579        let (cache, resolver) = test_cache(
580            BUFFER_TIME_NO_JITTER,
581            vec![
582                Ok(test_identity(1000)),
583                Ok(test_identity(2000)),
584                Ok(test_identity(3000)),
585            ],
586        );
587
588        expect_identity(1000, &cache, &components, resolver.clone()).await;
589        expect_identity(1000, &cache, &components, resolver.clone()).await;
590        time.set_time(epoch_secs(1500));
591        expect_identity(2000, &cache, &components, resolver.clone()).await;
592        expect_identity(2000, &cache, &components, resolver.clone()).await;
593        time.set_time(epoch_secs(2500));
594        expect_identity(3000, &cache, &components, resolver.clone()).await;
595        expect_identity(3000, &cache, &components, resolver.clone()).await;
596    }
597
598    #[tokio::test]
599    async fn load_failed_error() {
600        let config_bag = ConfigBag::base();
601        let time = ManualTimeSource::new(epoch_secs(100));
602        let components = RuntimeComponentsBuilder::for_tests()
603            .with_time_source(Some(time.clone()))
604            .with_sleep_impl(Some(TokioSleep::new()))
605            .build()
606            .unwrap();
607        let (cache, resolver) = test_cache(
608            BUFFER_TIME_NO_JITTER,
609            vec![Ok(test_identity(1000)), Err("failed".into())],
610        );
611
612        expect_identity(1000, &cache, &components, resolver.clone()).await;
613        time.set_time(epoch_secs(1500));
614        assert!(cache
615            .resolve_cached_identity(resolver.clone(), &components, &config_bag)
616            .await
617            .is_err());
618    }
619
620    #[test]
621    fn load_contention() {
622        let rt = tokio::runtime::Builder::new_multi_thread()
623            .enable_time()
624            .worker_threads(16)
625            .build()
626            .unwrap();
627
628        let time = ManualTimeSource::new(epoch_secs(0));
629        let components = RuntimeComponentsBuilder::for_tests()
630            .with_time_source(Some(time.clone()))
631            .with_sleep_impl(Some(TokioSleep::new()))
632            .build()
633            .unwrap();
634        let (cache, resolver) = test_cache(
635            BUFFER_TIME_NO_JITTER,
636            vec![
637                Ok(test_identity(500)),
638                Ok(test_identity(1500)),
639                Ok(test_identity(2500)),
640                Ok(test_identity(3500)),
641                Ok(test_identity(4500)),
642            ],
643        );
644        let cache: SharedIdentityCache = cache.into_shared();
645
646        // test_identity are available up until 4500 seconds after the unix epoch
647        // 4*50 = 200 tasks are launched => we can advance time 4500/20 => 225 seconds per advance
648        for _ in 0..4 {
649            let mut tasks = Vec::new();
650            for _ in 0..50 {
651                let resolver = resolver.clone();
652                let cache = cache.clone();
653                let time = time.clone();
654                let components = components.clone();
655                tasks.push(rt.spawn(async move {
656                    let now = time.advance(Duration::from_secs(22));
657
658                    let config_bag = ConfigBag::base();
659                    let identity = cache
660                        .resolve_cached_identity(resolver, &components, &config_bag)
661                        .await
662                        .unwrap();
663                    assert!(
664                        identity.expiration().unwrap() >= now,
665                        "{:?} >= {:?}",
666                        identity.expiration(),
667                        now
668                    );
669                }));
670            }
671            for task in tasks {
672                rt.block_on(task).unwrap();
673            }
674        }
675    }
676
677    #[tokio::test]
678    async fn load_timeout() {
679        let config_bag = ConfigBag::base();
680        let (time, sleep) = instant_time_and_sleep(epoch_secs(100));
681        let components = RuntimeComponentsBuilder::for_tests()
682            .with_time_source(Some(time.clone()))
683            .with_sleep_impl(Some(sleep))
684            .build()
685            .unwrap();
686        let resolver = SharedIdentityResolver::new(resolver_fn(|| {
687            IdentityFuture::new(async {
688                aws_smithy_async::future::never::Never::new().await;
689                Ok(test_identity(1000))
690            })
691        }));
692        let cache = LazyCache::new(
693            Duration::from_secs(5),
694            DEFAULT_BUFFER_TIME,
695            BUFFER_TIME_NO_JITTER,
696            DEFAULT_EXPIRATION,
697            DEFAULT_MAX_PARTITIONS,
698        );
699
700        let err: BoxError = cache
701            .resolve_cached_identity(resolver, &components, &config_bag)
702            .await
703            .expect_err("it should return an error");
704        let downcasted = err.downcast_ref::<TimedOutError>();
705        assert!(
706            downcasted.is_some(),
707            "expected a BoxError of TimedOutError, but was {err:?}"
708        );
709        assert_eq!(time.now(), epoch_secs(105));
710    }
711
712    #[tokio::test]
713    async fn buffer_time_jitter() {
714        let time = ManualTimeSource::new(epoch_secs(100));
715        let components = RuntimeComponentsBuilder::for_tests()
716            .with_time_source(Some(time.clone()))
717            .with_sleep_impl(Some(TokioSleep::new()))
718            .build()
719            .unwrap();
720        let buffer_time_jitter_fraction = || 0.5_f64;
721        let (cache, resolver) = test_cache(
722            buffer_time_jitter_fraction,
723            vec![Ok(test_identity(1000)), Ok(test_identity(2000))],
724        );
725
726        expect_identity(1000, &cache, &components, resolver.clone()).await;
727        let buffer_time_with_jitter =
728            (DEFAULT_BUFFER_TIME.as_secs_f64() * buffer_time_jitter_fraction()) as u64;
729        assert_eq!(buffer_time_with_jitter, 5);
730        // Advance time to the point where the first test_identity are about to expire (but haven't).
731        let almost_expired_secs = 1000 - buffer_time_with_jitter - 1;
732        time.set_time(epoch_secs(almost_expired_secs));
733        // We should still use the first test_identity.
734        expect_identity(1000, &cache, &components, resolver.clone()).await;
735        // Now let the first test_identity expire.
736        let expired_secs = almost_expired_secs + 1;
737        time.set_time(epoch_secs(expired_secs));
738        // Now that the first test_identity have been expired, the second test_identity will be retrieved.
739        expect_identity(2000, &cache, &components, resolver.clone()).await;
740    }
741
742    #[tokio::test]
743    async fn cache_partitioning() {
744        let time = ManualTimeSource::new(epoch_secs(0));
745        let components = RuntimeComponentsBuilder::for_tests()
746            .with_time_source(Some(time.clone()))
747            .with_sleep_impl(Some(TokioSleep::new()))
748            .build()
749            .unwrap();
750        let (cache, _) = test_cache(BUFFER_TIME_NO_JITTER, Vec::new());
751
752        #[allow(clippy::disallowed_methods)]
753        let far_future = SystemTime::now() + Duration::from_secs(10_000);
754
755        // Resolver A and B both return an identical identity type with different tokens with an expiration
756        // time that should NOT be hit within this test. They each have their own partition key.
757        let resolver_a_calls = Arc::new(AtomicUsize::new(0));
758        let resolver_b_calls = Arc::new(AtomicUsize::new(0));
759        let resolver_a = resolver_fn({
760            let calls = resolver_a_calls.clone();
761            move || {
762                calls.fetch_add(1, Ordering::Relaxed);
763                IdentityFuture::ready(Ok(Identity::new(
764                    Token::new("A", Some(far_future)),
765                    Some(far_future),
766                )))
767            }
768        });
769        let resolver_b = resolver_fn({
770            let calls = resolver_b_calls.clone();
771            move || {
772                calls.fetch_add(1, Ordering::Relaxed);
773                IdentityFuture::ready(Ok(Identity::new(
774                    Token::new("B", Some(far_future)),
775                    Some(far_future),
776                )))
777            }
778        });
779        assert_ne!(
780            resolver_a.cache_partition(),
781            resolver_b.cache_partition(),
782            "pre-condition: they should have different partition keys"
783        );
784
785        let config_bag = ConfigBag::base();
786
787        // Loading the identity twice with resolver A should result in a single call
788        // to the underlying identity resolver since the result gets cached.
789        let identity = cache
790            .resolve_cached_identity(resolver_a.clone(), &components, &config_bag)
791            .await
792            .unwrap();
793        assert_eq!("A", identity.data::<Token>().unwrap().token());
794        let identity = cache
795            .resolve_cached_identity(resolver_a.clone(), &components, &config_bag)
796            .await
797            .unwrap();
798        assert_eq!("A", identity.data::<Token>().unwrap().token());
799        assert_eq!(1, resolver_a_calls.load(Ordering::Relaxed));
800
801        // Now, loading an identity from B will use a separate cache partition
802        // and return a different result.
803        let identity = cache
804            .resolve_cached_identity(resolver_b.clone(), &components, &config_bag)
805            .await
806            .unwrap();
807        assert_eq!("B", identity.data::<Token>().unwrap().token());
808        let identity = cache
809            .resolve_cached_identity(resolver_b.clone(), &components, &config_bag)
810            .await
811            .unwrap();
812        assert_eq!("B", identity.data::<Token>().unwrap().token());
813        assert_eq!(1, resolver_a_calls.load(Ordering::Relaxed));
814        assert_eq!(1, resolver_b_calls.load(Ordering::Relaxed));
815
816        // Finally, loading with resolver A again should return the original cached A value
817        let identity = cache
818            .resolve_cached_identity(resolver_a.clone(), &components, &config_bag)
819            .await
820            .unwrap();
821        assert_eq!("A", identity.data::<Token>().unwrap().token());
822        assert_eq!(1, resolver_a_calls.load(Ordering::Relaxed));
823        assert_eq!(1, resolver_b_calls.load(Ordering::Relaxed));
824    }
825
826    #[tokio::test]
827    async fn eviction_when_at_capacity() {
828        let time = ManualTimeSource::new(epoch_secs(0));
829        let components = RuntimeComponentsBuilder::for_tests()
830            .with_time_source(Some(time.clone()))
831            .with_sleep_impl(Some(TokioSleep::new()))
832            .build()
833            .unwrap();
834        // Create a cache with max_partitions=2
835        let cache = LazyCache::new(
836            DEFAULT_LOAD_TIMEOUT,
837            DEFAULT_BUFFER_TIME,
838            BUFFER_TIME_NO_JITTER,
839            DEFAULT_EXPIRATION,
840            2,
841        );
842
843        #[allow(clippy::disallowed_methods)]
844        let far_future = SystemTime::now() + Duration::from_secs(10_000);
845
846        let resolver_a_calls = Arc::new(AtomicUsize::new(0));
847        let resolver_b_calls = Arc::new(AtomicUsize::new(0));
848        let resolver_c_calls = Arc::new(AtomicUsize::new(0));
849
850        let resolver_a = resolver_fn({
851            let calls = resolver_a_calls.clone();
852            move || {
853                calls.fetch_add(1, Ordering::Relaxed);
854                IdentityFuture::ready(Ok(Identity::new(
855                    Token::new("A", Some(far_future)),
856                    Some(far_future),
857                )))
858            }
859        });
860        let resolver_b = resolver_fn({
861            let calls = resolver_b_calls.clone();
862            move || {
863                calls.fetch_add(1, Ordering::Relaxed);
864                IdentityFuture::ready(Ok(Identity::new(
865                    Token::new("B", Some(far_future)),
866                    Some(far_future),
867                )))
868            }
869        });
870        let resolver_c = resolver_fn({
871            let calls = resolver_c_calls.clone();
872            move || {
873                calls.fetch_add(1, Ordering::Relaxed);
874                IdentityFuture::ready(Ok(Identity::new(
875                    Token::new("C", Some(far_future)),
876                    Some(far_future),
877                )))
878            }
879        });
880
881        let config_bag = ConfigBag::base();
882
883        // Fill the cache with A and B
884        cache
885            .resolve_cached_identity(resolver_a.clone(), &components, &config_bag)
886            .await
887            .unwrap();
888        cache
889            .resolve_cached_identity(resolver_b.clone(), &components, &config_bag)
890            .await
891            .unwrap();
892        assert_eq!(1, resolver_a_calls.load(Ordering::Relaxed));
893        assert_eq!(1, resolver_b_calls.load(Ordering::Relaxed));
894
895        // Adding C should evict one of A or B (arbitrary eviction order)
896        cache
897            .resolve_cached_identity(resolver_c.clone(), &components, &config_bag)
898            .await
899            .unwrap();
900        assert_eq!(1, resolver_c_calls.load(Ordering::Relaxed));
901
902        // Resolve all three again — at least one of A or B must be re-resolved because
903        // the cache only holds 2 partitions. Depending on HashMap iteration order, re-inserting
904        // the evicted entry may cascade-evict the other, leading to 4 or 5 total calls.
905        cache
906            .resolve_cached_identity(resolver_a.clone(), &components, &config_bag)
907            .await
908            .unwrap();
909        cache
910            .resolve_cached_identity(resolver_b.clone(), &components, &config_bag)
911            .await
912            .unwrap();
913        let total_calls = resolver_a_calls.load(Ordering::Relaxed)
914            + resolver_b_calls.load(Ordering::Relaxed)
915            + resolver_c_calls.load(Ordering::Relaxed);
916        // Initial: 3 calls (A, B, C). At least one of A or B was evicted and re-resolved (+1).
917        // If re-inserting the evicted entry cascade-evicts the other, both need re-resolution (+2).
918        assert!(
919            (4..=5).contains(&total_calls),
920            "expected 4 or 5 total calls (3 initial + 1 or 2 re-resolutions), got {total_calls}"
921        );
922    }
923
924    #[tokio::test]
925    async fn single_partition_cache() {
926        let time = ManualTimeSource::new(epoch_secs(0));
927        let components = RuntimeComponentsBuilder::for_tests()
928            .with_time_source(Some(time.clone()))
929            .with_sleep_impl(Some(TokioSleep::new()))
930            .build()
931            .unwrap();
932        // Mimics the operation-scoped cache used for config overrides
933        let cache = LazyCache::new(
934            DEFAULT_LOAD_TIMEOUT,
935            DEFAULT_BUFFER_TIME,
936            BUFFER_TIME_NO_JITTER,
937            DEFAULT_EXPIRATION,
938            1,
939        );
940
941        #[allow(clippy::disallowed_methods)]
942        let far_future = SystemTime::now() + Duration::from_secs(10_000);
943
944        let resolver_a_calls = Arc::new(AtomicUsize::new(0));
945        let resolver_b_calls = Arc::new(AtomicUsize::new(0));
946
947        let resolver_a = resolver_fn({
948            let calls = resolver_a_calls.clone();
949            move || {
950                calls.fetch_add(1, Ordering::Relaxed);
951                IdentityFuture::ready(Ok(Identity::new(
952                    Token::new("A", Some(far_future)),
953                    Some(far_future),
954                )))
955            }
956        });
957        let resolver_b = resolver_fn({
958            let calls = resolver_b_calls.clone();
959            move || {
960                calls.fetch_add(1, Ordering::Relaxed);
961                IdentityFuture::ready(Ok(Identity::new(
962                    Token::new("B", Some(far_future)),
963                    Some(far_future),
964                )))
965            }
966        });
967
968        let config_bag = ConfigBag::base();
969
970        // First call resolves A
971        let identity = cache
972            .resolve_cached_identity(resolver_a.clone(), &components, &config_bag)
973            .await
974            .unwrap();
975        assert_eq!("A", identity.data::<Token>().unwrap().token());
976        assert_eq!(1, resolver_a_calls.load(Ordering::Relaxed));
977
978        // Second call with same resolver is cached
979        let identity = cache
980            .resolve_cached_identity(resolver_a.clone(), &components, &config_bag)
981            .await
982            .unwrap();
983        assert_eq!("A", identity.data::<Token>().unwrap().token());
984        assert_eq!(1, resolver_a_calls.load(Ordering::Relaxed));
985
986        // Resolving B evicts A (only 1 partition)
987        let identity = cache
988            .resolve_cached_identity(resolver_b.clone(), &components, &config_bag)
989            .await
990            .unwrap();
991        assert_eq!("B", identity.data::<Token>().unwrap().token());
992        assert_eq!(1, resolver_b_calls.load(Ordering::Relaxed));
993
994        // A must be re-resolved
995        let identity = cache
996            .resolve_cached_identity(resolver_a.clone(), &components, &config_bag)
997            .await
998            .unwrap();
999        assert_eq!("A", identity.data::<Token>().unwrap().token());
1000        assert_eq!(2, resolver_a_calls.load(Ordering::Relaxed));
1001    }
1002
1003    #[test]
1004    #[should_panic(expected = "max_partitions must be greater than 0")]
1005    fn max_partitions_zero_panics() {
1006        LazyCacheBuilder::new().max_partitions(0);
1007    }
1008}