2.3. Waker唤醒任务

未匹配的标注

Waker唤醒任务

futures在第一次被poll时无法完成是很常见的 。当发生这种情况时,futures需要确保在准备好取得更多进展后再次对其进行轮询。这是通过Waker类型完成的 。

每次轮询futures时,都会将其作为“任务”的一部分进行轮询。任务是已提交给执行者的顶级futures

Waker每个都提供了一种wake()方法,可以用来告诉执行者他们的相关任务应该被唤醒。当wake()调用时,执行程序知道与该关联的任务Waker已准备好进行,并且应该再次轮询其future

Waker还实现了clone()以便复制和存储它们。

让我们尝试使用Waker实现一个简单的计时器。

应用:构建计时器

为了示例,我们将在创建计时器时启动新线程,在所需时间内休眠,然后在时间窗口结束时给future`计时器发信号。

以下是我们开始时需要的导入:

use {
    std::{
        future::Future,
        pin::Pin,
        sync::{Arc, Mutex},
        task::{Context, Poll, Waker},
        thread,
        time::Duration,
    },
};

让我们从定义Future类型本身开始。我们的Future需要一种方法让线程通知计时器已经过去而Future应该完成。我们将使用Arc<Mutex<..>>共享值在线程和Future之间进行通信。

pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

/// `future`与等待线程之间的共享状态
struct SharedState {
    /// 用于判断sleep的时间是不是已经过了
    completed: bool,

    /// 任务的唤醒者 `TimerFuture` 正在上面运行.
    /// 线程能够使用这个设置`completed = true`之后去调用 
    /// `TimerFuture`的任务来唤醒, 观察 `completed = true`, 并前进
    waker: Option<Waker>,
}

现在,让我们编写实际Future实现!

impl Future for TimerFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // 注意shared_state 并去看计时器(timer)是不是已经完成(completed)了
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            Poll::Ready(())
        } else {
            // 设置唤醒器,以便线程在计时器(timer)完成的时候可以唤醒当前任务
            // 确定 future已经再一次被轮询了,并且看`completed = true`.
            //
            // 这样做一次很诱人,而不是每次都重复克隆唤醒器。
            // 但是,`TimerFuture`可以在执行程序上的任务之间移动,
            // 这可能会导致过时的唤醒程序指向错误的任务,
            // 从而阻止`TimerFuture`正确唤醒。
            //
            // 注意:可以使用 `Waker::will_wake`这个函数来检查
            // 但是为了简单起见,我们忽略了这个。
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

如果线程已经设置shared_state.completed = true,我们就完成了!否则,我们为当前任务克隆LocalWaker,将其转换为Waker,然后传递给shared_state.waker以便线程可以将任务唤醒。

重要的是,我们必须在每次轮询Future时更新Waker,因为Future可能已经转移到另一个不同的任务与WakerFuture被轮询之后在任务间传递时会发生这种情况。

最后,我们需要API来实际构造计时器并启动线程:

impl TimerFuture {
    /// 创建一个新的`TimerFuture` 将在提供timeout之后完成
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));

        // 引发新的线程
        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            thread::sleep(duration);
            let mut shared_state = thread_shared_state.lock().unwrap();
            // 表示计时器已经完成并唤醒最后一个拥有被轮询过的future的任务,如果它存在的话
            shared_state.completed = true;
            if let Some(waker) = shared_state.waker.take() {
                waker.wake()
            }
        });

        TimerFuture { shared_state }
    }
}

Woot! 这就是我们构建简单计时器Future所需的全部内容。现在,我们只需有一个执行者来运行Future.

本文章首发在 LearnKu.com 网站上。

上一篇 下一篇
贡献者:1
讨论数量: 0
发起讨论 只看当前版本


暂无话题~