2.3. Waker唤醒任务
Waker
唤醒任务#
futures
在第一次被 poll
时无法完成是很常见的 。当发生这种情况时,futures
需要确保在准备好取得更多进展后再次对其进行轮询。这是通过 Waker
类型完成的 。
每次轮询 futures
时,都会将其作为 “任务” 的一部分进行轮询。任务是已提交给执行者的顶级 futures
。
Waker
每个都提供了一种 wake()
方法,可以用来告诉执行者他们的相关任务应该被唤醒。当 wake()
调用时,执行程序知道与该关联的任务 Waker
已准备好进行,并且应该再次轮询其 future
。
Waker
还实现了 clone()
以便复制和存储它们。
让我们尝试使用 Waker
实现一个简单的计时器。
应用:构建计时器#
为了示例,我们将在创建计时器时启动新线程,在所需时间内休眠,然后在时间窗口结束时给 future` 计时器发信号。
以下是我们开始时需要的导入:
use {
std::{
future::Future,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll, Waker},
thread,
time::Duration,
},
};
让我们从定义 Future
类型本身开始。我们的 Future
需要一种方法让线程通知计时器已经过去而 Future
应该完成。我们将使用 Arc<Mutex<..>>
共享值在线程和 Future
之间进行通信。
pub struct TimerFuture {
shared_state: Arc<Mutex<SharedState>>,
}
/// `future`与等待线程之间的共享状态
struct SharedState {
/// 用于判断sleep的时间是不是已经过了
completed: bool,
/// 任务的唤醒者 `TimerFuture` 正在上面运行.
/// 线程能够使用这个设置`completed = true`之后去调用
/// `TimerFuture`的任务来唤醒, 观察 `completed = true`, 并前进
waker: Option<Waker>,
}
现在,让我们编写实际 Future
实现!
impl Future for TimerFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// 注意shared_state 并去看计时器(timer)是不是已经完成(completed)了
let mut shared_state = self.shared_state.lock().unwrap();
if shared_state.completed {
Poll::Ready(())
} else {
// 设置唤醒器,以便线程在计时器(timer)完成的时候可以唤醒当前任务
// 确定 future已经再一次被轮询了,并且看`completed = true`.
//
// 这样做一次很诱人,而不是每次都重复克隆唤醒器。
// 但是,`TimerFuture`可以在执行程序上的任务之间移动,
// 这可能会导致过时的唤醒程序指向错误的任务,
// 从而阻止`TimerFuture`正确唤醒。
//
// 注意:可以使用 `Waker::will_wake`这个函数来检查
// 但是为了简单起见,我们忽略了这个。
shared_state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
如果线程已经设置 shared_state.completed = true
,我们就完成了!否则,我们为当前任务克隆 LocalWaker
,将其转换为 Waker
,然后传递给 shared_state.waker
以便线程可以将任务唤醒。
重要的是,我们必须在每次轮询 Future
时更新 Waker
,因为 Future
可能已经转移到另一个不同的任务与 Waker
。Future
被轮询之后在任务间传递时会发生这种情况。
最后,我们需要 API 来实际构造计时器并启动线程:
impl TimerFuture {
/// 创建一个新的`TimerFuture` 将在提供timeout之后完成
pub fn new(duration: Duration) -> Self {
let shared_state = Arc::new(Mutex::new(SharedState {
completed: false,
waker: None,
}));
// 引发新的线程
let thread_shared_state = shared_state.clone();
thread::spawn(move || {
thread::sleep(duration);
let mut shared_state = thread_shared_state.lock().unwrap();
// 表示计时器已经完成并唤醒最后一个拥有被轮询过的future的任务,如果它存在的话
shared_state.completed = true;
if let Some(waker) = shared_state.waker.take() {
waker.wake()
}
});
TimerFuture { shared_state }
}
}
Woot! 这就是我们构建简单计时器 Future
所需的全部内容。现在,我们只需有一个执行者来运行 Future
.