Shared state

So far, we have a working key-value server. However, there is a major flaw: state is not shared across connections. We will fix that in this article.

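Strategies

There are a couple of different ways to share state in Tokio.

  1. Guard the shared state with a Mutex.
  2. Spawn a task to manage the state and use message passing to operate on it.

Generally, you want to use the first approach for simple data, and the second approach for things that require asynchronous work, such as I/O primitives. In this chapter, the shared state is a HashMap and the operations are insert and get. Neither of these operations is asynchronous, so we will use a Mutex.

The latter approach is covered in the next chapter.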

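Add bytes dependency

Instead of using Vec<u8>, the Mini-Redis crate uses Bytes from the bytes crate. The goal of Bytes is to provide a robust byte array structure for network programming. The biggest feature it adds over Vec<u8> is shallow cloning. In other words, calling clone() on a Bytes instance does not copy the underlying data. Instead, a Bytes instance is a reference-counted handle to some underlying data. The Bytes type is roughly an Arc<Vec<u8>> but with some added capabilities.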
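You do not need the bytes crate to see what "shallow clone" means. The sketch below uses only the standard library's Arc<Vec<u8>>, which the text above names as a rough model of Bytes. It checks that cloning the handle shares one buffer instead of copying it. It does not exercise Bytes itself, so Bytes-specific features are not shown, and the helper name shallow_clone_demo is hypothetical.

```rust
use std::sync::Arc;

/// Clones a reference-counted byte buffer and reports whether both handles
/// point at the same allocation, along with the number of live handles.
fn shallow_clone_demo(data: &[u8]) -> (bool, usize) {
    // A single heap allocation holding the bytes.
    let original: Arc<Vec<u8>> = Arc::new(data.to_vec());
    // Cloning the Arc only increments a reference count; the Vec is not copied.
    let copy = Arc::clone(&original);
    (Arc::ptr_eq(&original, &copy), Arc::strong_count(&original))
}

fn main() {
    let (same_buffer, handles) = shallow_clone_demo(b"hello");
    println!("same buffer: {same_buffer}, handles: {handles}");
}
```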

To depend on bytes, add the following to the [dependencies] section of your Cargo.toml:

bytes = "1"

Initialize the HashMap

The HashMap will be shared across many tasks and potentially many threads. To support this, it is wrapped in Arc<Mutex<_>>.

Translated by runstone 9 months ago.

First, for convenience, add the following type alias after the use statements.

use bytes::Bytes;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

type Db = Arc<Mutex<HashMap<String, Bytes>>>;

Then, update the main function to initialize the HashMap and pass an Arc handle to the process function. Using Arc allows the HashMap to be referenced concurrently from many tasks, potentially running on many threads. Throughout Tokio, the term handle is used to reference a value that provides access to some shared state.

use tokio::net::TcpListener;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

    println!("Listening");

    let db = Arc::new(Mutex::new(HashMap::new()));

    loop {
        let (socket, _) = listener.accept().await.unwrap();
        // Clone the handle to the hash map.
        let db = db.clone();

        println!("Accepted");
        tokio::spawn(async move {
            process(socket, db).await;
        });
    }
}
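You can try the handle-cloning pattern from main without Tokio or a network. In this std-only sketch, OS threads stand in for spawned tasks. Each thread receives its own clone of the Arc handle and inserts into the same map. The values are Vec<u8> instead of Bytes to avoid external dependencies, and fill_from_threads is a hypothetical helper.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

// Same shape as the tutorial's Db alias, but with Vec<u8> values
// instead of bytes::Bytes so that only std is required.
type Db = Arc<Mutex<HashMap<String, Vec<u8>>>>;

/// Spawns `workers` threads; each inserts one key into the shared map.
fn fill_from_threads(workers: usize) -> Db {
    let db: Db = Arc::new(Mutex::new(HashMap::new()));
    let mut handles = Vec::new();
    for i in 0..workers {
        // Clone the handle, not the map: every thread sees the same HashMap.
        let db = Arc::clone(&db);
        handles.push(thread::spawn(move || {
            // The guard is a temporary, so the lock is released at the end of this statement.
            db.lock().unwrap().insert(format!("key{i}"), vec![i as u8]);
        }));
    }
    for h in handles {
        h.join().unwrap();
    }
    db
}

fn main() {
    let db = fill_from_threads(4);
    println!("{} entries", db.lock().unwrap().len());
}
```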

On using std::sync::Mutex

Note that std::sync::Mutex, not tokio::sync::Mutex, is used to guard the HashMap. A common error is to unconditionally use tokio::sync::Mutex from within async code. An async mutex is a mutex designed to stay locked across calls to .await.

A synchronous mutex will block the current thread when waiting to acquire the lock. This, in turn, will block other tasks from processing. However, switching to tokio::sync::Mutex usually does not help as the asynchronous mutex uses a synchronous mutex internally.

As a rule of thumb, using a synchronous mutex from within asynchronous code is fine as long as contention remains low and the lock is not held across calls to .await. Additionally, consider using parking_lot::Mutex as a faster alternative to std::sync::Mutex.

Update process()

The process function no longer initializes a HashMap. Instead, it takes the shared handle to the HashMap as an argument. It also needs to lock the HashMap before using it. Remember that the value's type for the HashMap is now Bytes (which we can cheaply clone), so this needs to be changed as well.

use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};

async fn process(socket: TcpStream, db: Db) {
    use mini_redis::Command::{self, Get, Set};

    // Connection, provided by `mini-redis`, handles parsing frames from
    // the socket
    let mut connection = Connection::new(socket);

    while let Some(frame) = connection.read_frame().await.unwrap() {
        let response = match Command::from_frame(frame).unwrap() {
            Set(cmd) => {
                let mut db = db.lock().unwrap();
                db.insert(cmd.key().to_string(), cmd.value().clone());
                Frame::Simple("OK".to_string())
            }           
            Get(cmd) => {
                let db = db.lock().unwrap();
                if let Some(value) = db.get(cmd.key()) {
                    Frame::Bulk(value.clone())
                } else {
                    Frame::Null
                }
            }
            cmd => panic!("unimplemented {:?}", cmd),
        };

        // Write the response to the client
        connection.write_frame(&response).await.unwrap();
    }
}
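The locking logic of process() does not depend on networking. The sketch below isolates it using only std. It assumes a hypothetical Cmd enum and Reply enum (not the mini-redis types) and a hypothetical handle function. Arc<[u8]> values stand in for Bytes, so returning a value is a cheap reference-count bump. As in process(), each arm takes the lock, does its work, and releases the guard when the arm ends.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Arc<[u8]> stands in for bytes::Bytes: cloning it does not copy the data.
type Db = Arc<Mutex<HashMap<String, Arc<[u8]>>>>;

// Hypothetical, simplified versions of the Set and Get commands.
enum Cmd {
    Set { key: String, value: Arc<[u8]> },
    Get { key: String },
}

// Hypothetical reply type mirroring Frame::Simple("OK"), Frame::Bulk and Frame::Null.
#[derive(Debug, PartialEq)]
enum Reply {
    Ok,
    Bulk(Arc<[u8]>),
    Null,
}

fn handle(db: &Db, cmd: Cmd) -> Reply {
    match cmd {
        Cmd::Set { key, value } => {
            let mut map = db.lock().unwrap();
            map.insert(key, value);
            Reply::Ok
        } // guard dropped here
        Cmd::Get { key } => {
            let map = db.lock().unwrap();
            if let Some(value) = map.get(&key) {
                // Cloning an Arc only increments a reference count.
                Reply::Bulk(value.clone())
            } else {
                Reply::Null
            }
        } // guard dropped here
    }
}

fn main() {
    let db: Db = Arc::new(Mutex::new(HashMap::new()));
    handle(&db, Cmd::Set { key: "hello".into(), value: Arc::from(&b"world"[..]) });
    println!("{:?}", handle(&db, Cmd::Get { key: "hello".into() }));
}
```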

Tasks, threads, and contention

Using a blocking mutex to guard short critical sections is an acceptable strategy when contention is minimal. When a lock is contended, the thread executing the task must block and wait on the mutex. This will not only block the current task but it will also block all other tasks scheduled on the current thread.

By default, the Tokio runtime uses a multi-threaded scheduler. Tasks are scheduled on any number of threads managed by the runtime. If a large number of tasks are scheduled to execute and they all require access to the mutex, then there will be contention. On the other hand, if the current_thread runtime flavor is used, then the mutex will never be contended.

The current_thread runtime flavor is a lightweight, single-threaded runtime. It is a good choice when only spawning a few tasks and opening a handful of sockets. For example, this option works well when providing a synchronous API bridge on top of an asynchronous client library.

If contention on a synchronous mutex becomes a problem, the best fix is rarely to switch to the Tokio mutex. Instead, options to consider are:

  • Switch to a dedicated task to manage state and use message passing.
  • Shard the mutex.
  • Restructure the code to avoid the mutex.

In our case, as each key is independent, mutex sharding will work well. To do this, instead of having a single Mutex<HashMap<_, _>> instance, we would introduce N distinct instances.

type ShardedDb = Arc<Vec<Mutex<HashMap<String, Vec<u8>>>>>;

fn new_sharded_db(num_shards: usize) -> ShardedDb {
    let mut db = Vec::with_capacity(num_shards);
    for _ in 0..num_shards {
        db.push(Mutex::new(HashMap::new()));
    }
    Arc::new(db)
}

Then, finding the cell for any given key becomes a two-step process. First, the key is used to identify which shard it is part of. Then, the key is looked up in that shard's HashMap.

// `hash` stands for any function that maps a key to an integer.
let mut shard = db[hash(&key) % db.len()].lock().unwrap();
shard.insert(key, value);
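A more complete version of the two-step lookup might look like the sketch below. The text above leaves the hash function unspecified, so this sketch assumes std's DefaultHasher. The names shard_for, insert and get are hypothetical helpers. Because the shard count is fixed, a given key always maps to the same shard, and an operation only ever locks that one shard.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::{Arc, Mutex};

type ShardedDb = Arc<Vec<Mutex<HashMap<String, Vec<u8>>>>>;

fn new_sharded_db(num_shards: usize) -> ShardedDb {
    let mut db = Vec::with_capacity(num_shards);
    for _ in 0..num_shards {
        db.push(Mutex::new(HashMap::new()));
    }
    Arc::new(db)
}

/// Step 1: hash the key to pick a shard index in 0..num_shards.
fn shard_for(key: &str, num_shards: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % num_shards as u64) as usize
}

/// Step 2: lock only the selected shard and use its HashMap.
fn insert(db: &ShardedDb, key: &str, value: Vec<u8>) {
    let mut shard = db[shard_for(key, db.len())].lock().unwrap();
    shard.insert(key.to_string(), value);
}

fn get(db: &ShardedDb, key: &str) -> Option<Vec<u8>> {
    let shard = db[shard_for(key, db.len())].lock().unwrap();
    shard.get(key).cloned()
}

fn main() {
    let db = new_sharded_db(8);
    insert(&db, "hello", b"world".to_vec());
    println!("{:?}", get(&db, "hello"));
}
```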

The simple implementation outlined above requires using a fixed number of shards, and the number of shards cannot be changed once the sharded map is created. The dashmap crate provides an implementation of a more sophisticated sharded hash map.

Holding a MutexGuard across an .await

You might write code that looks like this:

use std::sync::{Mutex, MutexGuard};

async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
    *lock += 1;

    do_something_async().await;
} // lock goes out of scope here

When you try to spawn something that calls this function, you will encounter the following error message:

error: future cannot be sent between threads safely
   --> src/lib.rs:13:5
    |
13  |     tokio::spawn(async move {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    |
   ::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ---- required by this bound in `tokio::task::spawn::spawn`
    |
    = help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, i32>`
note: future is not `Send` as this value is used across an await
   --> src/lib.rs:7:5
    |
4   |     let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
    |         -------- has type `std::sync::MutexGuard<'_, i32>` which is not `Send`
...
7   |     do_something_async().await;
    |     ^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `mut lock` maybe used later
8   | }
    | - `mut lock` is later dropped here

This happens because the std::sync::MutexGuard type is not Send. This means that you can't send a mutex lock to another thread, and the error happens because the Tokio runtime can move a task between threads at every .await. To avoid this, you should restructure your code such that the mutex lock's destructor runs before the .await.

use std::sync::{Mutex, MutexGuard};

// This works!
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    {
        let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
        *lock += 1;
    } // lock goes out of scope here

    do_something_async().await;
}
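You can ask the compiler directly whether the scoped version is accepted. The sketch below uses only std. do_something_async is a placeholder that completes immediately. assert_send is a hypothetical helper that compiles only if its argument is Send, which is the same bound tokio::spawn imposes. block_on is a toy busy-polling executor standing in for the Tokio runtime.

```rust
use std::future::Future;
use std::sync::{Arc, Mutex, MutexGuard};
use std::task::{Context, Poll, Wake, Waker};

// Placeholder for real async work; it finishes on the first poll.
async fn do_something_async() {}

async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    {
        let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
        *lock += 1;
    } // lock goes out of scope here, before the .await
    do_something_async().await;
}

// Compiles only when T: Send, mirroring the bound on tokio::spawn.
fn assert_send<T: Send>(value: T) -> T {
    value
}

// Toy executor: a waker that does nothing and a loop that polls until Ready.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn main() {
    let mutex = Mutex::new(0);
    block_on(assert_send(increment_and_do_stuff(&mutex)));
    println!("{}", mutex.lock().unwrap());
}
```

Moving the guard out of the inner block, so that it is still in scope at the .await, makes the assert_send call fail to compile with the error shown earlier.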

Note that this does not work:

use std::sync::{Mutex, MutexGuard};

// This fails too.
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
    *lock += 1;
    drop(lock);

    do_something_async().await;
}

This is because, at the time of writing, the compiler determines whether a future is Send based on scope information only. The compiler will hopefully be updated to take an explicit drop into account, but for now you must use an explicit scope.

Note that the error discussed here is also discussed in the Send bound section from the spawning chapter.

You should not try to circumvent this issue by spawning the task in a way that does not require it to be Send. If Tokio suspends your task at an .await while the task is holding the lock, another task may be scheduled to run on the same thread, and that task may also try to lock the same mutex. This results in a deadlock: the task waiting for the mutex blocks the thread, so the task holding the mutex never gets to run again and release it.
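The deadlock above happens because std::sync::Mutex is not reentrant. While a guard is alive, nobody can acquire the lock again, including the same thread. The sketch below shows this safely by calling try_lock, which reports failure instead of blocking. The standard library documents that a second lock() call from the thread that already holds the lock will not return (it may deadlock or panic). The function name second_lock_would_block is hypothetical.

```rust
use std::sync::{Mutex, TryLockError};

/// Returns true if a second acquisition fails while the first guard is still held.
fn second_lock_would_block(mutex: &Mutex<i32>) -> bool {
    let _guard = mutex.lock().unwrap(); // first acquisition, held until the function returns
    // Another task resumed on this thread and calling mutex.lock() here would
    // block the thread, so the holder could never run again to release the lock.
    let second = mutex.try_lock();
    matches!(second, Err(TryLockError::WouldBlock))
}

fn main() {
    let mutex = Mutex::new(0);
    println!("blocked: {}", second_lock_would_block(&mutex));
    // Once the first guard has been dropped, locking succeeds again.
    println!("unlocked: {}", mutex.try_lock().is_ok());
}
```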

Below, we discuss some approaches to fixing this error:

Restructure your code to not hold the lock across an .await

We have already seen one example of this in the snippet above, but there are some more robust ways to do this. For example, you can wrap the mutex in a struct, and only ever lock the mutex inside non-async methods on that struct.

use std::sync::Mutex;

struct CanIncrement {
    mutex: Mutex<i32>,
}
impl CanIncrement {
    // This function is not marked async.
    fn increment(&self) {
        let mut lock = self.mutex.lock().unwrap();
        *lock += 1;
    }
}

async fn increment_and_do_stuff(can_incr: &CanIncrement) {
    can_incr.increment();
    do_something_async().await;
}

This pattern guarantees that you won't run into the Send error, because the mutex guard does not appear anywhere in an async function.
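The sketch below is a self-contained, std-only version of this pattern. As before, do_something_async is a placeholder that completes immediately, and assert_send is a hypothetical compile-time check for the Send bound. A toy busy-polling block_on stands in for the Tokio runtime, and the get method is a hypothetical addition for reading the counter. The guard never appears inside the async function, so the compiler accepts the future as Send.

```rust
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

struct CanIncrement {
    mutex: Mutex<i32>,
}

impl CanIncrement {
    // Not async: the guard is created and dropped within this call.
    fn increment(&self) {
        let mut lock = self.mutex.lock().unwrap();
        *lock += 1;
    }

    // Hypothetical accessor; also a short, synchronous critical section.
    fn get(&self) -> i32 {
        *self.mutex.lock().unwrap()
    }
}

// Placeholder for real async work; it finishes on the first poll.
async fn do_something_async() {}

async fn increment_and_do_stuff(can_incr: &CanIncrement) {
    can_incr.increment();
    do_something_async().await;
}

// Compiles only when T: Send, mirroring the bound on tokio::spawn.
fn assert_send<T: Send>(value: T) -> T {
    value
}

// Toy executor: a waker that does nothing and a loop that polls until Ready.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn main() {
    let counter = CanIncrement { mutex: Mutex::new(0) };
    block_on(assert_send(increment_and_do_stuff(&counter)));
    block_on(assert_send(increment_and_do_stuff(&counter)));
    println!("{}", counter.get());
}
```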

Spawn a task to manage the state and use message passing to operate on it

This is the second approach mentioned at the start of this chapter, and it is often used when the shared resource is an I/O resource. See the next chapter for more details.
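The next chapter covers the Tokio version of this approach. As a rough preview, the std-only sketch below gives a dedicated thread sole ownership of the HashMap. Other threads send it commands over std::sync::mpsc, and each command carries its own reply channel. No mutex is needed because only the manager thread ever touches the map. The names Command, spawn_manager, set and get are hypothetical, and OS threads stand in for Tokio tasks.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Sender};
use std::thread;

// Hypothetical commands; each one carries a channel for the reply.
enum Command {
    Set { key: String, value: Vec<u8>, reply: Sender<()> },
    Get { key: String, reply: Sender<Option<Vec<u8>>> },
}

/// Spawns the thread that owns the map and returns a handle for sending commands.
fn spawn_manager() -> Sender<Command> {
    let (tx, rx) = mpsc::channel::<Command>();
    thread::spawn(move || {
        // Only this thread ever accesses `map`, so no lock is required.
        let mut map: HashMap<String, Vec<u8>> = HashMap::new();
        // The loop ends once every Sender has been dropped.
        for cmd in rx {
            match cmd {
                Command::Set { key, value, reply } => {
                    map.insert(key, value);
                    let _ = reply.send(());
                }
                Command::Get { key, reply } => {
                    let _ = reply.send(map.get(&key).cloned());
                }
            }
        }
    });
    tx
}

/// Client helper: sends a Set and waits until the manager has applied it.
fn set(manager: &Sender<Command>, key: &str, value: Vec<u8>) {
    let (reply, rx) = mpsc::channel();
    manager.send(Command::Set { key: key.to_string(), value, reply }).unwrap();
    rx.recv().unwrap();
}

/// Client helper: sends a Get and waits for the answer.
fn get(manager: &Sender<Command>, key: &str) -> Option<Vec<u8>> {
    let (reply, rx) = mpsc::channel();
    manager.send(Command::Get { key: key.to_string(), reply }).unwrap();
    rx.recv().unwrap()
}

fn main() {
    let manager = spawn_manager();
    let client = manager.clone();
    thread::spawn(move || set(&client, "hello", b"world".to_vec()))
        .join()
        .unwrap();
    println!("{:?}", get(&manager, "hello"));
}
```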

Use Tokio's asynchronous mutex

The tokio::sync::Mutex type provided by Tokio can also be used. The primary feature of the Tokio mutex is that it can be held across an .await without any issues. That said, an asynchronous mutex is more expensive than an ordinary mutex, and it is typically better to use one of the two other approaches.

use tokio::sync::Mutex; // note! This uses the Tokio mutex

// This compiles!
// (but restructuring the code would be better in this case)
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    let mut lock = mutex.lock().await;
    *lock += 1;

    do_something_async().await;
} // lock goes out of scope here

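This article was first published on LearnKu.com.

All translations in this article are for learning and exchange purposes only. When reposting, please credit the translator, the source, and a link to this article.
Our translation work follows a CC license. If our work infringes on your rights, please contact us promptly.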