Spawning

未匹配的标注

原文链接:tokio.rs/tokio/tutorial

Spawning

我们要做切换了,开始实现 Redis server 。

首先将上一节的 SET/GET 代码移动到示例文件中。 这样我们就可以针对我们自己的 server 来运行它。

$ mkdir -p examples
$ mv src/main.rs examples/hello-redis.rs

然后创建一个新的空白 src/main.rs 文件并继续。

接收 sockets

我们的Redis服务器需要做的第一件事是接收入站TCP套接字请求。这里可以通过   tokio::net::TcpListener 来实现。

Tokio中的许多类型和在Rust标准库中的同步实现命名相同。您应该可以感觉到,Tokio公开与std具有相同的API,但使用async fn

我们创建一个TcpListener 并绑定到 6379端口, 然后在 loop 循环中接收套接字请求, 每个请求处理完之后关闭。 现在,我们将读取命令,将其打印到标准输出并返回一个错误响应。

use tokio::net::{TcpListener, TcpStream};
use mini_redis::{Connection, Frame};

#[tokio::main]
async fn main() {
    // 将listener绑定到指定地址端口
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

    loop {
        // 第二项包含新连接的IP和端口
        let (socket, _) = listener.accept().await.unwrap();
        process(socket).await;
    }
}

async fn process(socket: TcpStream) {
    // `Connection` 允许我们读取/写入 Redis **帧** 而不是字节流
    // `Connection` 类型由 mini-redis 定义。
    let mut connection = Connection::new(socket);

    if let Some(frame) = connection.read_frame().await.unwrap() {
        println!("GOT: {:?}", frame);

        // 返回一个错误
        let response = Frame::Error("unimplemented".to_string());
        connection.write_frame(&response).await.unwrap();
    }
}

现在,我们来运行代码:

$ cargo run

在新的终端窗口中, 运行 hello-redis 示例 (上一节中的 SET/GET 命令):

$ cargo run --example hello-redis

输出应该是:

Error: "unimplemented"

在服务端终端中,输出应该是:

GOT: Array([Bulk(b"set"), Bulk(b"hello"), Bulk(b"world")])

并发

我们的服务器有个小问题 (除了只会输出错误之外)。它一次只能处理一个入站请求。当一个连接被接受时,服务器会停留在接收的循环块中,直到响应完全写入套接字。

我们希望我们的 Redis 服务器能够处理 更多的 并发请求。 为此,我们还需要做一些改造。

并发和并行不是一回事。如果您在两个任务之间交替,那么您可以并发处理这两个任务,但不是并行的。要使其符合并行条件,您需要两个人,一个人专门负责每个任务。

使用Tokio的优点之一就是异步,这可以允许您同时处理许多任务,而不必使用普通线程并行处理它们。事实上,Tokio可以在单个线程上同时运行许多任务!

要同时处理连接,就要为每个入站连接创建一个新的 task。该连接在该task上处理。

这样循环接收内部就变成了这样:

use tokio::net::TcpListener;

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

    loop {
        let (socket, _) = listener.accept().await.unwrap();
        // 为每个入站套接字生成一个新任务。 
        // 套接字被移动到新任务并在那里进行处理。
        tokio::spawn(async move {
            process(socket).await;
        });
    }
}

Tasks(任务)

一个Tokio 任务就是一个异步绿色线程。它们是通过将一个async块传递给tokio::spawn来创建的。tokio::spawn函数返回一个JoinHandle,调用者可以使用它与生成的任务进行交互。async块可能有一个返回值。调用者可以使用JoinHandle上的.await获取返回值。

比如:

#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
        // 做一些异步工作
        "return value"
    });

    // 做一些其他工作

    let out = handle.await.unwrap();
    println!("GOT {}", out);
}

等待JoinHandle返回一个Result。当任务在执行过程中遇到错误时,JoinHandle将返回Err。当任务恐慌或运行时强制取消取消任务时,就会发生这种情况。

任务是由调度程序管理的执行单元。生成任务将其提交给Tokio调度程序,然后该调度程序确保任务在有工作要做时执行。生成的任务可以在生成它的同一线程上执行,也可以在不同的运行时线程上执行。任务也可以在生成后在线程之间移动。

Tokio中的任务非常轻量。在底层,它们只需要一个分配64字节的内存。应用程序应该可以随意生成数千个甚至数百万个任务。

'static bound

当您在Tokio运行时生成任务时,其类型的生命周期必须是'static。这意味着生成的任务不得包含对任务外部拥有的数据的任何引用。

一个常见的误解是'static总是意味着“永生”,但事实并非如此。 仅仅因为一个值是'static并不意味着你有内存泄漏。 你可以通过 常见的 Rust 生命周期误解查看更多。

比如,下面的代码就无法编译:

use tokio::task;

#[tokio::main]
async fn main() {
    let v = vec![1, 2, 3];

    task::spawn(async {
        println!("Here's a vec: {:?}", v);
    });
}

尝试编译会报下面的错误:

error[E0373]: async block may outlive the current function, but
              it borrows `v`, which is owned by the current function
 --> src/main.rs:7:23
  |
7 |       task::spawn(async {
  |  _______________________^
8 | |         println!("Here's a vec: {:?}", v);
  | |                                        - `v` is borrowed here
9 | |     });
  | |_____^ may outlive borrowed value `v`
  |
note: function requires argument type to outlive `'static`
 --> src/main.rs:7:17
  |
7 |       task::spawn(async {
  |  _________________^
8 | |         println!("Here's a vector: {:?}", v);
9 | |     });
  | |_____^
help: to force the async block to take ownership of `v` (and any other
      referenced variables), use the `move` keyword
  |
7 |     task::spawn(async move {
8 |         println!("Here's a vec: {:?}", v);
9 |     });
  |

发生这种情况是因为默认情况下,变量不会移动到async代码块中。v向量仍然属于main函数。println!这一行借用了v。rust编译器向我们解释了这一点,甚至建议修复!将第7行更改为task::spawn(async move { 将指示编译器将移动v到生成的任务中。现在,任务拥有其所有数据,使其'static

如果必须同时从多个任务访问单个数据,则必须使用同步原语(例如Arc)共享它。

请注意,错误消息还提到函数参数类型要比'static生命周期长。这个术语可能容易让你凌乱,因为'static生命周期持续到程序结束,所以如果要比它还长,难道不会有内存泄漏吗?实际上它是类型,而不是必须比'static生命周期长,并且该值可能会在其类型不再有效之前被销毁。

当我们说一个值是 静态 的时候,这意味着永远保留这个值并不是不正确的。这一点很重要,因为编译器无法推断新生的任务会持续多长时间,因此确保任务不会生存太长时间的唯一方式就是确保它永远存在。

前面的信息框链接到的文章使用术语 "受 '静态' 约束",而不是 "它的类型比 '静态' 更长久" 或 "值是 '静态' 来指代 T: '静态。这些都意味相同的事情,但与采用 &'static T 中的 "用 'static注释" 不同。

Send 绑定

tokio::spawn 产生的任务必须实现 Send。这允许Tokio运行时在任务被 .await 挂起时,可以在线程间移动任务。

当任务中跨 .await 调用持有的所有数据都实现 Send 时,这些任务就是 Send 的。这点很微妙。当调用 .await 的时候,任务会让步给调度器。下一次任务执行的时候,它会从上次暂停的点起继续执行。为了实现这一点,任务必须保存所有在 .await 之后使用的状态信息。如果这个状态信息实现了 Send,即可以在线程间移动,那么该任务本身就可以在线程间移动。反之,如果状态信息不实现 Send,那么任务本身也不具备 Send 属性。

例如,这个是正确的:

use tokio::task::yield_now;
use std::rc::Rc;

#[tokio::main]
async fn main() {
    tokio::spawn(async {
        // The scope forces `rc` to drop before `.await`.
        {
            let rc = Rc::new("hello");
            println!("{}", rc);
        }

        // `rc` is no longer used. It is **not** persisted when
        // the task yields to the scheduler
        yield_now().await;
    });
}

这个是不正确的:

use tokio::task::yield_now;
use std::rc::Rc;

#[tokio::main]
async fn main() {
    tokio::spawn(async {
        let rc = Rc::new("hello");

        // `rc` is used after `.await`. It must be persisted to
        // the task's state.
        yield_now().await;

        println!("{}", rc);
    });
}

尝试编译代码片段会导致:

error: future cannot be sent between threads safely
   --> src/main.rs:6:5
    |
6   |     tokio::spawn(async {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    | 
   ::: [..]spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ---- required by this bound in
    |                          `tokio::task::spawn::spawn`
    |
    = help: within `impl std::future::Future`, the trait
    |       `std::marker::Send` is not  implemented for
    |       `std::rc::Rc<&str>`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:10:9
    |
7   |         let rc = Rc::new("hello");
    |             -- has type `std::rc::Rc<&str>` which is not `Send`
...
10  |         yield_now().await;
    |         ^^^^^^^^^^^^^^^^^ await occurs here, with `rc` maybe
    |                           used later
11  |         println!("{}", rc);
12  |     });
    |     - `rc` is later dropped here

我们将更深入地讨论这种错误的一个特殊案例 在下一章节.

储存值

我们现在将会实现 process 函数来处理传入的命令。我们将用一个 HashMap 来储存值。SET 命令将会向 HashMap 插入值而 GET 将会加载值。除此之外,我们将会使用一个循环来接受每次连接的多个指令。

use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};

async fn process(socket: TcpStream) {
    use mini_redis::Command::{self, Get, Set};
    use std::collections::HashMap;

    // A hashmap is used to store data
    let mut db = HashMap::new();

    // Connection, provided by `mini-redis`, handles parsing frames from
    // the socket
    let mut connection = Connection::new(socket);

    // Use `read_frame` to receive a command from the connection.
    while let Some(frame) = connection.read_frame().await.unwrap() {
        let response = match Command::from_frame(frame).unwrap() {
            Set(cmd) => {
                // The value is stored as `Vec<u8>`
                db.insert(cmd.key().to_string(), cmd.value().to_vec());
                Frame::Simple("OK".to_string())
            }
            Get(cmd) => {
                if let Some(value) = db.get(cmd.key()) {
                    // `Frame::Bulk` expects data to be of type `Bytes`. This
                    // type will be covered later in the tutorial. For now,
                    // `&Vec<u8>` is converted to `Bytes` using `into()`.
                    Frame::Bulk(value.clone().into())
                } else {
                    Frame::Null
                }
            }
            cmd => panic!("unimplemented {:?}", cmd),
        };

        // Write the response to the client
        connection.write_frame(&response).await.unwrap();
    }
}

现在,来启动服务:

$ cargo run

并在一个单独的终端窗口,运行 hello-redis 示例:

$ cargo run --example hello-redis

现在的输出将会是:

got value from the server; result=Some(b"world")

我们现在可以获取和设置值,但是有个问题:连接之间没有共享值。如果另一个套接字连接并试图通过 GET 获取 hello 这个key,它将啥也查不到。

你可以找到完整的代码 这里.

在下一个章节,我们将会为所有的套接字实现数据的持久化。

本文章首发在 LearnKu.com 网站上。

本译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接
我们的翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。

原文地址:https://learnku.com/docs/tokio-doc/spawn...

译文地址:https://learnku.com/docs/tokio-doc/spawn...

上一篇 下一篇
贡献者:3
讨论数量: 0
发起讨论 只看当前版本


暂无话题~