aws_smithy_async/test_util/
controlled_sleep.rs1use crate::{
7 rt::sleep::{AsyncSleep, Sleep},
8 test_util::ManualTimeSource,
9};
10use std::time::{Duration, SystemTime};
11use std::{
12 collections::VecDeque,
13 sync::{Arc, Mutex},
14};
15use tokio::sync::oneshot;
16use tokio::sync::Barrier;
17use tokio::time::timeout;
18
19#[derive(Debug, Clone)]
23pub struct ControlledSleep {
24 barrier: Arc<Barrier>,
25 log: Arc<Mutex<Vec<Duration>>>,
26 duration: Arc<Mutex<VecDeque<Duration>>>,
27 advance_guard: Arc<Mutex<Option<oneshot::Sender<()>>>>,
28}
29
30impl ControlledSleep {
31 fn new(log: Arc<Mutex<Vec<Duration>>>) -> (ControlledSleep, SleepGate) {
32 let gate = Arc::new(Barrier::new(2));
33 let pending = Arc::new(Mutex::new(VecDeque::new()));
34 let advance_guard: Arc<Mutex<Option<oneshot::Sender<()>>>> = Default::default();
35 (
36 ControlledSleep {
37 barrier: gate.clone(),
38 log,
39 duration: pending.clone(),
40 advance_guard: advance_guard.clone(),
41 },
42 SleepGate {
43 gate,
44 pending,
45 advance_guard,
46 },
47 )
48 }
49}
50
51#[allow(dead_code)] pub struct CapturedSleep<'a>(oneshot::Sender<()>, &'a SleepGate, Duration);
86impl CapturedSleep<'_> {
87 pub fn allow_progress(self) {
106 drop(self)
107 }
108
109 pub fn duration(&self) -> Duration {
111 self.2
112 }
113}
114
115impl AsRef<Duration> for CapturedSleep<'_> {
116 fn as_ref(&self) -> &Duration {
117 &self.2
118 }
119}
120
121pub struct SleepGate {
125 gate: Arc<Barrier>,
126 pending: Arc<Mutex<VecDeque<Duration>>>,
127 advance_guard: Arc<Mutex<Option<oneshot::Sender<()>>>>,
128}
129
130impl SleepGate {
131 pub async fn expect_sleep(&mut self) -> CapturedSleep<'_> {
136 timeout(Duration::from_secs(1), self.gate.wait())
137 .await
138 .expect("timeout");
139 let dur = self
140 .pending
141 .lock()
142 .unwrap()
143 .pop_front()
144 .unwrap_or(Duration::from_secs(123456));
145 let guard = CapturedSleep(
146 self.advance_guard.lock().unwrap().take().unwrap(),
147 self,
148 dur,
149 );
150 guard
151 }
152
153 pub async fn skip_sleep(&mut self) -> Option<Duration> {
155 if timeout(Duration::from_millis(1), self.gate.wait())
156 .await
157 .is_ok()
158 {
159 let _ = self.advance_guard.lock().unwrap().take();
160 self.pending.lock().unwrap().pop_front()
161 } else {
162 None
163 }
164 }
165}
166
167impl AsyncSleep for ControlledSleep {
168 fn sleep(&self, duration: Duration) -> Sleep {
169 let barrier = self.barrier.clone();
170 let log = self.log.clone();
171 let pending = self.duration.clone();
172 let drop_guard = self.advance_guard.clone();
173 Sleep::new(async move {
174 pending.lock().unwrap().push_back(duration);
176 let (tx, rx) = oneshot::channel();
177 *drop_guard.lock().unwrap() = Some(tx);
178 barrier.wait().await;
180 log.lock().unwrap().push(duration);
181 let _ = rx.await;
182 })
183 }
184}
185
186pub fn controlled_time_and_sleep(
191 start_time: SystemTime,
192) -> (ManualTimeSource, ControlledSleep, SleepGate) {
193 let log = Arc::new(Mutex::new(vec![]));
194 let (sleep, gate) = ControlledSleep::new(log.clone());
195 (ManualTimeSource { start_time, log }, sleep, gate)
196}
197
#[cfg(test)]
mod test {
    use crate::rt::sleep::AsyncSleep;
    use crate::test_util::controlled_time_and_sleep;
    use crate::time::TimeSource;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::time::{Duration, UNIX_EPOCH};
    use tokio::task::yield_now;
    use tokio::time::timeout;

    /// A task performing two sleeps only advances when the gate releases each one, and the
    /// time source moves forward by the slept durations.
    #[tokio::test]
    async fn test_sleep_gate() {
        let start = UNIX_EPOCH;
        let (time, sleep, mut gate) = controlled_time_and_sleep(start);
        let stage = Arc::new(AtomicUsize::new(0));

        let worker = tokio::spawn({
            let stage = Arc::clone(&stage);
            async move {
                assert_eq!(time.now(), start);
                stage.store(1, Ordering::Release);
                sleep.sleep(Duration::from_secs(1)).await;
                assert_eq!(time.now(), start + Duration::from_secs(1));
                stage.store(2, Ordering::Release);
                sleep.sleep(Duration::from_secs(2)).await;
                assert_eq!(time.now(), start + Duration::from_secs(3));
            }
        });

        // Let the worker run until it is about to start its first sleep.
        while stage.load(Ordering::Acquire) != 1 {
            yield_now().await;
        }

        let first = gate.expect_sleep().await;
        assert_eq!(first.duration(), Duration::from_secs(1));
        // The worker must still be parked in the first sleep.
        assert_eq!(stage.load(Ordering::Acquire), 1);
        first.allow_progress();

        let second = gate.expect_sleep().await;
        assert_eq!(stage.load(Ordering::Acquire), 2);
        assert!(!worker.is_finished(), "task should not be finished");
        second.allow_progress();

        timeout(Duration::from_secs(1), worker)
            .await
            .expect("no timeout")
            .expect("successful completion");
    }

    /// Sleeps awaited one after another are seen by the gate in order, whether they are
    /// expected or skipped, and all of them count towards the elapsed time.
    #[tokio::test]
    async fn sleep_gate_multiple_sleeps() {
        let (time, sleep, mut gate) = controlled_time_and_sleep(UNIX_EPOCH);
        let first = sleep.sleep(Duration::from_secs(1));
        let second = sleep.sleep(Duration::from_secs(2));
        let third = sleep.sleep(Duration::from_secs(3));

        let sleeper = tokio::spawn(async move {
            first.await;
            second.await;
            third.await;
        });

        // The captured handle is a temporary, so each sleep is released right after the lookup.
        let observed = gate.expect_sleep().await.duration();
        assert_eq!(Duration::from_secs(1), observed);
        gate.skip_sleep().await;
        let observed = gate.expect_sleep().await.duration();
        assert_eq!(Duration::from_secs(3), observed);

        let _ = sleeper.await;

        assert_eq!(UNIX_EPOCH + Duration::from_secs(6), time.now());
    }

    /// Skipping returns the pending sleep's duration once, then `None` when nothing is sleeping.
    #[tokio::test]
    async fn sleep_gate_skipping_a_sleep_doesnt_blow_up_if_no_sleep() {
        let (time, sleep, mut gate) = controlled_time_and_sleep(UNIX_EPOCH);

        let only_sleep = sleep.sleep(Duration::from_secs(1));
        let sleeper = tokio::spawn(async move {
            only_sleep.await;
        });

        let skipped = gate.skip_sleep().await;
        assert_eq!(Some(Duration::from_secs(1)), skipped);
        let skipped = gate.skip_sleep().await;
        assert_eq!(None, skipped);

        let _ = sleeper.await;

        assert_eq!(UNIX_EPOCH + Duration::from_secs(1), time.now());
    }
}