Channels

未匹配的标注

原文链接:tokio.rs/tokio/tutorial

通道

现在我们已经了解了一些关于Tokio并发性的知识,让我们将其应用于客户端。将我们之前编写的服务器代码放到显式二进制文件中:

mkdir src/bin
mv src/main.rs src/bin/server.rs

创建一个新的二进制文件,其中将包含客户端代码:

touch src/bin/client.rs

在该文件中,您将编写此页面的代码。当您想要运行它时,您必须首先在一个单独的终端窗口中启动服务器:

cargo run --bin server

然后是客户端, 也是单独地:

cargo run --bin client

话虽这么说,让我们开始编码吧!

假设我们要运行两个并发的Redis命令。我们可以为每个命令派生一个任务。然后这两个命令将同时执行。

首先,我们可能会尝试这样的操作:

use mini_redis::client;

#[tokio::main]
async fn main() {
    // Establish a connection to the server
    let mut client = client::connect("127.0.0.1:6379").await.unwrap();

    // Spawn two tasks, one gets a key, the other sets a key
    let t1 = tokio::spawn(async {
        let res = client.get("hello").await;
    });

    let t2 = tokio::spawn(async {
        client.set("foo", "bar".into()).await;
    });

    t1.await.unwrap();
    t2.await.unwrap();
}

这不会编译,因为这两个任务都需要以某种方式访问客户端。由于 客户端 不实现 复制 ,如果没有一些代码来促进这种共享,它将不会进行编译。此外,Client::set获取 &mut self,这意味着需要独占访问才能调用它。我们可以为每个任务打开一个连接,但这并不理想。我们不能使用 std::sync::Mutex,因为需要在持有锁的情况下调用 .await。我们可以使用 Tokio::Sync::Mutex,但这只允许一个运行中的请求。如果客户端实现pipelining,则异步互斥会导致连接无法得到充分的利用。

消息传递

答案就是使用消息传递。该模式下会创建专门的任务来管理客户端资源,以下简称管理任务,其他任何任务请求客户端资源时,都会向管理任务发送请求,此时管理任务代替请求任务作为发送者发送请求,并将响应返回给实际的发送者——请求任务。

使用该策略时,会建立唯一连接,管理任务将独占该连接,以执行getset命令。同时,建立的通道还起了缓冲区的作用。管理任务繁忙时,新的操作请求发送过来会进入缓冲区,待管理任务可以执行新请求时,会从缓冲区拉取下一个请求执行。这提高了管理任务的吞吐量,且可以通过扩展支持连接池。

Tokio 通道相关原语

Tokio 为不同的场景,提供了不同的 通道 实现。

  • mpsc: 多生产者、单消费者通道,可以发送多条消息。
  • oneshot: 单生产者、单消费者通道,仅能发送一条信息。
  • broadcast: 多生产者、多消费者通道,可以发送多条信息,每个接收者都能收到这些消息。
  • watch: 单生产者、多消费通道,可以发送多条消息,不会保存历史发送,全部接收者都只能看到最新的消息。

如果需要支持多生产者、多消费者,且仅有一个消费者能看到所有消息的通道,可以使用 async-channel 。还有一些在非异步环境使用的通道实现,比如 std::sync::mpsc 和 crossbeam::channel。这些实现在等待消息时会阻塞当前线程,跟异步的概念是冲突的,所以不用于异步环境。

本章节中,我们将用到 mpsc 和 oneshot。其他类型的消息传递通道后续的章节会陆续介绍。本章节的代码可以在 这里找到。

定义消息类型

通常情况下,使用消息传递时,消息需要响应的命令远不止一种。本章中,我们需要响应 GETSET 命令。为此,我们首先定义一个枚举 Command ,它的每一个变体对应一种消息类型。

use bytes::Bytes;

#[derive(Debug)]
enum Command {
    Get {
        key: String,
    },
    Set {
        key: String,
        val: Bytes,
    }
}

创建通道

在 main 函数中,创建一个 mpsc 类型的通道。

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // 创建 mpsc 类型的通道,能容纳 32 条消息
    let (tx, mut rx) = mpsc::channel(32);

    // ... 稍后实现
}

 这里创建的 mpsc 通道用于 发送 命令给持有 redis 连接的管理任务,类型中的多生产者意味着不同的任务可以同时发送消息,互不干扰。创建时会返回两个变量:发送者 tx 和接收者 rx,它们可以被分发到不同的任务中,用于从通道发送或接收消息,注意 rx 的类型是可变的。

通道创建时指定了容量是 32,代表可以缓存 32 个 通道指定的消息类型,这里是 Command。如果消息发送速度比接收速度快,未被接收的消息就会存入通道,当通道内消息存储达到容量上限时,发送任务就会暂停并休眠,直到通道中的消息被接收端消费。

需要发送消息时, 克隆 发送者 即可。例如:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(32);
    let tx2 = tx.clone();

    tokio::spawn(async move {
        tx.send("sending from first handle").await;
    });

    tokio::spawn(async move {
        tx2.send("sending from second handle").await;
    });

    while let Some(message) = rx.recv().await {
        println!("GOT = {}", message);
    }
}

所有的消息都会发送给唯一的 接收者mpsc 类型的接收者是不支持 clone 操作的

当所有的 发送者 离开了它的生命周期或是其他原因导致它被释放时,接收者 上的 接收 操作会收到 None,这意味这所有的发送者都已经释放,通道已关闭。

我们的例子中,一旦通道关闭,管理 Redis 连接的任务就知道 Redis的连接该关闭了,后续就不会再使用该连接。

创建管理任务

接下来,需要创建管理任务,它首先以 redis 客户端的身份连接到 Redis 服务器,然后启动 rx 接收命令消息,并通过 redis 连接发送给 Redis 服务端。

use mini_redis::client;
//  `move` 用于将 `rx` 的所有权 **转移** 到新任务的线程中去
let manager = tokio::spawn(async move {
    // 建立到 Redis 服务端的连接
    let mut client = client::connect("127.0.0.1:6379").await.unwrap();

    // 开始接收命令消息
    while let Some(cmd) = rx.recv().await {
        use Command::*;

        match cmd {
            Get { key } => {
                client.get(&key).await;
            }
            Set { key, val } => {
                client.set(&key, val).await;
            }
        }
    }
});

接下来,更新两个发送任务,让它们将命令发送到消息管理通道而不是直接发送给 Redis 连接。

//  `发送者` 会移动到任务中去,所以两个任务就需要两个 `发送者`
let tx2 = tx.clone();

// 创建任务:一个根据键查询值,一个添加记录到 Redis
let t1 = tokio::spawn(async move {
    let cmd = Command::Get {
        key: "hello".to_string(),
    };

    tx.send(cmd).await.unwrap();
});

let t2 = tokio::spawn(async move {
    let cmd = Command::Set {
        key: "foo".to_string(),
        val: "bar".into(),
    };

    tx2.send(cmd).await.unwrap();
});

 main 函数的最后,在程序退出之前,我们通过对任务句柄使用 .await 来等待任务完成。

t1.await.unwrap();
t2.await.unwrap();
manager.await.unwrap();

消息响应

最后,命令发出后,还需要接收来自消息管理任务 的响应。 GET 命令需要获取键对应的值, SET 命令需要知道操作是否成功。

为了传递响应,需要用到 oneshot 通道。 oneshot 是单生产者、单消费者的通道,特意为传递单个值的场景做了优化。在我们的例子中,这里的单个值指的是来自 Redis 服务端的消息响应。

跟 mpsc 一样, oneshot::channel() 返回一对 生产者、消费者 句柄。

use tokio::sync::oneshot;

let (tx, rx) = oneshot::channel();

跟 mpsc 不同的是,oneshot 通道无法指定容量(因为其容量总是 1 ),而且两个句柄都不支持克隆。

为了从 消息管理任务 接收到响应,oneshot 通道会在命令发送前创建,其 发送者 被包含在命令中发送到 消息管理任务,发送完成后,立即使用 接收者 接收来自后者的响应。

首先,在 Command 中加入 发送者。为了方便,我们通常定义一个类型别名来指代 发送者

use tokio::sync::oneshot;
use bytes::Bytes;

/// 单个消息通道可以发送不同类型的命令
#[derive(Debug)]
enum Command {
    Get {
        key: String,
        resp: Responder<Option<Bytes>>,
    },
    Set {
        key: String,
        val: Bytes,
        resp: Responder<()>,
    },
}

/// 由发起请求的任务创建,`消息管理任务` 用它来传递响应给发起请求的任务
type Responder<T> = oneshot::Sender<mini_redis::Result<T>>;

接下来,在请求任务中加入 oneshot::Sender.

let t1 = tokio::spawn(async move {
    let (resp_tx, resp_rx) = oneshot::channel();
    let cmd = Command::Get {
        key: "hello".to_string(),
        resp: resp_tx,
    };

    // 发送 GET 命令
    tx.send(cmd).await.unwrap();

    // 等待响应
    let res = resp_rx.await;
    println!("GOT = {:?}", res);
});

let t2 = tokio::spawn(async move {
    let (resp_tx, resp_rx) = oneshot::channel();
    let cmd = Command::Set {
        key: "foo".to_string(),
        val: "bar".into(),
        resp: resp_tx,
    };

    // 发送 SET 命令
    tx2.send(cmd).await.unwrap();

    // 等待响应
    let res = resp_rx.await;
    println!("GOT = {:?}", res);
});

最后,更新 消息管理任务 ,将命令执行的响应通过  oneshot 通道返回给请求方。

while let Some(cmd) = rx.recv().await {
    match cmd {
        Command::Get { key, resp } => {
            let res = client.get(&key).await;
            // 这里忽略了错误处理
            let _ = resp.send(res);
        }
        Command::Set { key, val, resp } => {
            let res = client.set(&key, val).await;
            // 这里忽略了错误处理
            let _ = resp.send(res);
        }
    }
}

在 oneshot::Sender 上执行发送会立即完成,不需要 .await,因为 oneshot 通道的 发送  会立即成功或失败,没有任何形式的等待时间。

当接收者被释放后,在 oneshot 通道中发送值会得到 Err 类型的返回值,这表示接收者不会再接收任何响应。在我们的场景中,接收者可以停止接收消息。resp.send(...) 返回的 Err 不需要处理。

您可以在 这里找到本节的完整代码。

通道的容量和背压

任何时候引入并发和队列,都需要确保其容量限制,确保系统有加载执行它们的能力。没有容量限制的队列终将填满所有的内存,导致系统以不确定的方式崩溃。

Tokio 非常注重避免隐性队列的产生,其中很重要的机制就是:异步操作是惰性的。来看下面的例子:

loop {
    async_op();
}

如果异步操作是即时的,loop 循环会在未确认操作已完成的情况下,就创建下一次 async_op 操作,这会产生一个隐性的无限队列。基于回调的系统和 实时 系统中,经常会出现这样的情况。

实际上,使用 Tokio 进行 Rust 异步编程时,如果不使用 .await,上述代码片段 不会 马上执行 async_op , 如果加上 .await,loop 循环将等待当前的 async_op 操作完成后才会继续。

loop {
    // 直到 `async_op` 执行完毕才会继续执行
    async_op().await;
}

并发和队列必须显示声明,可以通过以下方式:

  • tokio::spawn
  • select!
  • join!
  • mpsc::channel

使用它们时,必须确认执行的并发总数是有限的。比如,通过无限循环向 TCP 连接写入数据时,必须确保建立的 socket 总数是有限的;当使用 mpsc::channel 时,必须根据应用的实际情况,选择适当的容量限制。

保持关注并选取适当的容量限制是编写可靠的 Tokio 应用时不可或缺的部分。

本文章首发在 LearnKu.com 网站上。

本译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接
我们的翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。

原文地址:https://learnku.com/docs/tokio-doc/chann...

译文地址:https://learnku.com/docs/tokio-doc/chann...

上一篇 下一篇
贡献者:5
讨论数量: 0
发起讨论 只看当前版本


暂无话题~