Channels
通道
现在我们已经了解了一些关于Tokio并发性的知识,让我们将其应用于客户端。将我们之前编写的服务器代码放到显式二进制文件中:
mkdir src/bin
mv src/main.rs src/bin/server.rs
创建一个新的二进制文件,其中将包含客户端代码:
touch src/bin/client.rs
在该文件中,您将编写此页面的代码。当您想要运行它时,您必须首先在一个单独的终端窗口中启动服务器:
cargo run --bin server
然后是客户端, 也是单独地:
cargo run --bin client
话虽这么说,让我们开始编码吧!
假设我们要运行两个并发的Redis命令。我们可以为每个命令派生一个任务。然后这两个命令将同时执行。
首先,我们可能会尝试这样的操作:
use mini_redis::client;
#[tokio::main]
async fn main() {
// Establish a connection to the server
let mut client = client::connect("127.0.0.1:6379").await.unwrap();
// Spawn two tasks, one gets a key, the other sets a key
let t1 = tokio::spawn(async {
let res = client.get("hello").await;
});
let t2 = tokio::spawn(async {
client.set("foo", "bar".into()).await;
});
t1.await.unwrap();
t2.await.unwrap();
}
这不会编译,因为这两个任务都需要以某种方式访问客户端。由于 客户端
不实现 复制
,如果没有一些代码来促进这种共享,它将不会进行编译。此外,Client::set
获取 &mut self
,这意味着需要独占访问才能调用它。我们可以为每个任务打开一个连接,但这并不理想。我们不能使用 std::sync::Mutex
,因为需要在持有锁的情况下调用 .await
。我们可以使用 Tokio::Sync::Mutex
,但这只允许一个运行中的请求。如果客户端实现pipelining,则异步互斥会导致连接无法得到充分的利用。
消息传递
答案就是使用消息传递。该模式下会创建专门的任务来管理客户端资源,以下简称管理任务
,其他任何任务请求客户端资源时,都会向管理任务发送请求,此时管理任务
代替请求任务作为发送者发送请求,并将响应返回给实际的发送者——请求任务。
使用该策略时,会建立唯一连接,管理任务
将独占该连接,以执行get
和set
命令。同时,建立的通道还起了缓冲区的作用。管理任务
繁忙时,新的操作请求发送过来会进入缓冲区,待管理任务
可以执行新请求时,会从缓冲区拉取下一个请求执行。这提高了管理任务
的吞吐量,且可以通过扩展支持连接池。
Tokio 通道相关原语
Tokio 为不同的场景,提供了不同的 通道 实现。
- mpsc: 多生产者、单消费者通道,可以发送多条消息。
- oneshot: 单生产者、单消费者通道,仅能发送一条信息。
- broadcast: 多生产者、多消费者通道,可以发送多条信息,每个接收者都能收到这些消息。
- watch: 单生产者、多消费通道,可以发送多条消息,不会保存历史发送,全部接收者都只能看到最新的消息。
如果需要支持多生产者、多消费者,且仅有一个消费者能看到所有消息的通道,可以使用 async-channel
。还有一些在非异步环境使用的通道实现,比如 std::sync::mpsc
和 crossbeam::channel
。这些实现在等待消息时会阻塞当前线程,跟异步的概念是冲突的,所以不用于异步环境。
本章节中,我们将用到 mpsc 和 oneshot。其他类型的消息传递通道后续的章节会陆续介绍。本章节的代码可以在 这里找到。
定义消息类型
通常情况下,使用消息传递时,消息需要响应的命令远不止一种。本章中,我们需要响应 GET
和 SET
命令。为此,我们首先定义一个枚举 Command
,它的每一个变体对应一种消息类型。
use bytes::Bytes;
#[derive(Debug)]
enum Command {
Get {
key: String,
},
Set {
key: String,
val: Bytes,
}
}
创建通道
在 main
函数中,创建一个 mpsc
类型的通道。
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
// 创建 mpsc 类型的通道,能容纳 32 条消息
let (tx, mut rx) = mpsc::channel(32);
// ... 稍后实现
}
这里创建的 mpsc
通道用于 发送 命令给持有 redis 连接的管理任务,类型中的多生产者意味着不同的任务可以同时发送消息,互不干扰。创建时会返回两个变量:发送者 tx
和接收者 rx
,它们可以被分发到不同的任务中,用于从通道发送或接收消息,注意 rx 的类型是可变的。
通道创建时指定了容量是 32,代表可以缓存 32 个 通道指定的消息类型,这里是 Command。如果消息发送速度比接收速度快,未被接收的消息就会存入通道,当通道内消息存储达到容量上限时,发送任务就会暂停并休眠,直到通道中的消息被接收端消费。
需要发送消息时, 克隆 发送者
即可。例如:
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(32);
let tx2 = tx.clone();
tokio::spawn(async move {
tx.send("sending from first handle").await;
});
tokio::spawn(async move {
tx2.send("sending from second handle").await;
});
while let Some(message) = rx.recv().await {
println!("GOT = {}", message);
}
}
所有的消息都会发送给唯一的 接收者
。mpsc
类型的接收者是不支持 clone 操作的。
当所有的 发送者
离开了它的生命周期或是其他原因导致它被释放时,接收者
上的 接收
操作会收到 None
,这意味这所有的发送者都已经释放,通道已关闭。
我们的例子中,一旦通道关闭,管理 Redis 连接的任务就知道 Redis的连接该关闭了,后续就不会再使用该连接。
创建管理任务
接下来,需要创建管理任务,它首先以 redis 客户端的身份连接到 Redis 服务器,然后启动 rx
接收命令消息,并通过 redis 连接发送给 Redis 服务端。
use mini_redis::client;
// `move` 用于将 `rx` 的所有权 **转移** 到新任务的线程中去
let manager = tokio::spawn(async move {
// 建立到 Redis 服务端的连接
let mut client = client::connect("127.0.0.1:6379").await.unwrap();
// 开始接收命令消息
while let Some(cmd) = rx.recv().await {
use Command::*;
match cmd {
Get { key } => {
client.get(&key).await;
}
Set { key, val } => {
client.set(&key, val).await;
}
}
}
});
接下来,更新两个发送任务,让它们将命令发送到消息管理通道而不是直接发送给 Redis 连接。
// `发送者` 会移动到任务中去,所以两个任务就需要两个 `发送者`
let tx2 = tx.clone();
// 创建任务:一个根据键查询值,一个添加记录到 Redis
let t1 = tokio::spawn(async move {
let cmd = Command::Get {
key: "hello".to_string(),
};
tx.send(cmd).await.unwrap();
});
let t2 = tokio::spawn(async move {
let cmd = Command::Set {
key: "foo".to_string(),
val: "bar".into(),
};
tx2.send(cmd).await.unwrap();
});
main
函数的最后,在程序退出之前,我们通过对任务句柄使用 .await
来等待任务完成。
t1.await.unwrap();
t2.await.unwrap();
manager.await.unwrap();
消息响应
最后,命令发出后,还需要接收来自消息管理任务
的响应。 GET
命令需要获取键对应的值, SET
命令需要知道操作是否成功。
为了传递响应,需要用到 oneshot
通道。 oneshot
是单生产者、单消费者的通道,特意为传递单个值的场景做了优化。在我们的例子中,这里的单个值指的是来自 Redis 服务端的消息响应。
跟 mpsc
一样, oneshot::channel()
返回一对 生产者、消费者
句柄。
use tokio::sync::oneshot;
let (tx, rx) = oneshot::channel();
跟 mpsc
不同的是,oneshot
通道无法指定容量(因为其容量总是 1 ),而且两个句柄都不支持克隆。
为了从 消息管理任务
接收到响应,oneshot
通道会在命令发送前创建,其 发送者
被包含在命令中发送到 消息管理任务
,发送完成后,立即使用 接收者
接收来自后者的响应。
首先,在 Command
中加入 发送者
。为了方便,我们通常定义一个类型别名来指代 发送者
。
use tokio::sync::oneshot;
use bytes::Bytes;
/// 单个消息通道可以发送不同类型的命令
#[derive(Debug)]
enum Command {
Get {
key: String,
resp: Responder<Option<Bytes>>,
},
Set {
key: String,
val: Bytes,
resp: Responder<()>,
},
}
/// 由发起请求的任务创建,`消息管理任务` 用它来传递响应给发起请求的任务
type Responder<T> = oneshot::Sender<mini_redis::Result<T>>;
接下来,在请求任务中加入 oneshot::Sender
.
let t1 = tokio::spawn(async move {
let (resp_tx, resp_rx) = oneshot::channel();
let cmd = Command::Get {
key: "hello".to_string(),
resp: resp_tx,
};
// 发送 GET 命令
tx.send(cmd).await.unwrap();
// 等待响应
let res = resp_rx.await;
println!("GOT = {:?}", res);
});
let t2 = tokio::spawn(async move {
let (resp_tx, resp_rx) = oneshot::channel();
let cmd = Command::Set {
key: "foo".to_string(),
val: "bar".into(),
resp: resp_tx,
};
// 发送 SET 命令
tx2.send(cmd).await.unwrap();
// 等待响应
let res = resp_rx.await;
println!("GOT = {:?}", res);
});
最后,更新 消息管理任务
,将命令执行的响应通过 oneshot
通道返回给请求方。
while let Some(cmd) = rx.recv().await {
match cmd {
Command::Get { key, resp } => {
let res = client.get(&key).await;
// 这里忽略了错误处理
let _ = resp.send(res);
}
Command::Set { key, val, resp } => {
let res = client.set(&key, val).await;
// 这里忽略了错误处理
let _ = resp.send(res);
}
}
}
在 oneshot::Sender
上执行发送会立即完成,不需要 .await
,因为 oneshot
通道的 发送
会立即成功或失败,没有任何形式的等待时间。
当接收者被释放后,在 oneshot
通道中发送值会得到 Err
类型的返回值,这表示接收者不会再接收任何响应。在我们的场景中,接收者可以停止接收消息。resp.send(...)
返回的 Err
不需要处理。
您可以在 这里找到本节的完整代码。
通道的容量和背压
任何时候引入并发和队列,都需要确保其容量限制,确保系统有加载执行它们的能力。没有容量限制的队列终将填满所有的内存,导致系统以不确定的方式崩溃。
Tokio 非常注重避免隐性队列的产生,其中很重要的机制就是:异步操作是惰性的。来看下面的例子:
loop {
async_op();
}
如果异步操作是即时的,loop 循环会在未确认操作已完成的情况下,就创建下一次 async_op
操作,这会产生一个隐性的无限队列。基于回调的系统和 实时 系统中,经常会出现这样的情况。
实际上,使用 Tokio 进行 Rust 异步编程时,如果不使用 .await
,上述代码片段 不会 马上执行 async_op
, 如果加上 .await
,loop 循环将等待当前的 async_op
操作完成后才会继续。
loop {
// 直到 `async_op` 执行完毕才会继续执行
async_op().await;
}
并发和队列必须显示声明,可以通过以下方式:
tokio::spawn
select!
join!
mpsc::channel
使用它们时,必须确认执行的并发总数是有限的。比如,通过无限循环向 TCP 连接写入数据时,必须确保建立的 socket 总数是有限的;当使用 mpsc::channel
时,必须根据应用的实际情况,选择适当的容量限制。
保持关注并选取适当的容量限制是编写可靠的 Tokio 应用时不可或缺的部分。
本译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接
我们的翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。