2.4. 应用:构建执行者

未匹配的标注

应用:构建执行者

Futures是懒惰的,必须积极驱动完成才能做任何事情。驱动Futures完成的一种常见方法是在asyn函数内部的await!,但这只会将问题提升一级:谁将运行从顶级async函数返回的Futures?答案是我们需要一个Future执行者。

Future执行者获取一组顶级Futures并通过pollFuture可以取得进展时调用它们来完成。通常,执行者将在开始时执行poll一次。当Futures通过调用wake()表示他们已经准备好取得进展时,他们被放回队列poll再次被调用,重复直到Future完成。

在本节中,我们将编写自己的简单执行者,能够同时运行大量顶级Futures

对于这个,我们将不得不包括futures crate以获得FutureObj类型,这是一个动态调度Future,类似于Box<dyn Future<Output = T>>Cargo.toml应该看起来像这样:

[package]
name = "xyz"
version = "0.1.0"
authors = ["XYZ Author"]
edition = "2018"

[dependencies]
futures-preview = "0.3.0-alpha.9"

接下来,我们需要在src/main.rs顶部的以下导入:

#![feature(arbitrary_self_types, async_await, await_macro, futures_api, pin)]

use {
    futures::future::FutureObj,
    std::{
        future::Future,
        pin::Pin,
        sync::{Arc, Mutex},
        sync::mpsc::{sync_channel, SyncSender, Receiver},
        task::{
            local_waker_from_nonlocal,
            Poll, Wake,
        },
    },
};

我们执行者的工作是通过通道发送任务运行。执行者将从通道中提取事件并运行它们。当一个任务准备好做更多的工作(被唤醒)时,它可以通过将自己放回到通道上来安排自己再次被轮询。

在此设计中,执行者本身只需要在通道的接收端接受任务。用户将获得发送端,以便他们可以创建新的futures。任务本身只是可以重新安排自己的futures,所以我们将它们存储为与发送端配对的futures,可以用来重新排队。

/// Task executor that receives tasks off of a channel and runs them.
struct Executor {
    ready_queue: Receiver<Arc<Task>>,
}

/// `Spawner` spawns new futures onto the task channel.
#[derive(Clone)]
struct Spawner {
    task_sender: SyncSender<Arc<Task>>,
}

/// A future that can reschedule itself to be polled using a channel.
struct Task {
    // In-progress future that should be pushed to completion
    //
    // The `Mutex` is not necessary for correctness, since we only have
    // one thread executing tasks at once. However, `rustc` isn't smart
    // enough to know that `future` is only mutated from one thread,
    // so we use it in order to provide safety. A production executor would
    // not need this, and could use `UnsafeCell` instead.
    future: Mutex<Option<FutureObj<'static, ()>>>,

    // Handle to spawn tasks onto the task queue
    task_sender: SyncSender<Arc<Task>>,
}

fn new_executor_and_spawner() -> (Executor, Spawner) {
    // Maximum number of tasks to allow queueing in the channel at once.
    // This is just to make `sync_channel` happy, and wouldn't be present in
    // a real executor.
    const MAX_QUEUED_TASKS: usize = 10_000;
    let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
    (Executor { ready_queue }, Spawner { task_sender})
}

我们还要为 spawner 添加一种方法,以便轻松生成新的future。此方法将携带future类型,将其包装并放入FutureObj中,创建一个Arc<Task>,可以将其排入执行者。

impl Spawner {
    fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        let future_obj = FutureObj::new(Box::new(future));
        let task = Arc::new(Task {
            future: Mutex::new(Some(future_obj)),
            task_sender: self.task_sender.clone(),
        });
        self.task_sender.send(task).expect("too many tasks queued");
    }
}

在排序轮询futures时,我们还需要创建一个LocalWaker提供轮询。正如任务唤醒部分所讨论的那样,LocalWakers负责在wake调用后再次调度要轮询的任务。请记住, LocalWakers告诉执行者确切地准备了哪个任务,允许他们仅轮询准备好进展的futures。创建新的LocalWaker最简单方法是实现Wake特征,然后使用local_waker_from_nonlocallocal_waker函数将Arc<T: Wake> 转换为LocalWaker。让我们为我们的任务实现Wake,让它们可以转换成LocalWakers并唤醒:

impl Wake for Task {
    fn wake(arc_self: &Arc<Self>) {
        // Implement `wake` by sending this task back onto the task channel
        // so that it will be polled again by the executor.
        let cloned = arc_self.clone();
        arc_self.task_sender.send(cloned).expect("too many tasks queued");
    }
}

当从Arc<Task>创建LocalWaker时,调用wake()会导致Arc复制并被发送到任务通道。然后我们的执行者接收任务并进行轮询。让我们实现:

impl Executor {
    fn run(&self) {
        while let Ok(task) = self.ready_queue.recv() {
            let mut future_slot = task.future.lock().unwrap();
            // Take the future, and if it has not yet completed (is still Some),
            // poll it in an attempt to complete it.
            if let Some(mut future) = future_slot.take() {
                // Create a `LocalWaker` from the task itself
                let lw = local_waker_from_nonlocal(task.clone());
                if let Poll::Pending = Pin::new(&mut future).poll(&lw) {
                    // We're not done processing the future, so put it
                    // back in its task to be run again in the future.
                    *future_slot = Some(future);
                }
            }
        }
    }
}

恭喜!我们现在有一个工作的futures执行者。我们甚至可以用它来运行async/await!代码和自定义futures,比如我们之前写的TimerFuture

fn main() {
    let (executor, spawner) = new_executor_and_spawner();
    spawner.spawn(async {
        println!("howdy!");
        // Wait for our timer future to complete after two seconds.
        await!(TimerFuture::new(Duration::new(2, 0)));
        println!("done!");
    });
    executor.run();
}

本文章首发在 LearnKu.com 网站上。

上一篇 下一篇
讨论数量: 0
发起讨论 只看当前版本


暂无话题~