1use crate::expiring_cache::ExpiringCache;
7use aws_smithy_async::future::timeout::Timeout;
8use aws_smithy_async::rt::sleep::{AsyncSleep, SharedAsyncSleep};
9use aws_smithy_async::time::{SharedTimeSource, TimeSource};
10use aws_smithy_runtime_api::box_error::BoxError;
11use aws_smithy_runtime_api::client::identity::{
12 Identity, IdentityCachePartition, IdentityFuture, ResolveCachedIdentity, ResolveIdentity,
13 SharedIdentityCache, SharedIdentityResolver,
14};
15use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
16use aws_smithy_runtime_api::shared::IntoShared;
17use aws_smithy_types::config_bag::ConfigBag;
18use aws_smithy_types::DateTime;
19use std::collections::HashMap;
20use std::fmt;
21use std::sync::RwLock;
22use std::time::Duration;
23use tracing::Instrument;
24
25const DEFAULT_LOAD_TIMEOUT: Duration = Duration::from_secs(5);
26const DEFAULT_EXPIRATION: Duration = Duration::from_secs(15 * 60);
27const DEFAULT_BUFFER_TIME: Duration = Duration::from_secs(10);
28const DEFAULT_BUFFER_TIME_JITTER_FRACTION: fn() -> f64 = || fastrand::f64() * 0.5;
29const DEFAULT_MAX_PARTITIONS: usize = 64;
30
31#[derive(Default, Debug)]
33pub struct LazyCacheBuilder {
34 time_source: Option<SharedTimeSource>,
35 sleep_impl: Option<SharedAsyncSleep>,
36 load_timeout: Option<Duration>,
37 buffer_time: Option<Duration>,
38 buffer_time_jitter_fraction: Option<fn() -> f64>,
39 default_expiration: Option<Duration>,
40 max_partitions: Option<usize>,
41}
42
43impl LazyCacheBuilder {
44 pub fn new() -> Self {
46 Default::default()
47 }
48
49 pub fn time_source(mut self, time_source: impl TimeSource + 'static) -> Self {
51 self.set_time_source(time_source.into_shared());
52 self
53 }
54 pub fn set_time_source(&mut self, time_source: SharedTimeSource) -> &mut Self {
56 self.time_source = Some(time_source.into_shared());
57 self
58 }
59
60 pub fn sleep_impl(mut self, sleep_impl: impl AsyncSleep + 'static) -> Self {
62 self.set_sleep_impl(sleep_impl.into_shared());
63 self
64 }
65 pub fn set_sleep_impl(&mut self, sleep_impl: SharedAsyncSleep) -> &mut Self {
67 self.sleep_impl = Some(sleep_impl);
68 self
69 }
70
71 pub fn load_timeout(mut self, timeout: Duration) -> Self {
75 self.set_load_timeout(Some(timeout));
76 self
77 }
78
79 pub fn set_load_timeout(&mut self, timeout: Option<Duration>) -> &mut Self {
83 self.load_timeout = timeout;
84 self
85 }
86
87 pub fn buffer_time(mut self, buffer_time: Duration) -> Self {
96 self.set_buffer_time(Some(buffer_time));
97 self
98 }
99
100 pub fn set_buffer_time(&mut self, buffer_time: Option<Duration>) -> &mut Self {
109 self.buffer_time = buffer_time;
110 self
111 }
112
113 #[allow(unused)]
121 #[cfg(test)]
122 fn buffer_time_jitter_fraction(mut self, buffer_time_jitter_fraction: fn() -> f64) -> Self {
123 self.set_buffer_time_jitter_fraction(Some(buffer_time_jitter_fraction));
124 self
125 }
126
127 #[allow(unused)]
135 #[cfg(test)]
136 fn set_buffer_time_jitter_fraction(
137 &mut self,
138 buffer_time_jitter_fraction: Option<fn() -> f64>,
139 ) -> &mut Self {
140 self.buffer_time_jitter_fraction = buffer_time_jitter_fraction;
141 self
142 }
143
144 pub fn default_expiration(mut self, duration: Duration) -> Self {
151 self.set_default_expiration(Some(duration));
152 self
153 }
154
155 pub fn set_default_expiration(&mut self, duration: Option<Duration>) -> &mut Self {
162 self.default_expiration = duration;
163 self
164 }
165
166 pub fn max_partitions(mut self, max: usize) -> Self {
178 self.set_max_partitions(Some(max));
179 self
180 }
181
182 pub fn set_max_partitions(&mut self, max: Option<usize>) -> &mut Self {
194 if let Some(0) = max {
195 panic!("max_partitions must be greater than 0");
196 }
197 self.max_partitions = max;
198 self
199 }
200
201 pub fn build(self) -> SharedIdentityCache {
207 let default_expiration = self.default_expiration.unwrap_or(DEFAULT_EXPIRATION);
208 assert!(
209 default_expiration >= DEFAULT_EXPIRATION,
210 "default_expiration must be at least 15 minutes"
211 );
212 LazyCache::new(
213 self.load_timeout.unwrap_or(DEFAULT_LOAD_TIMEOUT),
214 self.buffer_time.unwrap_or(DEFAULT_BUFFER_TIME),
215 self.buffer_time_jitter_fraction
216 .unwrap_or(DEFAULT_BUFFER_TIME_JITTER_FRACTION),
217 default_expiration,
218 self.max_partitions.unwrap_or(DEFAULT_MAX_PARTITIONS),
219 )
220 .into_shared()
221 }
222}
223
224#[derive(Debug)]
225struct CachePartitions {
226 partitions: RwLock<HashMap<IdentityCachePartition, ExpiringCache<Identity, BoxError>>>,
227 buffer_time: Duration,
228 max_partitions: usize,
229}
230
231impl CachePartitions {
232 fn new(buffer_time: Duration, max_partitions: usize) -> Self {
233 Self {
234 partitions: RwLock::new(HashMap::new()),
235 buffer_time,
236 max_partitions,
237 }
238 }
239
240 fn partition(&self, key: IdentityCachePartition) -> ExpiringCache<Identity, BoxError> {
241 if let Some(partition) = self.partitions.read().unwrap().get(&key).cloned() {
243 return partition;
244 }
245 let mut partitions = self.partitions.write().unwrap();
247 if let Some(partition) = partitions.get(&key).cloned() {
249 return partition;
250 }
251 if partitions.len() >= self.max_partitions {
255 if let Some(&evict_key) = partitions.keys().next() {
256 partitions.remove(&evict_key);
257 }
258 }
259 let partition = ExpiringCache::new(self.buffer_time);
260 partitions.insert(key, partition.clone());
261 tracing::debug!(
262 partition_count = partitions.len(),
263 "identity cache partition created"
264 );
265 partition
266 }
267}
268
269#[derive(Debug)]
270struct LazyCache {
271 partitions: CachePartitions,
272 load_timeout: Duration,
273 buffer_time: Duration,
274 buffer_time_jitter_fraction: fn() -> f64,
275 default_expiration: Duration,
276}
277
278impl LazyCache {
279 fn new(
280 load_timeout: Duration,
281 buffer_time: Duration,
282 buffer_time_jitter_fraction: fn() -> f64,
283 default_expiration: Duration,
284 max_partitions: usize,
285 ) -> Self {
286 Self {
287 partitions: CachePartitions::new(buffer_time, max_partitions),
288 load_timeout,
289 buffer_time,
290 buffer_time_jitter_fraction,
291 default_expiration,
292 }
293 }
294}
295
296macro_rules! required_err {
297 ($thing:literal, $how:literal) => {
298 BoxError::from(concat!(
299 "Lazy identity caching requires ",
300 $thing,
301 " to be configured. ",
302 $how,
303 " If this isn't possible, then disable identity caching by calling ",
304 "the `identity_cache` method on config with `IdentityCache::no_cache()`",
305 ))
306 };
307}
308macro_rules! validate_components {
309 ($components:ident) => {
310 let _ = $components.time_source().ok_or_else(|| {
311 required_err!(
312 "a time source",
313 "Set a time source using the `time_source` method on config."
314 )
315 })?;
316 let _ = $components.sleep_impl().ok_or_else(|| {
317 required_err!(
318 "an async sleep implementation",
319 "Set a sleep impl using the `sleep_impl` method on config."
320 )
321 })?;
322 };
323}
324
325impl ResolveCachedIdentity for LazyCache {
326 fn validate_base_client_config(
327 &self,
328 runtime_components: &aws_smithy_runtime_api::client::runtime_components::RuntimeComponentsBuilder,
329 _cfg: &ConfigBag,
330 ) -> Result<(), BoxError> {
331 validate_components!(runtime_components);
332 Ok(())
333 }
334
335 fn validate_final_config(
336 &self,
337 runtime_components: &RuntimeComponents,
338 _cfg: &ConfigBag,
339 ) -> Result<(), BoxError> {
340 validate_components!(runtime_components);
341 Ok(())
342 }
343
344 fn resolve_cached_identity<'a>(
345 &'a self,
346 resolver: SharedIdentityResolver,
347 runtime_components: &'a RuntimeComponents,
348 config_bag: &'a ConfigBag,
349 ) -> IdentityFuture<'a> {
350 let (time_source, sleep_impl) = (
351 runtime_components.time_source().expect("validated"),
352 runtime_components.sleep_impl().expect("validated"),
353 );
354
355 let now = time_source.now();
356 let timeout_future = sleep_impl.sleep(self.load_timeout);
357 let load_timeout = self.load_timeout;
358 let partition = resolver.cache_partition();
359 let cache = self.partitions.partition(partition);
360 let default_expiration = self.default_expiration;
361
362 IdentityFuture::new(async move {
363 if let Some(identity) = cache.yield_or_clear_if_expired(now).await {
365 tracing::debug!(
366 buffer_time=?self.buffer_time,
367 cached_expiration=?identity.expiration(),
368 now=?now,
369 "loaded identity from cache"
370 );
371 Ok(identity)
372 } else {
373 let start_time = time_source.now();
378 let result = cache
379 .get_or_load(|| {
380 let span = tracing::debug_span!("lazy_load_identity");
381 async move {
382 let fut = Timeout::new(
383 resolver.resolve_identity(runtime_components, config_bag),
384 timeout_future,
385 );
386 let identity = match fut.await {
387 Ok(result) => result?,
388 Err(_err) => match resolver.fallback_on_interrupt() {
389 Some(identity) => identity,
390 None => {
391 return Err(BoxError::from(TimedOutError(load_timeout)))
392 }
393 },
394 };
395 let expiration =
397 identity.expiration().unwrap_or(now + default_expiration);
398
399 let jitter = self
400 .buffer_time
401 .mul_f64((self.buffer_time_jitter_fraction)());
402
403 let printable = DateTime::from(expiration);
408 tracing::debug!(
409 new_expiration=%printable,
410 valid_for=?expiration.duration_since(time_source.now()).unwrap_or_default(),
411 partition=?partition,
412 "identity cache miss occurred; added new identity (took {:?})",
413 time_source.now().duration_since(start_time).unwrap_or_default()
414 );
415
416 Ok((identity, expiration + jitter))
417 }
418 .instrument(span)
421 })
422 .await;
423 tracing::debug!("loaded identity");
424 result
425 }
426 })
427 }
428}
429
430#[derive(Debug)]
431struct TimedOutError(Duration);
432
433impl std::error::Error for TimedOutError {}
434
435impl fmt::Display for TimedOutError {
436 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
437 write!(f, "identity resolver timed out after {:?}", self.0)
438 }
439}
440
441#[cfg(all(test, feature = "client", feature = "http-auth"))]
442mod tests {
443 use super::*;
444 use aws_smithy_async::rt::sleep::TokioSleep;
445 use aws_smithy_async::test_util::{instant_time_and_sleep, ManualTimeSource};
446 use aws_smithy_async::time::TimeSource;
447 use aws_smithy_runtime_api::client::identity::http::Token;
448 use aws_smithy_runtime_api::client::runtime_components::RuntimeComponentsBuilder;
449 use std::sync::atomic::{AtomicUsize, Ordering};
450 use std::sync::{Arc, Mutex};
451 use std::time::{Duration, SystemTime, UNIX_EPOCH};
452 use tracing::info;
453
454 const BUFFER_TIME_NO_JITTER: fn() -> f64 = || 0_f64;
455
456 struct ResolverFn<F>(F);
457 impl<F> fmt::Debug for ResolverFn<F> {
458 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
459 f.write_str("ResolverFn")
460 }
461 }
462 impl<F> ResolveIdentity for ResolverFn<F>
463 where
464 F: Fn() -> IdentityFuture<'static> + Send + Sync,
465 {
466 fn resolve_identity<'a>(
467 &'a self,
468 _: &'a RuntimeComponents,
469 _config_bag: &'a ConfigBag,
470 ) -> IdentityFuture<'a> {
471 (self.0)()
472 }
473 }
474
475 fn resolver_fn<F>(f: F) -> SharedIdentityResolver
476 where
477 F: Fn() -> IdentityFuture<'static> + Send + Sync + 'static,
478 {
479 SharedIdentityResolver::new(ResolverFn(f))
480 }
481
482 fn test_cache(
483 buffer_time_jitter_fraction: fn() -> f64,
484 load_list: Vec<Result<Identity, BoxError>>,
485 ) -> (LazyCache, SharedIdentityResolver) {
486 #[derive(Debug)]
487 struct Resolver(Mutex<Vec<Result<Identity, BoxError>>>);
488 impl ResolveIdentity for Resolver {
489 fn resolve_identity<'a>(
490 &'a self,
491 _: &'a RuntimeComponents,
492 _config_bag: &'a ConfigBag,
493 ) -> IdentityFuture<'a> {
494 let mut list = self.0.lock().unwrap();
495 if list.len() > 0 {
496 let next = list.remove(0);
497 info!("refreshing the identity to {:?}", next);
498 IdentityFuture::ready(next)
499 } else {
500 drop(list);
501 panic!("no more identities")
502 }
503 }
504 }
505
506 let identity_resolver = SharedIdentityResolver::new(Resolver(Mutex::new(load_list)));
507 let cache = LazyCache::new(
508 DEFAULT_LOAD_TIMEOUT,
509 DEFAULT_BUFFER_TIME,
510 buffer_time_jitter_fraction,
511 DEFAULT_EXPIRATION,
512 DEFAULT_MAX_PARTITIONS,
513 );
514 (cache, identity_resolver)
515 }
516
517 fn epoch_secs(secs: u64) -> SystemTime {
518 SystemTime::UNIX_EPOCH + Duration::from_secs(secs)
519 }
520
521 fn test_identity(expired_secs: u64) -> Identity {
522 let expiration = Some(epoch_secs(expired_secs));
523 Identity::new(Token::new("test", expiration), expiration)
524 }
525
526 async fn expect_identity(
527 expired_secs: u64,
528 cache: &LazyCache,
529 components: &RuntimeComponents,
530 resolver: SharedIdentityResolver,
531 ) {
532 let config_bag = ConfigBag::base();
533 let identity = cache
534 .resolve_cached_identity(resolver, components, &config_bag)
535 .await
536 .expect("expected identity");
537 assert_eq!(Some(epoch_secs(expired_secs)), identity.expiration());
538 }
539
540 #[tokio::test]
541 async fn initial_populate_test_identity() {
542 let time = ManualTimeSource::new(UNIX_EPOCH);
543 let components = RuntimeComponentsBuilder::for_tests()
544 .with_time_source(Some(time.clone()))
545 .with_sleep_impl(Some(TokioSleep::new()))
546 .build()
547 .unwrap();
548 let config_bag = ConfigBag::base();
549 let resolver = SharedIdentityResolver::new(resolver_fn(|| {
550 info!("refreshing the test_identity");
551 IdentityFuture::ready(Ok(test_identity(1000)))
552 }));
553 let cache = LazyCache::new(
554 DEFAULT_LOAD_TIMEOUT,
555 DEFAULT_BUFFER_TIME,
556 BUFFER_TIME_NO_JITTER,
557 DEFAULT_EXPIRATION,
558 DEFAULT_MAX_PARTITIONS,
559 );
560 assert_eq!(
561 epoch_secs(1000),
562 cache
563 .resolve_cached_identity(resolver, &components, &config_bag)
564 .await
565 .unwrap()
566 .expiration()
567 .unwrap()
568 );
569 }
570
571 #[tokio::test]
572 async fn reload_expired_test_identity() {
573 let time = ManualTimeSource::new(epoch_secs(100));
574 let components = RuntimeComponentsBuilder::for_tests()
575 .with_time_source(Some(time.clone()))
576 .with_sleep_impl(Some(TokioSleep::new()))
577 .build()
578 .unwrap();
579 let (cache, resolver) = test_cache(
580 BUFFER_TIME_NO_JITTER,
581 vec![
582 Ok(test_identity(1000)),
583 Ok(test_identity(2000)),
584 Ok(test_identity(3000)),
585 ],
586 );
587
588 expect_identity(1000, &cache, &components, resolver.clone()).await;
589 expect_identity(1000, &cache, &components, resolver.clone()).await;
590 time.set_time(epoch_secs(1500));
591 expect_identity(2000, &cache, &components, resolver.clone()).await;
592 expect_identity(2000, &cache, &components, resolver.clone()).await;
593 time.set_time(epoch_secs(2500));
594 expect_identity(3000, &cache, &components, resolver.clone()).await;
595 expect_identity(3000, &cache, &components, resolver.clone()).await;
596 }
597
598 #[tokio::test]
599 async fn load_failed_error() {
600 let config_bag = ConfigBag::base();
601 let time = ManualTimeSource::new(epoch_secs(100));
602 let components = RuntimeComponentsBuilder::for_tests()
603 .with_time_source(Some(time.clone()))
604 .with_sleep_impl(Some(TokioSleep::new()))
605 .build()
606 .unwrap();
607 let (cache, resolver) = test_cache(
608 BUFFER_TIME_NO_JITTER,
609 vec![Ok(test_identity(1000)), Err("failed".into())],
610 );
611
612 expect_identity(1000, &cache, &components, resolver.clone()).await;
613 time.set_time(epoch_secs(1500));
614 assert!(cache
615 .resolve_cached_identity(resolver.clone(), &components, &config_bag)
616 .await
617 .is_err());
618 }
619
620 #[test]
621 fn load_contention() {
622 let rt = tokio::runtime::Builder::new_multi_thread()
623 .enable_time()
624 .worker_threads(16)
625 .build()
626 .unwrap();
627
628 let time = ManualTimeSource::new(epoch_secs(0));
629 let components = RuntimeComponentsBuilder::for_tests()
630 .with_time_source(Some(time.clone()))
631 .with_sleep_impl(Some(TokioSleep::new()))
632 .build()
633 .unwrap();
634 let (cache, resolver) = test_cache(
635 BUFFER_TIME_NO_JITTER,
636 vec![
637 Ok(test_identity(500)),
638 Ok(test_identity(1500)),
639 Ok(test_identity(2500)),
640 Ok(test_identity(3500)),
641 Ok(test_identity(4500)),
642 ],
643 );
644 let cache: SharedIdentityCache = cache.into_shared();
645
646 for _ in 0..4 {
649 let mut tasks = Vec::new();
650 for _ in 0..50 {
651 let resolver = resolver.clone();
652 let cache = cache.clone();
653 let time = time.clone();
654 let components = components.clone();
655 tasks.push(rt.spawn(async move {
656 let now = time.advance(Duration::from_secs(22));
657
658 let config_bag = ConfigBag::base();
659 let identity = cache
660 .resolve_cached_identity(resolver, &components, &config_bag)
661 .await
662 .unwrap();
663 assert!(
664 identity.expiration().unwrap() >= now,
665 "{:?} >= {:?}",
666 identity.expiration(),
667 now
668 );
669 }));
670 }
671 for task in tasks {
672 rt.block_on(task).unwrap();
673 }
674 }
675 }
676
677 #[tokio::test]
678 async fn load_timeout() {
679 let config_bag = ConfigBag::base();
680 let (time, sleep) = instant_time_and_sleep(epoch_secs(100));
681 let components = RuntimeComponentsBuilder::for_tests()
682 .with_time_source(Some(time.clone()))
683 .with_sleep_impl(Some(sleep))
684 .build()
685 .unwrap();
686 let resolver = SharedIdentityResolver::new(resolver_fn(|| {
687 IdentityFuture::new(async {
688 aws_smithy_async::future::never::Never::new().await;
689 Ok(test_identity(1000))
690 })
691 }));
692 let cache = LazyCache::new(
693 Duration::from_secs(5),
694 DEFAULT_BUFFER_TIME,
695 BUFFER_TIME_NO_JITTER,
696 DEFAULT_EXPIRATION,
697 DEFAULT_MAX_PARTITIONS,
698 );
699
700 let err: BoxError = cache
701 .resolve_cached_identity(resolver, &components, &config_bag)
702 .await
703 .expect_err("it should return an error");
704 let downcasted = err.downcast_ref::<TimedOutError>();
705 assert!(
706 downcasted.is_some(),
707 "expected a BoxError of TimedOutError, but was {err:?}"
708 );
709 assert_eq!(time.now(), epoch_secs(105));
710 }
711
712 #[tokio::test]
713 async fn buffer_time_jitter() {
714 let time = ManualTimeSource::new(epoch_secs(100));
715 let components = RuntimeComponentsBuilder::for_tests()
716 .with_time_source(Some(time.clone()))
717 .with_sleep_impl(Some(TokioSleep::new()))
718 .build()
719 .unwrap();
720 let buffer_time_jitter_fraction = || 0.5_f64;
721 let (cache, resolver) = test_cache(
722 buffer_time_jitter_fraction,
723 vec![Ok(test_identity(1000)), Ok(test_identity(2000))],
724 );
725
726 expect_identity(1000, &cache, &components, resolver.clone()).await;
727 let buffer_time_with_jitter =
728 (DEFAULT_BUFFER_TIME.as_secs_f64() * buffer_time_jitter_fraction()) as u64;
729 assert_eq!(buffer_time_with_jitter, 5);
730 let almost_expired_secs = 1000 - buffer_time_with_jitter - 1;
732 time.set_time(epoch_secs(almost_expired_secs));
733 expect_identity(1000, &cache, &components, resolver.clone()).await;
735 let expired_secs = almost_expired_secs + 1;
737 time.set_time(epoch_secs(expired_secs));
738 expect_identity(2000, &cache, &components, resolver.clone()).await;
740 }
741
742 #[tokio::test]
743 async fn cache_partitioning() {
744 let time = ManualTimeSource::new(epoch_secs(0));
745 let components = RuntimeComponentsBuilder::for_tests()
746 .with_time_source(Some(time.clone()))
747 .with_sleep_impl(Some(TokioSleep::new()))
748 .build()
749 .unwrap();
750 let (cache, _) = test_cache(BUFFER_TIME_NO_JITTER, Vec::new());
751
752 #[allow(clippy::disallowed_methods)]
753 let far_future = SystemTime::now() + Duration::from_secs(10_000);
754
755 let resolver_a_calls = Arc::new(AtomicUsize::new(0));
758 let resolver_b_calls = Arc::new(AtomicUsize::new(0));
759 let resolver_a = resolver_fn({
760 let calls = resolver_a_calls.clone();
761 move || {
762 calls.fetch_add(1, Ordering::Relaxed);
763 IdentityFuture::ready(Ok(Identity::new(
764 Token::new("A", Some(far_future)),
765 Some(far_future),
766 )))
767 }
768 });
769 let resolver_b = resolver_fn({
770 let calls = resolver_b_calls.clone();
771 move || {
772 calls.fetch_add(1, Ordering::Relaxed);
773 IdentityFuture::ready(Ok(Identity::new(
774 Token::new("B", Some(far_future)),
775 Some(far_future),
776 )))
777 }
778 });
779 assert_ne!(
780 resolver_a.cache_partition(),
781 resolver_b.cache_partition(),
782 "pre-condition: they should have different partition keys"
783 );
784
785 let config_bag = ConfigBag::base();
786
787 let identity = cache
790 .resolve_cached_identity(resolver_a.clone(), &components, &config_bag)
791 .await
792 .unwrap();
793 assert_eq!("A", identity.data::<Token>().unwrap().token());
794 let identity = cache
795 .resolve_cached_identity(resolver_a.clone(), &components, &config_bag)
796 .await
797 .unwrap();
798 assert_eq!("A", identity.data::<Token>().unwrap().token());
799 assert_eq!(1, resolver_a_calls.load(Ordering::Relaxed));
800
801 let identity = cache
804 .resolve_cached_identity(resolver_b.clone(), &components, &config_bag)
805 .await
806 .unwrap();
807 assert_eq!("B", identity.data::<Token>().unwrap().token());
808 let identity = cache
809 .resolve_cached_identity(resolver_b.clone(), &components, &config_bag)
810 .await
811 .unwrap();
812 assert_eq!("B", identity.data::<Token>().unwrap().token());
813 assert_eq!(1, resolver_a_calls.load(Ordering::Relaxed));
814 assert_eq!(1, resolver_b_calls.load(Ordering::Relaxed));
815
816 let identity = cache
818 .resolve_cached_identity(resolver_a.clone(), &components, &config_bag)
819 .await
820 .unwrap();
821 assert_eq!("A", identity.data::<Token>().unwrap().token());
822 assert_eq!(1, resolver_a_calls.load(Ordering::Relaxed));
823 assert_eq!(1, resolver_b_calls.load(Ordering::Relaxed));
824 }
825
826 #[tokio::test]
827 async fn eviction_when_at_capacity() {
828 let time = ManualTimeSource::new(epoch_secs(0));
829 let components = RuntimeComponentsBuilder::for_tests()
830 .with_time_source(Some(time.clone()))
831 .with_sleep_impl(Some(TokioSleep::new()))
832 .build()
833 .unwrap();
834 let cache = LazyCache::new(
836 DEFAULT_LOAD_TIMEOUT,
837 DEFAULT_BUFFER_TIME,
838 BUFFER_TIME_NO_JITTER,
839 DEFAULT_EXPIRATION,
840 2,
841 );
842
843 #[allow(clippy::disallowed_methods)]
844 let far_future = SystemTime::now() + Duration::from_secs(10_000);
845
846 let resolver_a_calls = Arc::new(AtomicUsize::new(0));
847 let resolver_b_calls = Arc::new(AtomicUsize::new(0));
848 let resolver_c_calls = Arc::new(AtomicUsize::new(0));
849
850 let resolver_a = resolver_fn({
851 let calls = resolver_a_calls.clone();
852 move || {
853 calls.fetch_add(1, Ordering::Relaxed);
854 IdentityFuture::ready(Ok(Identity::new(
855 Token::new("A", Some(far_future)),
856 Some(far_future),
857 )))
858 }
859 });
860 let resolver_b = resolver_fn({
861 let calls = resolver_b_calls.clone();
862 move || {
863 calls.fetch_add(1, Ordering::Relaxed);
864 IdentityFuture::ready(Ok(Identity::new(
865 Token::new("B", Some(far_future)),
866 Some(far_future),
867 )))
868 }
869 });
870 let resolver_c = resolver_fn({
871 let calls = resolver_c_calls.clone();
872 move || {
873 calls.fetch_add(1, Ordering::Relaxed);
874 IdentityFuture::ready(Ok(Identity::new(
875 Token::new("C", Some(far_future)),
876 Some(far_future),
877 )))
878 }
879 });
880
881 let config_bag = ConfigBag::base();
882
883 cache
885 .resolve_cached_identity(resolver_a.clone(), &components, &config_bag)
886 .await
887 .unwrap();
888 cache
889 .resolve_cached_identity(resolver_b.clone(), &components, &config_bag)
890 .await
891 .unwrap();
892 assert_eq!(1, resolver_a_calls.load(Ordering::Relaxed));
893 assert_eq!(1, resolver_b_calls.load(Ordering::Relaxed));
894
895 cache
897 .resolve_cached_identity(resolver_c.clone(), &components, &config_bag)
898 .await
899 .unwrap();
900 assert_eq!(1, resolver_c_calls.load(Ordering::Relaxed));
901
902 cache
906 .resolve_cached_identity(resolver_a.clone(), &components, &config_bag)
907 .await
908 .unwrap();
909 cache
910 .resolve_cached_identity(resolver_b.clone(), &components, &config_bag)
911 .await
912 .unwrap();
913 let total_calls = resolver_a_calls.load(Ordering::Relaxed)
914 + resolver_b_calls.load(Ordering::Relaxed)
915 + resolver_c_calls.load(Ordering::Relaxed);
916 assert!(
919 (4..=5).contains(&total_calls),
920 "expected 4 or 5 total calls (3 initial + 1 or 2 re-resolutions), got {total_calls}"
921 );
922 }
923
924 #[tokio::test]
925 async fn single_partition_cache() {
926 let time = ManualTimeSource::new(epoch_secs(0));
927 let components = RuntimeComponentsBuilder::for_tests()
928 .with_time_source(Some(time.clone()))
929 .with_sleep_impl(Some(TokioSleep::new()))
930 .build()
931 .unwrap();
932 let cache = LazyCache::new(
934 DEFAULT_LOAD_TIMEOUT,
935 DEFAULT_BUFFER_TIME,
936 BUFFER_TIME_NO_JITTER,
937 DEFAULT_EXPIRATION,
938 1,
939 );
940
941 #[allow(clippy::disallowed_methods)]
942 let far_future = SystemTime::now() + Duration::from_secs(10_000);
943
944 let resolver_a_calls = Arc::new(AtomicUsize::new(0));
945 let resolver_b_calls = Arc::new(AtomicUsize::new(0));
946
947 let resolver_a = resolver_fn({
948 let calls = resolver_a_calls.clone();
949 move || {
950 calls.fetch_add(1, Ordering::Relaxed);
951 IdentityFuture::ready(Ok(Identity::new(
952 Token::new("A", Some(far_future)),
953 Some(far_future),
954 )))
955 }
956 });
957 let resolver_b = resolver_fn({
958 let calls = resolver_b_calls.clone();
959 move || {
960 calls.fetch_add(1, Ordering::Relaxed);
961 IdentityFuture::ready(Ok(Identity::new(
962 Token::new("B", Some(far_future)),
963 Some(far_future),
964 )))
965 }
966 });
967
968 let config_bag = ConfigBag::base();
969
970 let identity = cache
972 .resolve_cached_identity(resolver_a.clone(), &components, &config_bag)
973 .await
974 .unwrap();
975 assert_eq!("A", identity.data::<Token>().unwrap().token());
976 assert_eq!(1, resolver_a_calls.load(Ordering::Relaxed));
977
978 let identity = cache
980 .resolve_cached_identity(resolver_a.clone(), &components, &config_bag)
981 .await
982 .unwrap();
983 assert_eq!("A", identity.data::<Token>().unwrap().token());
984 assert_eq!(1, resolver_a_calls.load(Ordering::Relaxed));
985
986 let identity = cache
988 .resolve_cached_identity(resolver_b.clone(), &components, &config_bag)
989 .await
990 .unwrap();
991 assert_eq!("B", identity.data::<Token>().unwrap().token());
992 assert_eq!(1, resolver_b_calls.load(Ordering::Relaxed));
993
994 let identity = cache
996 .resolve_cached_identity(resolver_a.clone(), &components, &config_bag)
997 .await
998 .unwrap();
999 assert_eq!("A", identity.data::<Token>().unwrap().token());
1000 assert_eq!(2, resolver_a_calls.load(Ordering::Relaxed));
1001 }
1002
1003 #[test]
1004 #[should_panic(expected = "max_partitions must be greater than 0")]
1005 fn max_partitions_zero_panics() {
1006 LazyCacheBuilder::new().max_partitions(0);
1007 }
1008}