aws_smithy_async/test_util/
tick_advance_sleep.rs1use crate::{
45 rt::sleep::{AsyncSleep, Sleep},
46 time::TimeSource,
47};
48use std::{
49 future::IntoFuture,
50 ops::{Deref, DerefMut},
51 sync::{Arc, Mutex},
52 time::{Duration, SystemTime},
53};
54use tokio::sync::oneshot::Sender;
55
56#[derive(Debug)]
57struct QueuedSleep {
58 presents_at: Duration,
60 notify: Option<Sender<()>>,
61}
62
63#[derive(Default, Debug)]
64struct Inner {
65 sleeps: Vec<QueuedSleep>,
69 now: Duration,
71}
72
73impl Inner {
74 fn push(&mut self, sleep: QueuedSleep) {
75 self.sleeps.push(sleep);
76 self.sleeps.sort_by_key(|s| s.presents_at);
77 }
78
79 fn next_presenting(&mut self, time: Duration) -> Option<QueuedSleep> {
80 if self
81 .sleeps
82 .first()
83 .map(|f| f.presents_at <= time)
84 .unwrap_or(false)
85 {
86 Some(self.sleeps.remove(0))
87 } else {
88 None
89 }
90 }
91}
92
93#[derive(Clone, Default, Debug)]
94struct SharedInner {
95 inner: Arc<Mutex<Inner>>,
96}
97impl SharedInner {
98 fn get(&self) -> impl Deref<Target = Inner> + '_ {
99 self.inner.lock().unwrap()
100 }
101 fn get_mut(&self) -> impl DerefMut<Target = Inner> + '_ {
102 self.inner.lock().unwrap()
103 }
104}
105
106#[derive(Clone, Debug)]
110pub struct TickAdvanceSleep {
111 inner: SharedInner,
112}
113
114impl AsyncSleep for TickAdvanceSleep {
115 fn sleep(&self, duration: Duration) -> Sleep {
116 let (tx, rx) = tokio::sync::oneshot::channel::<()>();
119
120 let mut inner = self.inner.get_mut();
121 let now = inner.now;
122
123 inner.push(QueuedSleep {
125 presents_at: now + duration,
126 notify: Some(tx),
127 });
128
129 Sleep::new(async move {
130 let _ = rx.into_future().await;
131 })
132 }
133}
134
135#[derive(Clone, Debug)]
139pub struct TickAdvanceTime {
140 inner: SharedInner,
141}
142
143impl TickAdvanceTime {
144 pub async fn tick(&self, duration: Duration) {
152 let time = self.inner.get().now + duration;
153
154 loop {
161 let Some(mut presenting) = self.inner.get_mut().next_presenting(time) else {
163 break;
164 };
165
166 self.inner.get_mut().now = presenting.presents_at;
169
170 let _ = presenting.notify.take().unwrap().send(());
172 tokio::task::yield_now().await;
173 }
174
175 self.inner.get_mut().now = time;
177 }
178}
179
180impl TimeSource for TickAdvanceTime {
181 fn now(&self) -> SystemTime {
182 SystemTime::UNIX_EPOCH + self.inner.get().now
183 }
184}
185
186pub fn tick_advance_time_and_sleep() -> (TickAdvanceTime, TickAdvanceSleep) {
190 let inner = SharedInner::default();
191 (
192 TickAdvanceTime {
193 inner: inner.clone(),
194 },
195 TickAdvanceSleep {
196 inner: inner.clone(),
197 },
198 )
199}
200
#[cfg(test)]
mod tests {
    use super::*;
    use futures_util::FutureExt;

    /// The instant `secs` whole seconds after the Unix epoch.
    fn epoch_plus(secs: u64) -> SystemTime {
        SystemTime::UNIX_EPOCH + Duration::from_secs(secs)
    }

    /// Snapshot of which spawned tasks have completed, in slice order.
    fn finished<T>(handles: &[tokio::task::JoinHandle<T>]) -> Vec<bool> {
        handles.iter().map(|handle| handle.is_finished()).collect()
    }

    #[tokio::test]
    async fn tick_advances() {
        let (time, sleep) = tick_advance_time_and_sleep();

        // The clock starts at the epoch and moves only when ticked.
        assert_eq!(epoch_plus(0), time.now());
        time.tick(Duration::from_secs(1)).await;
        assert_eq!(epoch_plus(1), time.now());

        // Queued at t = 1s, so these are due at 1.5s, 2s, 3s, 4s and 5s.
        let pauses = [
            Duration::from_millis(500),
            Duration::from_secs(1),
            Duration::from_secs(2),
            Duration::from_secs(3),
            Duration::from_secs(4),
        ];
        let handles: Vec<_> = pauses
            .iter()
            .map(|&pause| tokio::spawn(sleep.sleep(pause)))
            .collect();

        // Letting the tasks run must not complete any sleep by itself.
        tokio::task::yield_now().await;
        assert_eq!(vec![false; 5], finished(&handles));

        // t = 2s: the 1.5s and 2s sleeps fire.
        time.tick(Duration::from_secs(1)).await;
        assert_eq!(epoch_plus(2), time.now());
        assert_eq!(vec![true, true, false, false, false], finished(&handles));

        // t = 4s: the 3s and 4s sleeps fire.
        time.tick(Duration::from_secs(2)).await;
        assert_eq!(epoch_plus(4), time.now());
        assert_eq!(vec![true, true, false], finished(&handles[2..]));

        // t = 5s: the last sleep fires.
        time.tick(Duration::from_secs(1)).await;
        assert_eq!(epoch_plus(5), time.now());
        assert!(handles[4].is_finished());
    }

    #[tokio::test]
    async fn sleep_leading_to_sleep() {
        let (time, sleep) = tick_advance_time_and_sleep();

        // Each sleep is only queued once the previous one has been woken.
        let chain = tokio::spawn(async move {
            for secs in [1, 2, 3] {
                sleep.sleep(Duration::from_secs(secs)).await;
            }
        });
        tokio::task::yield_now().await;
        assert!(!chain.is_finished());
        assert_eq!(epoch_plus(0), time.now());

        // A single 6s tick has to carry the task through all three sleeps.
        time.tick(Duration::from_secs(6)).await;
        assert_eq!(epoch_plus(6), time.now());
        chain.await.unwrap();
    }

    #[tokio::test]
    async fn racing_sleeps() {
        let (time, sleep) = tick_advance_time_and_sleep();

        let race = tokio::spawn(async move {
            // Two chained 1s sleeps (due at 2s overall) race a single 3s sleep.
            let follow_up = sleep.clone();
            let sleep1 = sleep
                .sleep(Duration::from_secs(1))
                .then(move |_| async move {
                    follow_up.sleep(Duration::from_secs(1)).await;
                });
            let sleep2 = sleep.sleep(Duration::from_secs(3));
            tokio::select! {
                _ = sleep1 => {}
                _ = sleep2 => { panic!("sleep2 should not complete before sleep1") }
            }
        });
        tokio::task::yield_now().await;
        assert!(!race.is_finished());
        assert_eq!(epoch_plus(0), time.now());

        time.tick(Duration::from_secs(6)).await;
        assert_eq!(epoch_plus(6), time.now());
        race.await.unwrap();
    }
}