2.2. Future 特质
Future
特质
该Future
特质是Rust异步编程中的核心。Future是可以产生异步计算的值(尽管该值可以是空的,例如())。一个简化的Future
特质版本可能是这个样子:
trait SimpleFuture {
type Output;
fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}
enum Poll<T> {
Ready(T),
Pending,
}
通过调用该poll
函数可以推进Futures
,这将推动future尽可能地完成。如果Future
完成,它将返回Poll::Ready(result)
。如果Future
尚未完成,它将返回Poll::Pending
并安排在Future
准备好进行更多进展时调用wake()
函数。当wake()
被调用时,executor
(执行者)驱动Future
将再次调用poll
,以便Future
能够取得更多进展。
如果没有wake()
,executor
(执行者)将无法知道特定的Futures
何时可以取得进展,并且必须不断地对每个Futures
进行轮询。有了wake()
,执行者确切地知道哪些future准备好poll
。
例如,考虑我们想要从可能已经或可能没有数据的套接字读取的情况。如果有数据,我们可以读取并返回Poll::Ready(data)
,但如果没有数据准备就绪,我们的Futures
将被阻止,无法再进展。当没有数据可用时,我们必须注册wake
在套接字上在准备好数据时进行调用,这将告诉执行者我们的Futures
已准备好取得进展。
一个简单的SocketRead
Futures
可能看起来像这样:
struct SocketRead<'a> {
socket: &'a Socket,
}
impl SimpleFuture for SocketRead<'_> {
type Output = Vec<u8>;
fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
if self.socket.has_data_to_read() {
// 套接字拥有数据就读取数据到缓冲区并返回数据.
Poll::Ready(self.socket.read_buf())
} else {
// 套接字没有数据
// 安排 `wake` 在有数据之后能够被调用.
// 当数据可获得的时候, `wake` 将被调用
// 并且这个`Future` 的用户将知道再一次调用 `poll` 接收数据
self.socket.set_readable_callback(wake);
Poll::Pending
}
}
}
这种Futures
模型允许将多个异步操作组合在一起,而无需中间分配。一次运行多个Futures
或将Futures
链接在一起可以通过无分配状态机实现,如下所示:
/// 一个SimpleFuture,可以同时运行另外两个future。
///
/// 并发是通过以下事实实现的:每个future都需要进行“poll”
/// 可能会交错,让每个future以自己的步伐前进。
pub struct Join<FutureA, FutureB> {
// 每个字段都可能包含应运行以完成的future。
// 如果future已经完成,则将该字段设置为“None”。
// 这样可以防止我们在完成后轮询(poll)future。
// 如果那样做就违反了`Future` 这个trait的契约(contract).
a: Option<FutureA>,
b: Option<FutureB>,
}
impl<FutureA, FutureB> SimpleFuture for Join<FutureA, FutureB>
where
FutureA: SimpleFuture<Output = ()>,
FutureB: SimpleFuture<Output = ()>,
{
type Output = ();
fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
// 尝试完成 future `a`.
if let Some(a) = &mut self.a {
if let Poll::Ready(()) = a.poll(wake) {
self.a.take();
}
}
// 尝试完成 future `b`.
if let Some(b) = &mut self.b {
if let Poll::Ready(()) = b.poll(wake) {
self.b.take();
}
}
if self.a.is_none() && self.b.is_none() {
// 所有的futures都完成了,我们可以成功的返回
Poll::Ready(())
} else {
// 一个或者全部的futures返回了`Poll::Pending`,说明仍有工作需要去做
// 他们将会调用'wake()',当取得进展时
Poll::Pending
}
}
}
这展示了如何在不需要单独分配的情况下同时运行多个Future
,从而允许更高效的异步程序。同样,多个顺序Future
可以一个接一个地运行,如下所示:
/// SimpleFuture 将一个接一个地运行知道完成
//
// 注意: 为了这个简单的例子, `AndThenFut` 假设两个future在创建时可用
// `AndThen` 组合器允许基于输出的第一个future创建第二个future
// 像这样使用: `get_breakfast.and_then(|food| eat(food))`.
pub struct AndThenFut<FutureA, FutureB> {
first: Option<FutureA>,
second: FutureB,
}
impl<FutureA, FutureB> SimpleFuture for AndThenFut<FutureA, FutureB>
where
FutureA: SimpleFuture<Output = ()>,
FutureB: SimpleFuture<Output = ()>,
{
type Output = ();
fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
if let Some(first) = &mut self.first {
match first.poll(wake) {
// 我们完成了第一个future,之后轮询它并完成第二个
Poll::Ready(()) => self.first.take(),
// 我们还不能完成第一个 future.
Poll::Pending => return Poll::Pending,
};
}
// 现在第一个future已经完成,尝试完成第二个
self.second.poll(wake)
}
}
这些示例展示了如何使用Future
特征来表达异步控制流,而不需要多个已分配的对象和深度嵌套的回调。通过基本的控制流程,让我们来谈谈真正的Future
特征以及它是如何不同的。
trait Future {
type Output;
fn poll(
// 注意到这个从 `&mut self` 到 `Pin<&mut Self>`的更改:
self: Pin<&mut Self>,
// 也注意从 `wake: fn()` 到 `cx: &mut Context<'_>`的更改:
cx: &mut Context<'_>,
) -> Poll<Self::Output>;
}
您将注意到的第一个更改是我们的self
类型不再&mut self
,但已更改为Pin<&mut Self>
。我们将在后面的章节中详细讨论pinning
,但现在知道它允许我们创建不可移动的Future
。不可移动的对象可以在它们的字段之间存储指针,例如struct MyFut { a: i32, ptr_to_a: *const i32 }
。此功能是启用async / await
所必需的。
其次,wake: fn()
已更改为&mut Context<'_>
。在SimpleFuture
,我们使用对函数指针(fn()
)的调用来告诉Future
的执行者应该轮询相关的Future
。但是,由于fn()
它是零大小的,因此无法存储有关哪个task
是唤醒了的数据。
在实际场景中,像Web服务器这样的复杂应用程序可能有数千个不同的连接,其唤醒应该分别进行管理。'Context' 类型通过提供对 'Waker' 类型值的访问来解决此问题,该值可用于唤醒特定任务。