3.4.发送消息
发送消息
现在到了实现「发送消息」部分的时候了。最显著的实现方式就是在 connection_loop
中对每个客户端的 TcpStream
执行写入操作。通过这种方式,客户端可以直接调用 .write_all
方法将消息发送给接收方。然而,这样做可能会出错:比如 Alice 发送了 bob: foo
,同时 Charley 发送了 bob: bar
,而 Bob 就可能会收到 fobaor
。这是因为通过 socket 发送一条消息可能涉及多次系统调用,所以并发执行 .write_all
可能导致接收方收到的消息顺序错乱!
一般来说,我们只需要保证有一个任务专门向每个 TcpStream
写入即可解决上述问题。
那让我们来创建一个 connection_writer_loop
任务吧,它可以通过一个管道(channel)接收消息,并将消息写入到 socket。我们也可以在该任务中进行消息序列化。最终,如果 Alice 和 Charley 同时给 Bob 发送了消息,Bob 看到消息的顺序就和发送方消息到达管道(channel)的顺序一致了。
# extern crate async_std;
# extern crate futures;
# use async_std::{
# net::TcpStream,
# prelude::*,
# };
use futures::channel::mpsc; // 1
use futures::sink::SinkExt;
use std::sync::Arc;
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
type Sender<T> = mpsc::UnboundedSender<T>; // 2
type Receiver<T> = mpsc::UnboundedReceiver<T>;
async fn connection_writer_loop(
mut messages: Receiver<String>,
stream: Arc<TcpStream>, // 3
) -> Result<()> {
let mut stream = &*stream;
while let Some(msg) = messages.next().await {
stream.write_all(msg.as_bytes()).await?;
}
Ok(())
}
- 我们将使用
futures
crate 中的管道(channel); - 简单起见,我们将使用
unbounded
无界管道,并且不会在该教程中介绍「背压(backpressure)」概念。 - 由于
connection_loop
和connection_writer_loop
共享了TcpStream
,我们需要把它放到Arc
中。需要注意的是,客户端只是从流中读取,而connection_writer_loop
负责向流中写入,所以这里并没有竞争存在。
原文链接:book.async.rs/
本译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接
我们的翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。