Channels
通道
现在我们已经了解了一些关于Tokio并发性的知识,让我们将其应用于客户端。将我们之前编写的服务器代码放到显式二进制文件中:
mkdir src/bin
mv src/main.rs src/bin/server.rs
创建一个新的二进制文件,其中将包含客户端代码:
touch src/bin/client.rs
在该文件中,您将编写此页面的代码。当您想要运行它时,您必须首先在一个单独的终端窗口中启动服务器:
cargo run --bin server
然后是客户端, 也是单独地:
cargo run --bin client
话虽这么说,让我们开始编码吧!
假设我们要运行两个并发的Redis命令。我们可以为每个命令派生一个任务。然后这两个命令将同时执行。
首先,我们可能会尝试这样的操作:
use mini_redis::client;
#[tokio::main]
async fn main() {
// Establish a connection to the server
let mut client = client::connect("127.0.0.1:6379").await.unwrap();
// Spawn two tasks, one gets a key, the other sets a key
let t1 = tokio::spawn(async {
let res = client.get("hello").await;
});
let t2 = tokio::spawn(async {
client.set("foo", "bar".into()).await;
});
t1.await.unwrap();
t2.await.unwrap();
}
这不会编译,因为这两个任务都需要以某种方式访问客户端。由于 客户端 不实现 复制 ,如果没有一些代码来促进这种共享,它将不会进行编译。此外,Client::set获取 &mut self,这意味着需要独占访问才能调用它。我们可以为每个任务打开一个连接,但这并不理想。我们不能使用 std::sync::Mutex,因为需要在持有锁的情况下调用 .await。我们可以使用 Tokio::Sync::Mutex,但这只允许一个运行中的请求。如果客户端实现pipelining,则异步互斥会导致连接无法得到充分的利用。
消息传递
答案就是使用消息传递。该模式下会创建专门的任务来管理客户端资源,以下简称管理任务,其他任何任务请求客户端资源时,都会向管理任务发送请求,此时管理任务代替请求任务作为发送者发送请求,并将响应返回给实际的发送者——请求任务。
使用该策略时,会建立唯一连接,管理任务将独占该连接,以执行get和set命令。同时,建立的通道还起了缓冲区的作用。管理任务繁忙时,新的操作请求发送过来会进入缓冲区,待管理任务可以执行新请求时,会从缓冲区拉取下一个请求执行。这提高了管理任务的吞吐量,且可以通过扩展支持连接池。
Tokio 通道相关原语
Tokio 为不同的场景,提供了不同的 通道 实现。
- mpsc: 多生产者、单消费者通道,可以发送多条消息。
- oneshot: 单生产者、单消费者通道,仅能发送一条信息。
- broadcast: 多生产者、多消费者通道,可以发送多条信息,每个接收者都能收到这些消息。
- watch: 单生产者、多消费通道,可以发送多条消息,不会保存历史发送,全部接收者都只能看到最新的消息。
如果需要支持多生产者、多消费者,且仅有一个消费者能看到所有消息的通道,可以使用 async-channel 。还有一些在非异步环境使用的通道实现,比如 std::sync::mpsc 和 crossbeam::channel。这些实现在等待消息时会阻塞当前线程,跟异步的概念是冲突的,所以不用于异步环境。
本章节中,我们将用到 mpsc 和 oneshot。其他类型的消息传递通道后续的章节会陆续介绍。本章节的代码可以在 这里找到。
定义消息类型
通常情况下,使用消息传递时,消息需要响应的命令远不止一种。本章中,我们需要响应 GET 和 SET 命令。为此,我们首先定义一个枚举 Command ,它的每一个变体对应一种消息类型。
use bytes::Bytes;
#[derive(Debug)]
enum Command {
Get {
key: String,
},
Set {
key: String,
val: Bytes,
}
}
创建通道
在 main 函数中,创建一个 mpsc 类型的通道。
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
// 创建 mpsc 类型的通道,能容纳 32 条消息
let (tx, mut rx) = mpsc::channel(32);
// ... 稍后实现
}
这里创建的 mpsc 通道用于 发送 命令给持有 redis 连接的管理任务,类型中的多生产者意味着不同的任务可以同时发送消息,互不干扰。创建时会返回两个变量:发送者 tx 和接收者 rx,它们可以被分发到不同的任务中,用于从通道发送或接收消息,注意 rx 的类型是可变的。
通道创建时指定了容量是 32,代表可以缓存 32 个 通道指定的消息类型,这里是 Command。如果消息发送速度比接收速度快,未被接收的消息就会存入通道,当通道内消息存储达到容量上限时,发送任务就会暂停并休眠,直到通道中的消息被接收端消费。
需要发送消息时, 克隆 发送者 即可。例如:
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(32);
let tx2 = tx.clone();
tokio::spawn(async move {
tx.send("sending from first handle").await;
});
tokio::spawn(async move {
tx2.send("sending from second handle").await;
});
while let Some(message) = rx.recv().await {
println!("GOT = {}", message);
}
}
所有的消息都会发送给唯一的 接收者。mpsc 类型的接收者是不支持 clone 操作的。
当所有的 发送者 离开了它的生命周期或是其他原因导致它被释放时,接收者 上的 接收 操作会收到 None,这意味这所有的发送者都已经释放,通道已关闭。
我们的例子中,一旦通道关闭,管理 Redis 连接的任务就知道 Redis的连接该关闭了,后续就不会再使用该连接。
创建管理任务
接下来,需要创建管理任务,它首先以 redis 客户端的身份连接到 Redis 服务器,然后启动 rx 接收命令消息,并通过 redis 连接发送给 Redis 服务端。
use mini_redis::client;
// `move` 用于将 `rx` 的所有权 **转移** 到新任务的线程中去
let manager = tokio::spawn(async move {
// 建立到 Redis 服务端的连接
let mut client = client::connect("127.0.0.1:6379").await.unwrap();
// 开始接收命令消息
while let Some(cmd) = rx.recv().await {
use Command::*;
match cmd {
Get { key } => {
client.get(&key).await;
}
Set { key, val } => {
client.set(&key, val).await;
}
}
}
});
接下来,更新两个发送任务,让它们将命令发送到消息管理通道而不是直接发送给 Redis 连接。
// `发送者` 会移动到任务中去,所以两个任务就需要两个 `发送者`
let tx2 = tx.clone();
// 创建任务:一个根据键查询值,一个添加记录到 Redis
let t1 = tokio::spawn(async move {
let cmd = Command::Get {
key: "hello".to_string(),
};
tx.send(cmd).await.unwrap();
});
let t2 = tokio::spawn(async move {
let cmd = Command::Set {
key: "foo".to_string(),
val: "bar".into(),
};
tx2.send(cmd).await.unwrap();
});
main 函数的最后,在程序退出之前,我们通过对任务句柄使用 .await 来等待任务完成。
t1.await.unwrap();
t2.await.unwrap();
manager.await.unwrap();
消息响应
最后,命令发出后,还需要接收来自消息管理任务 的响应。 GET 命令需要获取键对应的值, SET 命令需要知道操作是否成功。
为了传递响应,需要用到 oneshot 通道。 oneshot 是单生产者、单消费者的通道,特意为传递单个值的场景做了优化。在我们的例子中,这里的单个值指的是来自 Redis 服务端的消息响应。
跟 mpsc 一样, oneshot::channel() 返回一对 生产者、消费者 句柄。
use tokio::sync::oneshot;
let (tx, rx) = oneshot::channel();
跟 mpsc 不同的是,oneshot 通道无法指定容量(因为其容量总是 1 ),而且两个句柄都不支持克隆。
为了从 消息管理任务 接收到响应,oneshot 通道会在命令发送前创建,其 发送者 被包含在命令中发送到 消息管理任务,发送完成后,立即使用 接收者 接收来自后者的响应。
首先,在 Command 中加入 发送者。为了方便,我们通常定义一个类型别名来指代 发送者。
use tokio::sync::oneshot;
use bytes::Bytes;
/// 单个消息通道可以发送不同类型的命令
#[derive(Debug)]
enum Command {
Get {
key: String,
resp: Responder<Option<Bytes>>,
},
Set {
key: String,
val: Bytes,
resp: Responder<()>,
},
}
/// 由发起请求的任务创建,`消息管理任务` 用它来传递响应给发起请求的任务
type Responder<T> = oneshot::Sender<mini_redis::Result<T>>;
接下来,在请求任务中加入 oneshot::Sender.
let t1 = tokio::spawn(async move {
let (resp_tx, resp_rx) = oneshot::channel();
let cmd = Command::Get {
key: "hello".to_string(),
resp: resp_tx,
};
// 发送 GET 命令
tx.send(cmd).await.unwrap();
// 等待响应
let res = resp_rx.await;
println!("GOT = {:?}", res);
});
let t2 = tokio::spawn(async move {
let (resp_tx, resp_rx) = oneshot::channel();
let cmd = Command::Set {
key: "foo".to_string(),
val: "bar".into(),
resp: resp_tx,
};
// 发送 SET 命令
tx2.send(cmd).await.unwrap();
// 等待响应
let res = resp_rx.await;
println!("GOT = {:?}", res);
});
最后,更新 消息管理任务 ,将命令执行的响应通过 oneshot 通道返回给请求方。
while let Some(cmd) = rx.recv().await {
match cmd {
Command::Get { key, resp } => {
let res = client.get(&key).await;
// 这里忽略了错误处理
let _ = resp.send(res);
}
Command::Set { key, val, resp } => {
let res = client.set(&key, val).await;
// 这里忽略了错误处理
let _ = resp.send(res);
}
}
}
在 oneshot::Sender 上执行发送会立即完成,不需要 .await,因为 oneshot 通道的 发送 会立即成功或失败,没有任何形式的等待时间。
当接收者被释放后,在 oneshot 通道中发送值会得到 Err 类型的返回值,这表示接收者不会再接收任何响应。在我们的场景中,接收者可以停止接收消息。resp.send(...) 返回的 Err 不需要处理。
您可以在 这里找到本节的完整代码。
通道的容量和背压
任何时候引入并发和队列,都需要确保其容量限制,确保系统有加载执行它们的能力。没有容量限制的队列终将填满所有的内存,导致系统以不确定的方式崩溃。
Tokio 非常注重避免隐性队列的产生,其中很重要的机制就是:异步操作是惰性的。来看下面的例子:
loop {
async_op();
}
如果异步操作是即时的,loop 循环会在未确认操作已完成的情况下,就创建下一次 async_op 操作,这会产生一个隐性的无限队列。基于回调的系统和 实时 系统中,经常会出现这样的情况。
实际上,使用 Tokio 进行 Rust 异步编程时,如果不使用 .await,上述代码片段 不会 马上执行 async_op , 如果加上 .await,loop 循环将等待当前的 async_op 操作完成后才会继续。
loop {
// 直到 `async_op` 执行完毕才会继续执行
async_op().await;
}
并发和队列必须显示声明,可以通过以下方式:
tokio::spawnselect!join!mpsc::channel
使用它们时,必须确认执行的并发总数是有限的。比如,通过无限循环向 TCP 连接写入数据时,必须确保建立的 socket 总数是有限的;当使用 mpsc::channel 时,必须根据应用的实际情况,选择适当的容量限制。
保持关注并选取适当的容量限制是编写可靠的 Tokio 应用时不可或缺的部分。
本译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接
我们的翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。
Tokio 中文文档
关于 LearnKu