Shared state


So far, we have a working key-value server. However, it has a major flaw: state is not shared across connections. We will fix that in this article.

Strategies

There are a couple of different ways to share state in Tokio:

  1. Guard the shared state with a Mutex.
  2. Spawn a task to manage the state and use message passing to operate on it.

Generally, you want to use the first approach for simple data, and the second approach for things that require asynchronous work, such as I/O primitives. In this chapter, the shared state is a HashMap and the operations are insert and get. Neither of these operations is asynchronous, so we will use a Mutex.

The latter approach is covered in the next chapter.

Add the bytes dependency

Instead of Vec<u8>, the Mini-Redis crate uses Bytes from the bytes crate. The goal of Bytes is to provide a robust byte-array type for network programming. Its biggest feature over Vec<u8> is shallow cloning. In other words, calling clone() on a Bytes instance does not copy the underlying data. Instead, a Bytes instance is a reference-counted handle to some underlying data. The Bytes type can be thought of as an Arc<Vec<u8>> with some additional capabilities.

To depend on bytes, add the following to the [dependencies] section of your Cargo.toml:

bytes = "1"

Initialize the HashMap

The HashMap will be shared across many tasks and potentially many threads. To support this, it is wrapped in Arc<Mutex<_>>.

First, for convenience, add the following type alias after the use statements:

use bytes::Bytes;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

type Db = Arc<Mutex<HashMap<String, Bytes>>>;

Then, update the main function to initialize the HashMap and pass an Arc handle to the process function. Using an Arc allows the HashMap to be referenced concurrently from many tasks, potentially running on many threads. Throughout Tokio, the term handle is used for a value that provides access to some shared state.

use tokio::net::TcpListener;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

    println!("Listening");

    let db = Arc::new(Mutex::new(HashMap::new()));

    loop {
        let (socket, _) = listener.accept().await.unwrap();
        // Clone the handle to the hash map.
        let db = db.clone();

        println!("Accepted");
        tokio::spawn(async move {
            process(socket, db).await;
        });
    }
}

On using std::sync::Mutex

Note that std::sync::Mutex, not tokio::sync::Mutex, is used to guard the HashMap. A common mistake is to unconditionally use tokio::sync::Mutex in async code. An async mutex is a mutex that can be held locked across calls to .await.

A synchronous mutex blocks the current thread while waiting to acquire the lock. This, in turn, blocks other tasks from being processed. However, switching to tokio::sync::Mutex usually does not help, because the async mutex uses a synchronous mutex internally.

As a rule of thumb, using a synchronous mutex in async code is fine as long as contention stays low and the lock is not held across calls to .await. Additionally, consider parking_lot::Mutex as a faster alternative to std::sync::Mutex.


Update process()

The process function no longer initializes the HashMap. Instead, it takes the shared handle to the HashMap as an argument. It also needs to lock the HashMap before using it. Remember that the HashMap's value type is now Bytes (which we can clone cheaply), so that needs to change as well.

use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};

async fn process(socket: TcpStream, db: Db) {
    use mini_redis::Command::{self, Get, Set};

    // The `Connection`, provided by `mini-redis`, handles parsing frames from the socket
    let mut connection = Connection::new(socket);

    while let Some(frame) = connection.read_frame().await.unwrap() {
        let response = match Command::from_frame(frame).unwrap() {
            Set(cmd) => {
                let mut db = db.lock().unwrap();
                db.insert(cmd.key().to_string(), cmd.value().clone());
                Frame::Simple("OK".to_string())
            }
            Get(cmd) => {
                let db = db.lock().unwrap();
                if let Some(value) = db.get(cmd.key()) {
                    Frame::Bulk(value.clone())
                } else {
                    Frame::Null
                }
            }
            cmd => panic!("unimplemented {:?}", cmd),
        };

        // Write the response to the client
        connection.write_frame(&response).await.unwrap();
    }
}

Tasks, threads, and contention

Using a blocking mutex to guard short critical sections is an acceptable strategy when contention is minimal. When a lock is contended, the thread executing the task must block and wait for the mutex to be released. This blocks not only the current task, but also all other tasks scheduled on that thread.

By default, the Tokio runtime uses a multi-threaded scheduler, and tasks are scheduled onto any number of threads managed by the runtime. If a large number of tasks are scheduled to execute and they all need access to the mutex, there will be contention. On the other hand, if the current_thread runtime flavor is used, the mutex will never be contended.

The current_thread runtime flavor is a lightweight, single-threaded runtime. It is a good choice when only spawning a few tasks and opening a handful of sockets. For example, this option works well when offering a synchronous API bridge on top of an asynchronous client library.


If contention on a synchronous mutex becomes a problem, the best fix is rarely to switch to the Tokio mutex. Instead, options to consider are:

  • Switch to a dedicated task to manage the state and use message passing.
  • Shard the mutex.
  • Restructure the code to avoid the mutex.

In our case, as each key is independent, mutex sharding will work well. To do this, instead of having a single Mutex<HashMap<_, _>> instance, we would introduce N distinct instances.

type ShardedDb = Arc<Vec<Mutex<HashMap<String, Vec<u8>>>>>;

fn new_sharded_db(num_shards: usize) -> ShardedDb {
    let mut db = Vec::with_capacity(num_shards);
    for _ in 0..num_shards {
        db.push(Mutex::new(HashMap::new()));
    }
    Arc::new(db)
}

Then, finding the cell for any given key becomes a two-step process. First, the key is used to identify which shard it is part of. Then, the key is looked up in the HashMap.

let shard = db[hash(key) % db.len()].lock().unwrap();
shard.insert(key, value);

The simple implementation outlined above requires using a fixed number of shards, and the number of shards cannot be changed once the sharded map is created. The dashmap crate provides an implementation of a more sophisticated sharded hash map.

Holding a MutexGuard across an .await

You might write code that looks like this:

use std::sync::{Mutex, MutexGuard};

async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
    *lock += 1;

    do_something_async().await;
} // lock goes out of scope here

When you try to spawn something that calls this function, you will encounter the following error message:

error: future cannot be sent between threads safely
   --> src/lib.rs:13:5
    |
13  |     tokio::spawn(async move {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    |
   ::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ---- required by this bound in `tokio::task::spawn::spawn`
    |
    = help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, i32>`
note: future is not `Send` as this value is used across an await
   --> src/lib.rs:7:5
    |
4   |     let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
    |         -------- has type `std::sync::MutexGuard<'_, i32>` which is not `Send`
...
7   |     do_something_async().await;
    |     ^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `mut lock` maybe used later
8   | }
    | - `mut lock` is later dropped here

This happens because the std::sync::MutexGuard type is not Send. This means that you can't send a mutex lock to another thread, and the error happens because the Tokio runtime can move a task between threads at every .await. To avoid this, you should restructure your code such that the mutex lock's destructor runs before the .await.

// This works!
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    {
        let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
        *lock += 1;
    } // lock goes out of scope here

    do_something_async().await;
}

Note that this does not work:

use std::sync::{Mutex, MutexGuard};

// This fails too.
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
    *lock += 1;
    drop(lock);

    do_something_async().await;
}

This is because the compiler currently calculates whether a future is Send based on scope information only. The compiler will hopefully be updated to support explicitly dropping it in the future, but for now, you must explicitly use a scope.

Note that the error discussed here is also discussed in the Send bound section from the spawning chapter.

You should not try to circumvent this issue by spawning the task in a way that does not require it to be Send. If Tokio suspends your task at an .await while the task is holding the lock, some other task may be scheduled to run on the same thread, and this other task may also try to lock that mutex. This would result in a deadlock, as the task waiting to lock the mutex would prevent the task holding the mutex from releasing it.

We will discuss some approaches to fix the error message below:

Restructure your code to not hold the lock across an .await

We have already seen one example of this in the snippet above, but there are some more robust ways to do this. For example, you can wrap the mutex in a struct, and only ever lock the mutex inside non-async methods on that struct.

use std::sync::Mutex;

struct CanIncrement {
    mutex: Mutex<i32>,
}
impl CanIncrement {
    // This function is not marked async.
    fn increment(&self) {
        let mut lock = self.mutex.lock().unwrap();
        *lock += 1;
    }
}

async fn increment_and_do_stuff(can_incr: &CanIncrement) {
    can_incr.increment();
    do_something_async().await;
}

This pattern guarantees that you won't run into the Send error, because the mutex guard does not appear anywhere in an async function.

Spawn a task to manage the state and use message passing to operate on it

This is the second approach mentioned in the start of this chapter, and is often used when the shared resource is an I/O resource. See the next chapter for more details.

Use Tokio's asynchronous mutex

The tokio::sync::Mutex type provided by Tokio can also be used. The primary feature of the Tokio mutex is that it can be held across an .await without any issues. That said, an asynchronous mutex is more expensive than an ordinary mutex, and it is typically better to use one of the two other approaches.

use tokio::sync::Mutex; // note! This uses the Tokio mutex

// This compiles!
// (but restructuring the code would be better in this case)
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    let mut lock = mutex.lock().await;
    *lock += 1;

    do_something_async().await;
} // lock goes out of scope here
