// aws_smithy_async/test_util/controlled_sleep.rs

/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0
 */

use crate::{
    rt::sleep::{AsyncSleep, Sleep},
    test_util::ManualTimeSource,
};
use std::time::{Duration, SystemTime};
use std::{
    collections::VecDeque,
    sync::{Arc, Mutex},
};
use tokio::sync::oneshot;
use tokio::sync::Barrier;
use tokio::time::timeout;

19/// A sleep implementation where calls to [`AsyncSleep::sleep`] block until [`SleepGate::expect_sleep`] is called
20///
21/// Create a [`ControlledSleep`] with [`controlled_time_and_sleep`]
22#[derive(Debug, Clone)]
23pub struct ControlledSleep {
24    barrier: Arc<Barrier>,
25    log: Arc<Mutex<Vec<Duration>>>,
26    duration: Arc<Mutex<VecDeque<Duration>>>,
27    advance_guard: Arc<Mutex<Option<oneshot::Sender<()>>>>,
28}
29
30impl ControlledSleep {
31    fn new(log: Arc<Mutex<Vec<Duration>>>) -> (ControlledSleep, SleepGate) {
32        let gate = Arc::new(Barrier::new(2));
33        let pending = Arc::new(Mutex::new(VecDeque::new()));
34        let advance_guard: Arc<Mutex<Option<oneshot::Sender<()>>>> = Default::default();
35        (
36            ControlledSleep {
37                barrier: gate.clone(),
38                log,
39                duration: pending.clone(),
40                advance_guard: advance_guard.clone(),
41            },
42            SleepGate {
43                gate,
44                pending,
45                advance_guard,
46            },
47        )
48    }
49}
50
51/// Guard returned from [`SleepGate::expect_sleep`]
52///
53/// # Examples
54/// ```rust
55/// # use std::sync::Arc;
56/// use std::sync::atomic::{AtomicUsize, Ordering};
57/// # async {
58/// use std::time::{Duration, UNIX_EPOCH};
59/// use aws_smithy_async::rt::sleep::AsyncSleep;
60/// use aws_smithy_async::test_util::controlled_time_and_sleep;
61/// let (time, sleep, mut gate) = controlled_time_and_sleep(UNIX_EPOCH);
62/// let progress = Arc::new(AtomicUsize::new(0));
63/// let task_progress = progress.clone();
64/// let task = tokio::spawn(async move {
65///     let progress = task_progress;
66///     progress.store(1, Ordering::Release);
67///     sleep.sleep(Duration::from_secs(1)).await;
68///     progress.store(2, Ordering::Release);
69///     sleep.sleep(Duration::from_secs(2)).await;
70/// });
71/// while progress.load(Ordering::Acquire) != 1 {}
72/// let guard = gate.expect_sleep().await;
73/// assert_eq!(guard.duration(), Duration::from_secs(1));
74/// assert_eq!(progress.load(Ordering::Acquire), 1);
75/// guard.allow_progress();
76///
77/// let guard = gate.expect_sleep().await;
78/// assert_eq!(progress.load(Ordering::Acquire), 2);
79/// assert_eq!(task.is_finished(), false);
80/// guard.allow_progress();
81/// task.await.expect("successful completion");
82/// # };
83/// ```
84#[allow(dead_code)] // unused fields retained for their `Drop` impls
85pub struct CapturedSleep<'a>(oneshot::Sender<()>, &'a SleepGate, Duration);
86impl CapturedSleep<'_> {
87    /// Allow the calling code to advance past the call to [`AsyncSleep::sleep`]
88    ///
89    /// In order to facilitate testing with no flakiness, the future returned by the call to `sleep`
90    /// will not resolve until [`CapturedSleep`] is dropped or this method is called.
91    ///
92    /// ```rust
93    /// use std::time::Duration;
94    /// use aws_smithy_async::rt::sleep::AsyncSleep;
95    /// async fn do_something(sleep: &dyn AsyncSleep) {
96    ///   println!("before sleep");
97    ///   sleep.sleep(Duration::from_secs(1)).await;
98    ///   println!("after sleep");
99    /// }
100    /// ```
101    ///
102    /// To be specific, when `do_something` is called, the code will advance to `sleep.sleep`.
103    /// When [`SleepGate::expect_sleep`] is called, the 1 second sleep will be captured, but `after sleep`
104    /// WILL NOT be printed, until `allow_progress` is called.
105    pub fn allow_progress(self) {
106        drop(self)
107    }
108
109    /// Duration in the call to [`AsyncSleep::sleep`]
110    pub fn duration(&self) -> Duration {
111        self.2
112    }
113}
114
115impl AsRef<Duration> for CapturedSleep<'_> {
116    fn as_ref(&self) -> &Duration {
117        &self.2
118    }
119}
120
121/// Gate that allows [`ControlledSleep`] to advance.
122///
123/// See [`controlled_time_and_sleep`] for more details
124pub struct SleepGate {
125    gate: Arc<Barrier>,
126    pending: Arc<Mutex<VecDeque<Duration>>>,
127    advance_guard: Arc<Mutex<Option<oneshot::Sender<()>>>>,
128}
129
130impl SleepGate {
131    /// Expect the time source to sleep
132    ///
133    /// This returns the duration that was slept and a [`CapturedSleep`]. The drop guard is used
134    /// to precisely control
135    pub async fn expect_sleep(&mut self) -> CapturedSleep<'_> {
136        timeout(Duration::from_secs(1), self.gate.wait())
137            .await
138            .expect("timeout");
139        let dur = self
140            .pending
141            .lock()
142            .unwrap()
143            .pop_front()
144            .unwrap_or(Duration::from_secs(123456));
145        let guard = CapturedSleep(
146            self.advance_guard.lock().unwrap().take().unwrap(),
147            self,
148            dur,
149        );
150        guard
151    }
152
153    /// Skips any sleep that may be queued up, returning its duration
154    pub async fn skip_sleep(&mut self) -> Option<Duration> {
155        if timeout(Duration::from_millis(1), self.gate.wait())
156            .await
157            .is_ok()
158        {
159            let _ = self.advance_guard.lock().unwrap().take();
160            self.pending.lock().unwrap().pop_front()
161        } else {
162            None
163        }
164    }
165}
166
167impl AsyncSleep for ControlledSleep {
168    fn sleep(&self, duration: Duration) -> Sleep {
169        let barrier = self.barrier.clone();
170        let log = self.log.clone();
171        let pending = self.duration.clone();
172        let drop_guard = self.advance_guard.clone();
173        Sleep::new(async move {
174            // 1. write the duration into the shared mutex
175            pending.lock().unwrap().push_back(duration);
176            let (tx, rx) = oneshot::channel();
177            *drop_guard.lock().unwrap() = Some(tx);
178            // 2. first wait on the barrier—this is how we wait for an invocation of `expect_sleep`
179            barrier.wait().await;
180            log.lock().unwrap().push(duration);
181            let _ = rx.await;
182        })
183    }
184}
185
186/// Returns a trio of tools to test interactions with time
187///
188/// 1. [`ManualTimeSource`] which starts at a specific time and only advances when `sleep` is called.
189///    It MUST be paired with [`ControlledSleep`] in order to function.
190pub fn controlled_time_and_sleep(
191    start_time: SystemTime,
192) -> (ManualTimeSource, ControlledSleep, SleepGate) {
193    let log = Arc::new(Mutex::new(vec![]));
194    let (sleep, gate) = ControlledSleep::new(log.clone());
195    (ManualTimeSource { start_time, log }, sleep, gate)
196}
197
#[cfg(test)]
mod test {
    use crate::rt::sleep::AsyncSleep;
    use crate::test_util::controlled_time_and_sleep;
    use crate::time::TimeSource;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::time::{Duration, UNIX_EPOCH};
    use tokio::task::yield_now;
    use tokio::time::timeout;

    /// Each sleep is held at the gate until its guard is released, and the manual clock
    /// advances by the slept duration.
    #[tokio::test]
    async fn test_sleep_gate() {
        let start = UNIX_EPOCH;
        let (time, sleep, mut gate) = controlled_time_and_sleep(start);
        let progress = Arc::new(AtomicUsize::new(0));
        let task_progress = progress.clone();
        let task = tokio::spawn(async move {
            assert_eq!(time.now(), start);
            let progress = task_progress;
            progress.store(1, Ordering::Release);
            sleep.sleep(Duration::from_secs(1)).await;
            assert_eq!(time.now(), start + Duration::from_secs(1));
            progress.store(2, Ordering::Release);
            sleep.sleep(Duration::from_secs(2)).await;
            assert_eq!(time.now(), start + Duration::from_secs(3));
        });
        // Let the spawned task run up to its first sleep.
        while progress.load(Ordering::Acquire) != 1 {
            yield_now().await
        }
        let guard = gate.expect_sleep().await;
        assert_eq!(guard.duration(), Duration::from_secs(1));
        // The task must still be parked inside the first sleep.
        assert_eq!(progress.load(Ordering::Acquire), 1);
        guard.allow_progress();

        let guard = gate.expect_sleep().await;
        assert_eq!(progress.load(Ordering::Acquire), 2);
        assert!(!task.is_finished(), "task should not be finished");
        guard.allow_progress();
        timeout(Duration::from_secs(1), task)
            .await
            .expect("no timeout")
            .expect("successful completion");
    }

    /// Sequentially awaited sleeps are observed in order; a skipped sleep still advances time.
    #[tokio::test]
    async fn sleep_gate_multiple_sleeps() {
        let (time, sleep, mut gate) = controlled_time_and_sleep(UNIX_EPOCH);
        let one = sleep.sleep(Duration::from_secs(1));
        let two = sleep.sleep(Duration::from_secs(2));
        let three = sleep.sleep(Duration::from_secs(3));

        let spawn = tokio::spawn(async move {
            let _ = (one.await, two.await, three.await);
        });

        // Each temporary guard is dropped at the end of its statement, releasing that sleep.
        assert_eq!(Duration::from_secs(1), gate.expect_sleep().await.duration());
        gate.skip_sleep().await;
        assert_eq!(Duration::from_secs(3), gate.expect_sleep().await.duration());

        let _ = spawn.await;

        assert_eq!(UNIX_EPOCH + Duration::from_secs(6), time.now());
    }

    /// `skip_sleep` returns `None` instead of panicking when no sleep is waiting.
    #[tokio::test]
    async fn sleep_gate_skipping_a_sleep_doesnt_blow_up_if_no_sleep() {
        let (time, sleep, mut gate) = controlled_time_and_sleep(UNIX_EPOCH);

        let some_sleep = sleep.sleep(Duration::from_secs(1));
        let spawn = tokio::spawn(async move {
            let _ = some_sleep.await;
        });

        assert_eq!(Some(Duration::from_secs(1)), gate.skip_sleep().await);
        assert_eq!(None, gate.skip_sleep().await);

        let _ = spawn.await;

        assert_eq!(UNIX_EPOCH + Duration::from_secs(1), time.now());
    }
}
279}