5.1. Stream特质

未匹配的标注

Stream特质

Stream特质类似于Future,但可以在完成之前得到多个值,类似于标准库的Iterator特质:

trait Stream {
    /// The type of value yielded by the stream.
    type Item;

    /// Attempt to resolve the next item in the stream.
    /// Returns `Poll::Pending` if not ready, `Poll::Ready(Some(x))` if a value
    /// is ready, and `Poll::Ready(None)` if the stream has completed.
    fn poll_next(self: Pin<&mut Self>, lw: &LocalWaker)
        -> Poll<Option<Self::Item>>;
}

Stream的一个常见的例子是 来自futures箱子的Receiver通道类型。每次从Sender端发送一个值时它都会产生某个值,并且一旦Sender端被删除,它就会产生None并且收到端暂停所有消息接收:

use futures::channel::mpsc;
use futures::prelude::*;

let fut = async {
    let (tx, rx) = mpsc::channel(BUFFER_SIZE);
    await!(tx.send(1)).unwrap();
    await!(tx.send(2)).unwrap();
    drop(tx);

    // `StreamExt::next` is similar to `Iterator::next`, but returns a
    // type that implements `Future<Output = Option<T>>`.
    assert_eq!(Some(1), await!(rx.next()));
    assert_eq!(Some(2), await!(rx.next()));
    assert_eq!(None, await!(rx.next()));
};

模式:迭代和并发

与同步Iterators 类似,有许多不同的方法可以迭代和处理Stream中的值。有组合子式的方法,如mapfilterfold,和try_maptry_filtertry_fold

不幸的是,for循环不能用于Streams,但是对于命令式代码,while letfor_each可用:

use futures::prelude::*;

let fut = async {
    let mut stream: impl Stream<Item = Result<i32, io::Error>> = ...;

    // processing with `try_for_each`:
    await!(stream.try_for_each(async |item| {
        // handle `item`
        Ok(())
    }))?;

    // processing with `while let`:
    while let Some(item) = await!(stream.try_next())? {
        // handle `item`
    }

    ...

    Ok(())
};

但是,如果我们一次只处理一个元素,那么我们可能会失去并发机会,毕竟,这就是我们为什么要编写异步代码的原因。要同时处理流中的多个项,请使用for_each_concurrenttry_for_each_concurrent 方法:

use futures::prelude::*;

let fut = async {
    let mut stream: impl Stream<Item = Result<i32, io::Error>> = ...;

    await!(stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, async |num| {
        await!(jump_n_times(num))?;
        await!(report_jumps(num))?;
        Ok(())
    })?;

    ...
    Ok(())
};

这种方法允许最多MAX_CONCURRENT_JUMPERS一次跳转(或对项执行任何操作,就此而言 - API并不严格依赖于跳跃)。如果您希望一次允许无限数量的操作,您可以使用None而不是MAX_CONCURRENT_...,但要注意,如果stream来自不受信任的用户输入,这可能允许不端的客户端行为使系统同时请求过多而过载。

本文章首发在 LearnKu.com 网站上。

上一篇 下一篇
讨论数量: 0
发起讨论 只看当前版本


暂无话题~