Spawning
Spawning#
我们要做切换了,开始实现 Redis server 。
首先将上一节的 SET
/GET
代码移动到示例文件中。 这样我们就可以针对我们自己的 server 来运行它。
$ mkdir -p examples
$ mv src/main.rs examples/hello-redis.rs
然后创建一个新的空白 src/main.rs
文件并继续。
接收 sockets#
我们的 Redis 服务器需要做的第一件事是接收入站 TCP 套接字请求。这里可以通过 tokio::net::TcpListener
来实现。
Tokio 中的许多类型和在 Rust 标准库中的同步实现命名相同。您应该可以感觉到,Tokio 公开与
std
具有相同的 API,但使用async fn
。
我们创建一个 TcpListener
并绑定到 6379 端口, 然后在 loop 循环中接收套接字请求, 每个请求处理完之后关闭。 现在,我们将读取命令,将其打印到标准输出并返回一个错误响应。
use tokio::net::{TcpListener, TcpStream};
use mini_redis::{Connection, Frame};
#[tokio::main]
async fn main() {
// 将listener绑定到指定地址端口
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
loop {
// 第二项包含新连接的IP和端口
let (socket, _) = listener.accept().await.unwrap();
process(socket).await;
}
}
async fn process(socket: TcpStream) {
// `Connection` 允许我们读取/写入 Redis **帧** 而不是字节流
// `Connection` 类型由 mini-redis 定义。
let mut connection = Connection::new(socket);
if let Some(frame) = connection.read_frame().await.unwrap() {
println!("GOT: {:?}", frame);
// 返回一个错误
let response = Frame::Error("unimplemented".to_string());
connection.write_frame(&response).await.unwrap();
}
}
现在,我们来运行代码:
$ cargo run
在新的终端窗口中, 运行 hello-redis
示例 (上一节中的 SET
/GET
命令):
$ cargo run --example hello-redis
输出应该是:
Error: "unimplemented"
在服务端终端中,输出应该是:
GOT: Array([Bulk(b"set"), Bulk(b"hello"), Bulk(b"world")])
并发#
我们的服务器有个小问题 (除了只会输出错误之外)。它一次只能处理一个入站请求。当一个连接被接受时,服务器会停留在接收的循环块中,直到响应完全写入套接字。
我们希望我们的 Redis 服务器能够处理 更多的 并发请求。 为此,我们还需要做一些改造。
并发和并行不是一回事。如果您在两个任务之间交替,那么您可以并发处理这两个任务,但不是并行的。要使其符合并行条件,您需要两个人,一个人专门负责每个任务。
使用 Tokio 的优点之一就是异步,这可以允许您同时处理许多任务,而不必使用普通线程并行处理它们。事实上,Tokio 可以在单个线程上同时运行许多任务!
要同时处理连接,就要为每个入站连接创建一个新的 task。该连接在该 task 上处理。
这样循环接收内部就变成了这样:
use tokio::net::TcpListener;
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
loop {
let (socket, _) = listener.accept().await.unwrap();
// 为每个入站套接字生成一个新任务。
// 套接字被移动到新任务并在那里进行处理。
tokio::spawn(async move {
process(socket).await;
});
}
}
Tasks(任务)#
一个 Tokio 任务就是一个异步绿色线程。它们是通过将一个 async
块传递给 tokio::spawn
来创建的。tokio::spawn
函数返回一个 JoinHandle
,调用者可以使用它与生成的任务进行交互。async
块可能有一个返回值。调用者可以使用 JoinHandle
上的.await
获取返回值。
比如:
#[tokio::main]
async fn main() {
let handle = tokio::spawn(async {
// 做一些异步工作
"return value"
});
// 做一些其他工作
let out = handle.await.unwrap();
println!("GOT {}", out);
}
等待 JoinHandle
返回一个 Result
。当任务在执行过程中遇到错误时,JoinHandle
将返回 Err
。当任务恐慌或运行时强制取消取消任务时,就会发生这种情况。
任务是由调度程序管理的执行单元。生成任务将其提交给 Tokio 调度程序,然后该调度程序确保任务在有工作要做时执行。生成的任务可以在生成它的同一线程上执行,也可以在不同的运行时线程上执行。任务也可以在生成后在线程之间移动。
Tokio 中的任务非常轻量。在底层,它们只需要一个分配 64 字节的内存。应用程序应该可以随意生成数千个甚至数百万个任务。
'static
bound#
当您在 Tokio 运行时生成任务时,其类型的生命周期必须是'static
。这意味着生成的任务不得包含对任务外部拥有的数据的任何引用。
一个常见的误解是
'static
总是意味着 “永生”,但事实并非如此。 仅仅因为一个值是'static
并不意味着你有内存泄漏。 你可以通过 常见的 Rust 生命周期误解查看更多。
比如,下面的代码就无法编译:
use tokio::task;
#[tokio::main]
async fn main() {
let v = vec![1, 2, 3];
task::spawn(async {
println!("Here's a vec: {:?}", v);
});
}
尝试编译会报下面的错误:
error[E0373]: async block may outlive the current function, but
it borrows `v`, which is owned by the current function
--> src/main.rs:7:23
|
7 | task::spawn(async {
| _______________________^
8 | | println!("Here's a vec: {:?}", v);
| | - `v` is borrowed here
9 | | });
| |_____^ may outlive borrowed value `v`
|
note: function requires argument type to outlive `'static`
--> src/main.rs:7:17
|
7 | task::spawn(async {
| _________________^
8 | | println!("Here's a vector: {:?}", v);
9 | | });
| |_____^
help: to force the async block to take ownership of `v` (and any other
referenced variables), use the `move` keyword
|
7 | task::spawn(async move {
8 | println!("Here's a vec: {:?}", v);
9 | });
|
发生这种情况是因为默认情况下,变量不会移动到 async 代码块中。v
向量仍然属于 main
函数。println!
这一行借用了 v
。rust 编译器向我们解释了这一点,甚至建议修复!将第 7 行更改为 task::spawn(async move {
将指示编译器将移动v
到生成的任务中。现在,任务拥有其所有数据,使其'static
。
如果必须同时从多个任务访问单个数据,则必须使用同步原语(例如 Arc
)共享它。
请注意,错误消息还提到函数参数类型要比'static
生命周期长。这个术语可能容易让你凌乱,因为'static
生命周期持续到程序结束,所以如果要比它还长,难道不会有内存泄漏吗?实际上它是类型,而不是值必须比'static
生命周期长,并且该值可能会在其类型不再有效之前被销毁。
当我们说一个值是 静态
的时候,这意味着永远保留这个值并不是不正确的。这一点很重要,因为编译器无法推断新生的任务会持续多长时间,因此确保任务不会生存太长时间的唯一方式就是确保它永远存在。
前面的信息框链接到的文章使用术语 " 受 '静态'
约束 ",而不是" 它的类型比 '静态'
更长久 "或" 值是 '静态'
来指代 T: '静态
。这些都意味相同的事情,但与采用 &'static T
中的 " 用 'static
注释 " 不同。
Send
绑定#
由 tokio::spawn
产生的任务必须实现 Send
。这允许 Tokio 运行时在任务被 .await
挂起时,可以在线程间移动任务。
当任务中跨 .await
调用持有的所有数据都实现 Send
时,这些任务就是 Send
的。这点很微妙。当调用 .await
的时候,任务会让步给调度器。下一次任务执行的时候,它会从上次暂停的点起继续执行。为了实现这一点,任务必须保存所有在 .await
之后使用的状态信息。如果这个状态信息实现了 Send
,即可以在线程间移动,那么该任务本身就可以在线程间移动。反之,如果状态信息不实现 Send
,那么任务本身也不具备 Send
属性。
例如,这个是正确的:
use tokio::task::yield_now;
use std::rc::Rc;
#[tokio::main]
async fn main() {
tokio::spawn(async {
// The scope forces `rc` to drop before `.await`.
{
let rc = Rc::new("hello");
println!("{}", rc);
}
// `rc` is no longer used. It is **not** persisted when
// the task yields to the scheduler
yield_now().await;
});
}
这个是不正确的:
use tokio::task::yield_now;
use std::rc::Rc;
#[tokio::main]
async fn main() {
tokio::spawn(async {
let rc = Rc::new("hello");
// `rc` is used after `.await`. It must be persisted to
// the task's state.
yield_now().await;
println!("{}", rc);
});
}
尝试编译代码片段会导致:
error: future cannot be sent between threads safely
--> src/main.rs:6:5
|
6 | tokio::spawn(async {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
::: [..]spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ---- required by this bound in
| `tokio::task::spawn::spawn`
|
= help: within `impl std::future::Future`, the trait
| `std::marker::Send` is not implemented for
| `std::rc::Rc<&str>`
note: future is not `Send` as this value is used across an await
--> src/main.rs:10:9
|
7 | let rc = Rc::new("hello");
| -- has type `std::rc::Rc<&str>` which is not `Send`
...
10 | yield_now().await;
| ^^^^^^^^^^^^^^^^^ await occurs here, with `rc` maybe
| used later
11 | println!("{}", rc);
12 | });
| - `rc` is later dropped here
我们将更深入地讨论这种错误的一个特殊案例 在下一章节.
储存值#
我们现在将会实现 process
函数来处理传入的命令。我们将用一个 HashMap
来储存值。SET
命令将会向 HashMap
插入值而 GET
将会加载值。除此之外,我们将会使用一个循环来接受每次连接的多个指令。
use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};
async fn process(socket: TcpStream) {
use mini_redis::Command::{self, Get, Set};
use std::collections::HashMap;
// A hashmap is used to store data
let mut db = HashMap::new();
// Connection, provided by `mini-redis`, handles parsing frames from
// the socket
let mut connection = Connection::new(socket);
// Use `read_frame` to receive a command from the connection.
while let Some(frame) = connection.read_frame().await.unwrap() {
let response = match Command::from_frame(frame).unwrap() {
Set(cmd) => {
// The value is stored as `Vec<u8>`
db.insert(cmd.key().to_string(), cmd.value().to_vec());
Frame::Simple("OK".to_string())
}
Get(cmd) => {
if let Some(value) = db.get(cmd.key()) {
// `Frame::Bulk` expects data to be of type `Bytes`. This
// type will be covered later in the tutorial. For now,
// `&Vec<u8>` is converted to `Bytes` using `into()`.
Frame::Bulk(value.clone().into())
} else {
Frame::Null
}
}
cmd => panic!("unimplemented {:?}", cmd),
};
// Write the response to the client
connection.write_frame(&response).await.unwrap();
}
}
现在,来启动服务:
$ cargo run
并在一个单独的终端窗口,运行 hello-redis
示例:
$ cargo run --example hello-redis
现在的输出将会是:
got value from the server; result=Some(b"world")
我们现在可以获取和设置值,但是有个问题:连接之间没有共享值。如果另一个套接字连接并试图通过 GET
获取 hello
这个 key,它将啥也查不到。
你可以找到完整的代码 这里.
在下一个章节,我们将会为所有的套接字实现数据的持久化。
本译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接
我们的翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。