翻译进度
8
分块数量
3
参与人数

Framing

这是一篇协同翻译的文章,你可以点击『我来翻译』按钮来参与翻译。


帧化

利用刚刚学到的 I/O 相关知识,我们来实现 Mini-Redis 的帧。将字节流转换为以帧为单位的数据流的过程,称为帧化。节点之间以帧为单位传递数据。 Redis 协议的帧定义如下:

use bytes::Bytes;

enum Frame {
    Simple(String),
    Error(String),
    Integer(u64),
    Bulk(Bytes),
    Null,
    Array(Vec<Frame>),
}

请注意,帧只包含数据不包含其他逻辑。命令的组成和解析是在更高的抽象层次上实现的。

例如在 HTTP 中,帧的定义大概是这样:

enum HttpFrame {
    RequestHead {
        method: Method,
        uri: Uri,
        version: Version,
        headers: HeaderMap,
    },
    ResponseHead {
        status: StatusCode,
        version: Version,
        headers: HeaderMap,
    },
    BodyChunk {
        chunk: Bytes,
    },
}

为了构建 Mini-Redis 的帧,我们定义 Connection 结构体来包装 TcpStream ,通过它来和 mini_redis::Frame 交互,实现读、写、解析等操作。

use tokio::net::TcpStream;
use mini_redis::{Frame, Result};

struct Connection {
    stream: TcpStream,
    // ... 其他字段
}

impl Connection {
    /// 从连接中读取帧
    /// 
    /// 如果读到结束标记则返回 `None`
    pub async fn read_frame(&mut self)
        -> Result<Option<Frame>>
    {
        // 具体实现
    }

    /// 将帧数据写入连接
    pub async fn write_frame(&mut self, frame: &Frame)
        -> Result<()>
    {
        // 具体实现
    }
}

Redis 通讯协议的细节可以在 这里找到。Connection 的完整代码在 这里

带缓冲区的读

read_frame 方法,顾名思义,读取到一个完整的数据帧时才会返回。如果只是调用一次 TcpStream::read() 返回的数据量是不确定的,可能包含一个完整的数据帧,也可能只是一部分,还有可能是多个。如果是不完整的,则需要缓存起来并且从套接字读取更多数据;如果是多个,则返回第一帧,剩下的数据缓存起来直到下一次调用 read_frame

ziyouwa 翻译于 2周前

要添加读缓存,需要为 Connection 添加一个缓存字段:buffer。从套接字读取数据后写入缓存,解析成功后,已被解析的数据会从缓存中移除。

我们使用 BytesMut 作为缓存类型,它是 Bytes 类型的变体,支持可变借用。

use bytes::BytesMut;
use tokio::net::TcpStream;

pub struct Connection {
    stream: TcpStream,
    buffer: BytesMut,
}

impl Connection {
    pub fn new(stream: TcpStream) -> Connection {
        Connection {
            stream,
            // buffer空间需要扩展时,内存分配器会按 4k 容
            // 量的整数倍分配空间给 buffer
            buffer: BytesMut::with_capacity(4096),
        }
    }
}

接下来,我们实现 read_frame() 方法。

use tokio::io::AsyncReadExt;
use bytes::Buf;
use mini_redis::Result;

pub async fn read_frame(&mut self)
    -> Result<Option<Frame>>
{
    loop {
        // 尝试从缓冲区解析帧,如果解析成功,返回解析得到的帧
        if let Some(frame) = self.parse_frame()? {
            return Ok(Some(frame));
        }

        // 如果缓存数据不足,尝试从套接字读取更多数据。
        //
        // 读取成功返回读取的字节数。字节数为 `0`则
        // 意味着读到了数据流的「终点」
        if 0 == self.stream.read_buf(&mut self.buffer).await? {
            // 远程连接已断开。如果是正常断开的话,缓冲区应该没有数据。
            // 如果有,则说明连接是在发送数据的过程中意外断开的
            if self.buffer.is_empty() {
                return Ok(None);
            } else {
                return Err("connection reset by peer".into());
            }
        }
    }
}

让我们来仔细梳理一下: read_frame 方法的逻辑都包含在一个无限循环中。首先,调用self.parse_frame() 尝试从 self.buffer 解析数据帧,如果缓冲区的数据能成功解析一帧数据,则 read_frame() 执行完毕,返回解析后的数据帧;否则尝试从套接字读取更多的数据写入缓冲区,然后再次调用 parse_frame() ,此时如果数据足够,就会成功解析出一帧数据。

ziyouwa 翻译于 2周前

从套接字读取数据时,如果读到的字节数为 0,则意味着对端套接字已关闭,后续再也无法从中读取到任何数据。如果缓冲区内仍有数据存在,则说明这是某个数据帧的一部分,连接在接收数据过程中意外突然中止,这时返回 Err 类型的结果是恰当的。

Buf 特征

read_buf 函数用于从流中读取数据。这个版本的读方法会得到一个实现了 BufMut 特征的值。BufMut来自 bytes 包(crate,译作包)。

首先,考虑如何使用 read() 来实现相同的读取循环,我们将使用 Vec<u8> 代替 BytesMut

use tokio::net::TcpStream;

pub struct Connection {
    stream: TcpStream,
    buffer: Vec<u8>,
    cursor: usize,  // 光标指向缓冲区剩余空间内第一个字节
}

impl Connection {
    pub fn new(stream: TcpStream) -> Connection {
        Connection {
            stream,
            // 内存分配器按 4k 容量的整数倍为 buffer 分配内存空间
            buffer: vec![0; 4096],
            cursor: 0,
        }
    }
}

然后在 Connection 上调用 read_frame():

use mini_redis::{Frame, Result};

pub async fn read_frame(&mut self)
    -> Result<Option<Frame>>
{
    loop {
        if let Some(frame) = self.parse_frame()? {
            return Ok(Some(frame));
        }

        // 确认缓冲区剩余空间足够
        if self.buffer.len() == self.cursor {
            // 增大缓冲区容量
            self.buffer.resize(self.cursor * 2, 0);
        }

        // 将数据读入缓冲区,返回读取的字节数
        let n = self.stream.read(
            &mut self.buffer[self.cursor..]).await?;

        if 0 == n {
            if self.cursor == 0 {
                return Ok(None);
            } else {
                return Err("connection reset by peer".into());
            }
        } else {
            // 更新光标位置
            self.cursor += n;
        }
    }
}

read 配合字节数组使用时,我们必须维护光标来跟踪已缓存的数据,以确保传递给 read() 的缓冲区是空白的,否则将会覆盖掉缓存的数据。如果缓冲区已满,我们需要扩展其容量来保证能容纳读到的数据。在 parse_frame() 函数(还没实现)中,我们会解析 self.buffer[..self.cursor]中的数据。

ziyouwa 翻译于 2周前

Because pairing a byte array with a cursor is very common, the bytes crate provides an abstraction representing a byte array and cursor. The Buf trait is implemented by types from which data can be read. The BufMut trait is implemented by types into which data can be written. When passing a T: BufMut to read_buf(), the buffer's internal cursor is automatically updated by read_buf. Because of this, in our version of read_frame, we do not need to manage our own cursor.

Additionally, when using Vec<u8>, the buffer must be initializedvec![0; 4096] allocates an array of 4096 bytes and writes zero to every entry. When resizing the buffer, the new capacity must also be initialized with zeros. The initialization process is not free. When working with BytesMut and BufMut, capacity is uninitialized. The BytesMut abstraction prevents us from reading the uninitialized memory. This lets us avoid the initialization step.

Parsing

Now, let's look at the parse_frame() function. Parsing is done in two steps.

  1. Ensure a full frame is buffered and find the end index of the frame.
  2. Parse the frame.

The mini-redis crate provides us with a function for both of these steps:

  1. Frame::check
  2. Frame::parse

We will also reuse the Buf abstraction to help. A Buf is passed into Frame::check. As the check function iterates the passed in buffer, the internal cursor will be advanced. When check returns, the buffer's internal cursor points to the end of the frame.

对于 Buf 类型, 用 std::io::Cursor<&[u8]>.

use mini_redis::{Frame, Result};
use mini_redis::frame::Error::Incomplete;
use bytes::Buf;
use std::io::Cursor;

fn parse_frame(&mut self)
    -> Result<Option<Frame>>
{
    // 创建 `T: Buf` 类型
    let mut buf = Cursor::new(&self.buffer[..]);

    // 校验是否是一个完整可用数据帧
    match Frame::check(&mut buf) {
        Ok(_) => {
            // Get the byte length of the frame
            let len = buf.position() as usize;

            // 重置内部游标
            // 调用 `parse`.
            buf.set_position(0);

            // 解析数据帧
            let frame = Frame::parse(&mut buf)?;

            // 从缓存区丢弃帧
            self.buffer.advance(len);

            // 将帧返回调用者
            Ok(Some(frame))
        }
        // 缓存数据不足
        Err(Incomplete) => Ok(None),
        // 遇到错误
        Err(e) => Err(e.into()),
    }
}

完整的 Frame::check函数可在 此处 找到。将不对其进行全面介绍。

需要注意的是,使用了 Buf 的 “字节迭代器 ”式 API。这些 API 可获取数据并推进内部游标。例如,在解析帧时,会检查第一个字节以确定帧的类型。使用的函数是 Buf::get_u8。该函数获取当前光标位置的字节,并将光标向前推进一个字节。

VeryAUX 翻译于 1周前

There are more useful methods on the Buf trait. Check the API docs for more details.

Buffered writes

The other half of the framing API is the write_frame(frame) function. This function writes an entire frame to the socket. In order to minimize write syscalls, writes will be buffered. A write buffer is maintained and frames are encoded to this buffer before being written to the socket. However, unlike read_frame(), the entire frame is not always buffered to a byte array before writing to the socket.

Consider a bulk stream frame. The value being written is Frame::Bulk(Bytes). The wire format of a bulk frame is a frame head, which consists of the $ character followed by the data length in bytes. The majority of the frame is the contents of the Bytes value. If the data is large, copying it to an intermediate buffer would be costly.

To implement buffered writes, we will use the BufWriter struct. This struct is initialized with a T: AsyncWrite and implements AsyncWrite itself. When write is called on BufWriter, the write does not go directly to the inner writer, but to a buffer. When the buffer is full, the contents are flushed to the inner writer and the inner buffer is cleared. There are also optimizations that allow bypassing the buffer in certain cases.

We will not attempt a full implementation of write_frame() as part of the tutorial. See the full implementation here.

First, the Connection struct is updated:

use tokio::io::BufWriter;
use tokio::net::TcpStream;
use bytes::BytesMut;

pub struct Connection {
    stream: BufWriter<TcpStream>,
    buffer: BytesMut,
}

impl Connection {
    pub fn new(stream: TcpStream) -> Connection {
        Connection {
            stream: BufWriter::new(stream),
            buffer: BytesMut::with_capacity(4096),
        }
    }
}

Next, write_frame() is implemented.

use tokio::io::{self, AsyncWriteExt};
use mini_redis::Frame;

async fn write_frame(&mut self, frame: &Frame)
    -> io::Result<()>
{
    match frame {
        Frame::Simple(val) => {
            self.stream.write_u8(b'+').await?;
            self.stream.write_all(val.as_bytes()).await?;
            self.stream.write_all(b"\r\n").await?;
        }
        Frame::Error(val) => {
            self.stream.write_u8(b'-').await?;
            self.stream.write_all(val.as_bytes()).await?;
            self.stream.write_all(b"\r\n").await?;
        }
        Frame::Integer(val) => {
            self.stream.write_u8(b':').await?;
            self.write_decimal(*val).await?;
        }
        Frame::Null => {
            self.stream.write_all(b"$-1\r\n").await?;
        }
        Frame::Bulk(val) => {
            let len = val.len();

            self.stream.write_u8(b'$').await?;
            self.write_decimal(len as u64).await?;
            self.stream.write_all(val).await?;
            self.stream.write_all(b"\r\n").await?;
        }
        Frame::Array(_val) => unimplemented!(),
    }

    self.stream.flush().await;

    Ok(())
}

The functions used here are provided by AsyncWriteExt. They are available on TcpStream as well, but it would not be advisable to issue single byte writes without the intermediate buffer.

The function ends with a call to self.stream.flush().await. Because BufWriter stores writes in an intermediate buffer, calls to write do not guarantee that the data is written to the socket. Before returning, we want the frame to be written to the socket. The call to flush() writes any data pending in the buffer to the socket.

Another alternative would be to not call flush() in write_frame(). Instead, provide a flush() function on Connection. This would allow the caller to write queue multiple small frames in the write buffer then write them all to the socket with one write syscall. Doing this complicates the Connection API. Simplicity is one of Mini-Redis' goals, so we decided to include the flush().await call in fn write_frame().

本文章首发在 LearnKu.com 网站上。

本文中的所有译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接
我们的翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。

贡献者:3
讨论数量: 0
发起讨论 只看当前版本


暂无话题~