aws_smithy_async/test_util/
tick_advance_sleep.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6//! Test time/sleep implementations that work by manually advancing time with a `tick()`
7//!
8//! # Examples
9//!
10//! Spawning a task that creates new sleep tasks and waits for them sequentially,
//! and advancing past all of them with a single call to `tick()`.
12//!
13//! ```rust,no_run
14//! use std::time::{Duration, SystemTime};
15//! use aws_smithy_async::test_util::tick_advance_sleep::tick_advance_time_and_sleep;
16//! use aws_smithy_async::time::TimeSource;
17//! use aws_smithy_async::rt::sleep::AsyncSleep;
18//!
19//! # async fn example() {
20//! // Create the test time/sleep implementations.
21//! // They will start at SystemTime::UNIX_EPOCH.
22//! let (time, sleep) = tick_advance_time_and_sleep();
23//!
24//! // Spawn the task that sequentially sleeps
25//! let task = tokio::spawn(async move {
26//!     sleep.sleep(Duration::from_secs(1)).await;
27//!     sleep.sleep(Duration::from_secs(2)).await;
28//!     sleep.sleep(Duration::from_secs(3)).await;
29//! });
30//! // Verify that task hasn't done anything yet since we haven't called `tick`
31//! tokio::task::yield_now().await;
32//! assert!(!task.is_finished());
33//! assert_eq!(SystemTime::UNIX_EPOCH, time.now());
34//!
//! // Tick 6 seconds, which is long enough to go past all the sequential sleeps
36//! time.tick(Duration::from_secs(6)).await;
37//! assert_eq!(SystemTime::UNIX_EPOCH + Duration::from_secs(6), time.now());
38//!
39//! // Verify the task joins, indicating all the sleeps are done
40//! task.await.unwrap();
41//! # }
42//! ```
43
44use crate::{
45    rt::sleep::{AsyncSleep, Sleep},
46    time::TimeSource,
47};
48use std::{
49    future::IntoFuture,
50    ops::{Deref, DerefMut},
51    sync::{Arc, Mutex},
52    time::{Duration, SystemTime},
53};
54use tokio::sync::oneshot::Sender;
55
/// A sleep that has been requested but not yet completed.
///
/// Instances are created by `TickAdvanceSleep::sleep` and held in `Inner::sleeps`
/// until `TickAdvanceTime::tick` advances time far enough to complete them.
#[derive(Debug)]
struct QueuedSleep {
    /// Duration since `UNIX_EPOCH` at which point the sleep is finished.
    presents_at: Duration,
    /// Sending half of the one-shot channel that the sleep future is waiting on.
    ///
    /// `tick` takes the sender out of the `Option` and sends `()` on it
    /// to complete the sleep.
    notify: Option<Sender<()>>,
}
62
/// Mutable state behind the time source and sleep implementations:
/// the queue of pending sleeps and the current (fake) time.
///
/// The `Default` value is an empty queue with `now` at zero, i.e. `UNIX_EPOCH`.
#[derive(Default, Debug)]
struct Inner {
    // Pending sleeps, kept ordered by ascending `presents_at`.
    //
    // Need to use a Vec since VecDeque doesn't have sort functions,
    // and BTreeSet doesn't fit since we could have more than one sleep presenting
    // at the same time (and there's no way to compare the notify channels).
    sleeps: Vec<QueuedSleep>,
    /// Duration since `UNIX_EPOCH` that represents "now".
    now: Duration,
}
72
73impl Inner {
74    fn push(&mut self, sleep: QueuedSleep) {
75        self.sleeps.push(sleep);
76        self.sleeps.sort_by_key(|s| s.presents_at);
77    }
78
79    fn next_presenting(&mut self, time: Duration) -> Option<QueuedSleep> {
80        if self
81            .sleeps
82            .first()
83            .map(|f| f.presents_at <= time)
84            .unwrap_or(false)
85        {
86            Some(self.sleeps.remove(0))
87        } else {
88            None
89        }
90    }
91}
92
/// Cloneable handle to the `Inner` state.
///
/// Clones share the same underlying state through the `Arc<Mutex<_>>`.
#[derive(Clone, Default, Debug)]
struct SharedInner {
    /// The shared state, guarded by a mutex.
    inner: Arc<Mutex<Inner>>,
}
97impl SharedInner {
98    fn get(&self) -> impl Deref<Target = Inner> + '_ {
99        self.inner.lock().unwrap()
100    }
101    fn get_mut(&self) -> impl DerefMut<Target = Inner> + '_ {
102        self.inner.lock().unwrap()
103    }
104}
105
/// Tick-advancing test sleep implementation.
///
/// Sleeps created through this type only complete once the paired
/// [`TickAdvanceTime`] is ticked to (or past) the time at which they present.
///
/// See [module docs](crate::test_util::tick_advance_sleep) for more information.
#[derive(Clone, Debug)]
pub struct TickAdvanceSleep {
    /// State shared with the paired `TickAdvanceTime`.
    inner: SharedInner,
}
113
114impl AsyncSleep for TickAdvanceSleep {
115    fn sleep(&self, duration: Duration) -> Sleep {
116        // Use a one-shot channel to block the sleep future until `TickAdvanceTime::tick`
117        // chooses to complete it by sending with the receiver.
118        let (tx, rx) = tokio::sync::oneshot::channel::<()>();
119
120        let mut inner = self.inner.get_mut();
121        let now = inner.now;
122
123        // Add the sleep to the queue, which `TickAdvanceTime` will examine when ticking.
124        inner.push(QueuedSleep {
125            presents_at: now + duration,
126            notify: Some(tx),
127        });
128
129        Sleep::new(async move {
130            let _ = rx.into_future().await;
131        })
132    }
133}
134
/// Tick-advancing test time source implementation.
///
/// Time starts at `SystemTime::UNIX_EPOCH` and only moves when `tick` is called.
///
/// See [module docs](crate::test_util::tick_advance_sleep) for more information.
#[derive(Clone, Debug)]
pub struct TickAdvanceTime {
    /// State shared with the paired `TickAdvanceSleep`.
    inner: SharedInner,
}
142
143impl TickAdvanceTime {
144    /// Advance time by `duration`.
145    ///
146    /// This will yield the async runtime after each sleep that presents between
147    /// the previous current time and the time after the given duration. This allows
148    /// for async tasks pending one of those sleeps to do some work and also create
149    /// additional sleep tasks. Created sleep tasks may also complete during this
150    /// call to `tick()` if they present before the given time duration.
151    pub async fn tick(&self, duration: Duration) {
152        let time = self.inner.get().now + duration;
153
154        // Tick to each individual sleep time and yield the runtime so that any
155        // futures waiting on a sleep run before futures waiting on a later sleep.
156        //
157        // We also need to recheck the list of queued sleeps every iteration since
158        // unblocked tasks could have queued up more sleeps, and these sleeps may also
159        // need to present before ones that were previously queued.
160        loop {
161            // Can't do `while let` since that holds the lock open
162            let Some(mut presenting) = self.inner.get_mut().next_presenting(time) else {
163                break;
164            };
165
166            // Make sure the time is always accurate for async code that runs
167            // after completing the sleep.
168            self.inner.get_mut().now = presenting.presents_at;
169
170            // Notify the sleep, and then yield to let work blocked on that sleep to proceed
171            let _ = presenting.notify.take().unwrap().send(());
172            tokio::task::yield_now().await;
173        }
174
175        // Set the final time.
176        self.inner.get_mut().now = time;
177    }
178}
179
180impl TimeSource for TickAdvanceTime {
181    fn now(&self) -> SystemTime {
182        SystemTime::UNIX_EPOCH + self.inner.get().now
183    }
184}
185
186/// Creates tick-advancing test time/sleep implementations.
187///
188/// See [module docs](crate::test_util::tick_advance_sleep) for more information.
189pub fn tick_advance_time_and_sleep() -> (TickAdvanceTime, TickAdvanceSleep) {
190    let inner = SharedInner::default();
191    (
192        TickAdvanceTime {
193            inner: inner.clone(),
194        },
195        TickAdvanceSleep {
196            inner: inner.clone(),
197        },
198    )
199}
200
#[cfg(test)]
mod tests {
    use super::*;
    use futures_util::FutureExt;

    /// Independent sleeps complete only once `tick` reaches the time they present at.
    #[tokio::test]
    async fn tick_advances() {
        let (time, sleep) = tick_advance_time_and_sleep();

        // Ticking with nothing queued just moves the clock forward.
        assert_eq!(SystemTime::UNIX_EPOCH, time.now());
        time.tick(Duration::from_secs(1)).await;
        assert_eq!(SystemTime::UNIX_EPOCH + Duration::from_secs(1), time.now());

        // Each `sleep.sleep(..)` call queues its sleep right away, relative to the
        // current time of 1s, so these present at 1.5s, 2s, 3s, 4s and 5s.
        let sleeps = vec![
            tokio::spawn(sleep.sleep(Duration::from_millis(500))),
            tokio::spawn(sleep.sleep(Duration::from_secs(1))),
            tokio::spawn(sleep.sleep(Duration::from_secs(2))),
            tokio::spawn(sleep.sleep(Duration::from_secs(3))),
            tokio::spawn(sleep.sleep(Duration::from_secs(4))),
        ];

        // Give the spawned tasks a chance to run; none may finish without a tick.
        tokio::task::yield_now().await;
        for sleep in &sleeps {
            assert!(!sleep.is_finished());
        }

        // Now at 2s: the sleeps presenting at 1.5s and 2s are done, the rest are not.
        time.tick(Duration::from_secs(1)).await;
        assert_eq!(SystemTime::UNIX_EPOCH + Duration::from_secs(2), time.now());
        assert!(sleeps[0].is_finished());
        assert!(sleeps[1].is_finished());
        assert!(!sleeps[2].is_finished());
        assert!(!sleeps[3].is_finished());
        assert!(!sleeps[4].is_finished());

        // Now at 4s: the sleeps presenting at 3s and 4s are done as well.
        time.tick(Duration::from_secs(2)).await;
        assert_eq!(SystemTime::UNIX_EPOCH + Duration::from_secs(4), time.now());
        assert!(sleeps[2].is_finished());
        assert!(sleeps[3].is_finished());
        assert!(!sleeps[4].is_finished());

        // Now at 5s: the last sleep is done.
        time.tick(Duration::from_secs(1)).await;
        assert_eq!(SystemTime::UNIX_EPOCH + Duration::from_secs(5), time.now());
        assert!(sleeps[4].is_finished());
    }

    /// A single `tick` also completes sleeps that are only created while it is running.
    #[tokio::test]
    async fn sleep_leading_to_sleep() {
        let (time, sleep) = tick_advance_time_and_sleep();

        // Each sleep is queued only after the previous one completes; 6s in total.
        let task = tokio::spawn(async move {
            sleep.sleep(Duration::from_secs(1)).await;
            sleep.sleep(Duration::from_secs(2)).await;
            sleep.sleep(Duration::from_secs(3)).await;
        });
        // Without a tick, the task must not make progress past its first sleep.
        tokio::task::yield_now().await;
        assert!(!task.is_finished());
        assert_eq!(SystemTime::UNIX_EPOCH, time.now());

        // One 6s tick is enough to get through all three sequential sleeps.
        time.tick(Duration::from_secs(6)).await;
        assert_eq!(SystemTime::UNIX_EPOCH + Duration::from_secs(6), time.now());
        task.await.unwrap();
    }

    /// A sleep created during a tick completes before a previously queued, later sleep.
    #[tokio::test]
    async fn racing_sleeps() {
        let (time, sleep) = tick_advance_time_and_sleep();

        let task = tokio::spawn(async move {
            // `sleep1` is a 1s sleep followed by another 1s sleep that is only queued
            // once the first completes, so it finishes at 2s.
            let sleep1 = sleep.sleep(Duration::from_secs(1)).then({
                let sleep = sleep.clone();
                move |_| async move {
                    sleep.sleep(Duration::from_secs(1)).await;
                }
            });
            // `sleep2` is queued up front and presents at 3s.
            let sleep2 = sleep.sleep(Duration::from_secs(3));
            tokio::select! {
                _ = sleep1 => { /* good */}
                _ = sleep2 => { panic!("sleep2 should not complete before sleep1") }
            }
        });
        tokio::task::yield_now().await;
        assert!(!task.is_finished());
        assert_eq!(SystemTime::UNIX_EPOCH, time.now());

        // The tick covers both deadlines; the task panics if they complete out of order.
        time.tick(Duration::from_secs(6)).await;
        assert_eq!(SystemTime::UNIX_EPOCH + Duration::from_secs(6), time.now());
        task.await.unwrap();
    }
}