2.4. 应用:构建执行者

未匹配的标注

应用:构建执行者#

Futures 是懒惰的,必须积极驱动完成才能做任何事情。驱动 Futures 完成的一种常见方法是在 asyn 函数内部的 await!,但这只会将问题提升一级:谁将运行从顶级 async 函数返回的 Futures?答案是我们需要一个 Future 执行者。

Future 执行者获取一组顶级 Futures 并通过 pollFuture 可以取得进展时调用它们来完成。通常,执行者将在开始时执行 poll 一次。当 Futures 通过调用 wake() 表示他们已经准备好取得进展时,他们被放回队列 poll 再次被调用,重复直到 Future 完成。

在本节中,我们将编写自己的简单执行者,能够同时运行大量顶级 Futures

对于这个,我们将不得不包括 futures crate 以获得 FutureObj 类型,这是一个动态调度 Future,类似于 Box<dyn Future<Output = T>>Cargo.toml 应该看起来像这样:

[package]
name = "xyz"
version = "0.1.0"
authors = ["XYZ Author"]
edition = "2018"

[dependencies]
futures-preview = "0.3.0-alpha.9"

接下来,我们需要在 src/main.rs 顶部的以下导入:

#![feature(arbitrary_self_types, async_await, await_macro, futures_api, pin)]

use {
    futures::future::FutureObj,
    std::{
        future::Future,
        pin::Pin,
        sync::{Arc, Mutex},
        sync::mpsc::{sync_channel, SyncSender, Receiver},
        task::{
            local_waker_from_nonlocal,
            Poll, Wake,
        },
    },
};

我们执行者的工作是通过通道发送任务运行。执行者将从通道中提取事件并运行它们。当一个任务准备好做更多的工作(被唤醒)时,它可以通过将自己放回到通道上来安排自己再次被轮询。

在此设计中,执行者本身只需要在通道的接收端接受任务。用户将获得发送端,以便他们可以创建新的 futures。任务本身只是可以重新安排自己的 futures,所以我们将它们存储为与发送端配对的 futures,可以用来重新排队。

/// Task executor that receives tasks off of a channel and runs them.
struct Executor {
    ready_queue: Receiver<Arc<Task>>,
}

/// `Spawner` spawns new futures onto the task channel.
#[derive(Clone)]
struct Spawner {
    task_sender: SyncSender<Arc<Task>>,
}

/// A future that can reschedule itself to be polled using a channel.
struct Task {
    // In-progress future that should be pushed to completion
    //
    // The `Mutex` is not necessary for correctness, since we only have
    // one thread executing tasks at once. However, `rustc` isn't smart
    // enough to know that `future` is only mutated from one thread,
    // so we use it in order to provide safety. A production executor would
    // not need this, and could use `UnsafeCell` instead.
    future: Mutex<Option<FutureObj<'static, ()>>>,

    // Handle to spawn tasks onto the task queue
    task_sender: SyncSender<Arc<Task>>,
}

fn new_executor_and_spawner() -> (Executor, Spawner) {
    // Maximum number of tasks to allow queueing in the channel at once.
    // This is just to make `sync_channel` happy, and wouldn't be present in
    // a real executor.
    const MAX_QUEUED_TASKS: usize = 10_000;
    let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
    (Executor { ready_queue }, Spawner { task_sender})
}

我们还要为 spawner 添加一种方法,以便轻松生成新的 future。此方法将携带 future 类型,将其包装并放入 FutureObj 中,创建一个 Arc<Task>,可以将其排入执行者。

impl Spawner {
    fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        let future_obj = FutureObj::new(Box::new(future));
        let task = Arc::new(Task {
            future: Mutex::new(Some(future_obj)),
            task_sender: self.task_sender.clone(),
        });
        self.task_sender.send(task).expect("too many tasks queued");
    }
}

在排序轮询 futures 时,我们还需要创建一个 LocalWaker 提供轮询。正如任务唤醒部分所讨论的那样,LocalWakers 负责在 wake 调用后再次调度要轮询的任务。请记住, LocalWakers 告诉执行者确切地准备了哪个任务,允许他们仅轮询准备好进展的 futures。创建新的 LocalWaker 最简单方法是实现 Wake 特征,然后使用 local_waker_from_nonlocallocal_waker 函数将 Arc<T: Wake> 转换为 LocalWaker。让我们为我们的任务实现 Wake,让它们可以转换成 LocalWakers 并唤醒:

impl Wake for Task {
    fn wake(arc_self: &Arc<Self>) {
        // Implement `wake` by sending this task back onto the task channel
        // so that it will be polled again by the executor.
        let cloned = arc_self.clone();
        arc_self.task_sender.send(cloned).expect("too many tasks queued");
    }
}

当从 Arc<Task> 创建 LocalWaker 时,调用 wake() 会导致 Arc 复制并被发送到任务通道。然后我们的执行者接收任务并进行轮询。让我们实现:

impl Executor {
    fn run(&self) {
        while let Ok(task) = self.ready_queue.recv() {
            let mut future_slot = task.future.lock().unwrap();
            // Take the future, and if it has not yet completed (is still Some),
            // poll it in an attempt to complete it.
            if let Some(mut future) = future_slot.take() {
                // Create a `LocalWaker` from the task itself
                let lw = local_waker_from_nonlocal(task.clone());
                if let Poll::Pending = Pin::new(&mut future).poll(&lw) {
                    // We're not done processing the future, so put it
                    // back in its task to be run again in the future.
                    *future_slot = Some(future);
                }
            }
        }
    }
}

恭喜!我们现在有一个工作的 futures 执行者。我们甚至可以用它来运行 async/await! 代码和自定义 futures,比如我们之前写的 TimerFuture

fn main() {
    let (executor, spawner) = new_executor_and_spawner();
    spawner.spawn(async {
        println!("howdy!");
        // Wait for our timer future to complete after two seconds.
        await!(TimerFuture::new(Duration::new(2, 0)));
        println!("done!");
    });
    executor.run();
}

本文章首发在 LearnKu.com 网站上。

上一篇 下一篇
讨论数量: 0
发起讨论 只看当前版本


暂无话题~