Framing
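We will now apply what we just learned about I/O and implement the Mini-Redis framing layer. Framing is the process of converting a byte stream into a stream of frames. A frame is a unit of data transmitted between two peers. The Redis protocol frame is defined as follows: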
use bytes::Bytes;
enum Frame {
Simple(String),
Error(String),
Integer(u64),
Bulk(Bytes),
Null,
Array(Vec<Frame>),
}
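Notice that the frame only consists of data, without any semantics. Command parsing and implementation happen at a higher level.
For HTTP, a frame might look like this: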
use bytes::Bytes;
use http::{HeaderMap, Method, StatusCode, Uri, Version};
enum HttpFrame {
RequestHead {
method: Method,
uri: Uri,
version: Version,
headers: HeaderMap,
},
ResponseHead {
status: StatusCode,
version: Version,
headers: HeaderMap,
},
BodyChunk {
chunk: Bytes,
},
}
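Both enums describe only data. The following is a minimal std-only sketch of the split between framing and command handling. It mirrors the Frame enum from earlier, with Vec<u8> standing in for bytes::Bytes so the code has no dependencies. The Command type and its from_frame function are hypothetical illustrations, not mini-redis APIs. They show that interpreting an array of bulk strings as GET or SET is the job of the layer above the frames.

```rust
/// Mirrors the Frame enum above; `Vec<u8>` stands in for `bytes::Bytes`.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum Frame {
    Simple(String),
    Error(String),
    Integer(u64),
    Bulk(Vec<u8>),
    Null,
    Array(Vec<Frame>),
}

/// Hypothetical command type (not the mini-redis one): this is where
/// meaning is assigned to frames.
#[allow(dead_code)]
#[derive(Debug, PartialEq)]
enum Command {
    Get { key: String },
    Set { key: String, value: Vec<u8> },
}

impl Command {
    /// Interprets an array of bulk strings as a command.
    /// Returns `None` for any frame this simplified sketch does not understand.
    fn from_frame(frame: &Frame) -> Option<Command> {
        let parts = match frame {
            Frame::Array(parts) => parts,
            _ => return None,
        };
        // Every element must be a bulk string in this simplified interpretation.
        let mut args = Vec::with_capacity(parts.len());
        for part in parts {
            match part {
                Frame::Bulk(bytes) => args.push(bytes.clone()),
                _ => return None,
            }
        }
        // The command name is matched case-insensitively.
        let name = String::from_utf8(args.first()?.clone()).ok()?.to_ascii_lowercase();
        match (name.as_str(), args.len()) {
            ("get", 2) => Some(Command::Get {
                key: String::from_utf8(args[1].clone()).ok()?,
            }),
            ("set", 3) => Some(Command::Set {
                key: String::from_utf8(args[1].clone()).ok()?,
                value: args[2].clone(),
            }),
            _ => None,
        }
    }
}
```

The frame layer never needs to know what GET means. It only has to produce `Frame::Array` values.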
To implement framing for Mini-Redis, we define a Connection struct that wraps a TcpStream and reads, writes, and parses mini_redis::Frame values.
use tokio::net::TcpStream;
use mini_redis::{Frame, Result};
struct Connection {
stream: TcpStream,
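// ... other fields
}
impl Connection {
/// Read a frame from the connection.
///
/// Returns `None` if EOF is reached
pub async fn read_frame(&mut self)
-> Result<Option<Frame>>
{
// implementation here
todo!()
}
/// Write a frame to the connection.
pub async fn write_frame(&mut self, frame: &Frame)
-> Result<()>
{
// implementation here
todo!()
}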
}
Details of the Redis wire protocol can be found here. The full Connection code is found here.
Buffered reads
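As its name suggests, the read_frame method returns only after an entire frame has been received. A single call to TcpStream::read() may return an arbitrary amount of data: an entire frame, part of a frame, or several frames. If a partial frame is received, the data is buffered and more data is read from the socket. If multiple frames are received, the first frame is returned and the rest of the data is buffered until the next call to read_frame.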
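To implement this, Connection needs a read buffer field, buffer. Data read from the socket is written into the buffer. When a frame is parsed, the corresponding bytes are removed from the buffer.
We will use BytesMut as the buffer type. It is a mutable version of Bytes.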
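The following sketch shows the buffer bookkeeping with a plain Vec<u8> in place of BytesMut. For brevity, it assumes that every frame is a RESP simple string terminated by \r\n. The helper names on_bytes_received and take_simple_frame are hypothetical. The sketch covers both cases described above: a partial frame stays buffered, and when several frames arrive in one chunk, only the first is removed.

```rust
/// Appends newly received bytes to the read buffer (hypothetical helper).
fn on_bytes_received(buffer: &mut Vec<u8>, chunk: &[u8]) {
    buffer.extend_from_slice(chunk);
}

/// Hypothetical helper: removes one complete simple-string frame (`+...\r\n`)
/// from the front of the buffer. Returns `None` and leaves the buffer untouched
/// if no complete frame is buffered yet. Assumes the first byte is `+`.
fn take_simple_frame(buffer: &mut Vec<u8>) -> Option<String> {
    // Locate the end of the first line; `windows(2)` yields adjacent byte pairs.
    let end = buffer.windows(2).position(|w| w == b"\r\n")?;
    // Skip the type byte. `get` returns None if the line is too short to hold it.
    let text = String::from_utf8_lossy(buffer.get(1..end)?).into_owned();
    // Discard the parsed frame and its `\r\n`; any following bytes stay buffered.
    buffer.drain(..end + 2);
    Some(text)
}
```

BytesMut plays the same role in the real code. Appending and consuming from the front are its job rather than the parser's.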
use bytes::BytesMut;
use tokio::net::TcpStream;
pub struct Connection {
stream: TcpStream,
buffer: BytesMut,
}
impl Connection {
pub fn new(stream: TcpStream) -> Connection {
Connection {
stream,
// Allocate the buffer with 4KB of initial capacity.
buffer: BytesMut::with_capacity(4096),
}
}
}
Next, we implement the read_frame() method.
use tokio::io::AsyncReadExt;
use bytes::Buf;
use mini_redis::Result;
pub async fn read_frame(&mut self)
-> Result<Option<Frame>>
{
loop {
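// Attempt to parse a frame from the buffered data. If
// enough data has been buffered, the frame is returned.
if let Some(frame) = self.parse_frame()? {
return Ok(Some(frame));
}
// There is not enough buffered data to read a frame.
// Attempt to read more data from the socket.
//
// On success, the number of bytes read is returned.
// `0` indicates "end of stream".
if 0 == self.stream.read_buf(&mut self.buffer).await? {
// The remote closed the connection. For this to be
// a clean shutdown, there should be no data in the
// read buffer. If there is, the peer closed the
// socket while sending a frame.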
if self.buffer.is_empty() {
return Ok(None);
} else {
return Err("connection reset by peer".into());
}
}
}
}
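Let's break this down. The read_frame method runs in a loop. First, self.parse_frame() is called. This attempts to parse a frame from self.buffer. If the buffer holds enough data for a frame, read_frame() returns the parsed frame. Otherwise, we read more data from the socket into the buffer and call parse_frame() again. This time, parsing succeeds if enough data has arrived.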
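When reading from the socket, a return value of 0 means that the peer has closed the connection and no more data will be received. If the read buffer still holds data at that point, that data is part of an incomplete frame and the connection was terminated abruptly while the frame was being sent. This is an error condition, so an Err is returned.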
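The same loop can be mirrored synchronously with std::io::Read, which is useful for testing the logic without an async runtime. This sketch makes several simplifying assumptions. Frames are simple strings only, the error type is a plain String, and the names read_frame_sync and parse_simple are hypothetical. It is not the tokio version.

```rust
use std::io::Read;

/// Hypothetical parser: removes one `+...\r\n` frame from the front of `buffer`.
fn parse_simple(buffer: &mut Vec<u8>) -> Option<String> {
    let end = buffer.windows(2).position(|w| w == b"\r\n")?;
    let text = String::from_utf8_lossy(buffer.get(1..end)?).into_owned();
    buffer.drain(..end + 2);
    Some(text)
}

/// Synchronous analogue of `read_frame` (hypothetical):
/// Ok(Some(frame)) on success, Ok(None) on a clean EOF, Err on EOF mid-frame.
fn read_frame_sync<R: Read>(
    reader: &mut R,
    buffer: &mut Vec<u8>,
) -> Result<Option<String>, String> {
    let mut chunk = [0u8; 4096];
    loop {
        // First try to parse a frame from the data that is already buffered.
        if let Some(frame) = parse_simple(buffer) {
            return Ok(Some(frame));
        }
        // Not enough data: read more. A return value of 0 means the peer closed the stream.
        let n = reader.read(&mut chunk).map_err(|e| e.to_string())?;
        if n == 0 {
            return if buffer.is_empty() {
                Ok(None) // clean shutdown
            } else {
                Err("connection reset by peer".to_string()) // partial frame left over
            };
        }
        buffer.extend_from_slice(&chunk[..n]);
    }
}
```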
The Buf trait
When reading from the stream, read_buf is called. This version of the read function takes a value that implements the BufMut trait from the bytes crate.
First, consider how we would implement the same read loop using read(), with Vec<u8> in place of BytesMut.
use tokio::net::TcpStream;
pub struct Connection {
stream: TcpStream,
buffer: Vec<u8>,
cursor: usize, // index of the first free byte in the buffer
}
impl Connection {
pub fn new(stream: TcpStream) -> Connection {
Connection {
stream,
// Allocate the buffer with 4KB of zero-initialized space.
buffer: vec![0; 4096],
cursor: 0,
}
}
}
And the read_frame() method on Connection:
use tokio::io::AsyncReadExt;
use mini_redis::{Frame, Result};
pub async fn read_frame(&mut self)
-> Result<Option<Frame>>
{
loop {
if let Some(frame) = self.parse_frame()? {
return Ok(Some(frame));
}
// Ensure the buffer has free capacity
if self.buffer.len() == self.cursor {
// Grow the buffer
self.buffer.resize(self.cursor * 2, 0);
}
// Read into the buffer, tracking the number of bytes read
let n = self.stream.read(
&mut self.buffer[self.cursor..]).await?;
if 0 == n {
if self.cursor == 0 {
return Ok(None);
} else {
return Err("connection reset by peer".into());
}
} else {
// Update our cursor
self.cursor += n;
}
}
}
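When working with byte arrays and read, we must also maintain a cursor that tracks how much data has been buffered. We must pass only the empty portion of the buffer to read(). Otherwise, we would overwrite buffered data. If the buffer fills up, we must grow it in order to keep reading. In parse_frame() (not shown), we would parse the data contained in self.buffer[..self.cursor].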
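For comparison, the following is a synchronous sketch of the cursor bookkeeping just described. ReadBuffer and fill_buffer are hypothetical names. fill_buffer performs a single read and leaves parsing of the filled region to the caller. Only the free region buffer[cursor..] is passed to read, and any new space added by resize must be zero-filled.

```rust
use std::io::{self, Read};

/// Hypothetical byte array plus a hand-maintained cursor, as in the Vec<u8> version above.
struct ReadBuffer {
    buffer: Vec<u8>,
    cursor: usize, // number of valid bytes at the front of `buffer`
}

impl ReadBuffer {
    fn new() -> Self {
        // `vec!` writes a zero into every one of the 4096 bytes up front.
        ReadBuffer { buffer: vec![0; 4096], cursor: 0 }
    }

    /// Performs one read into the unused tail of the buffer.
    /// Returns the number of bytes read (0 means end of stream).
    fn fill_buffer<R: Read>(&mut self, reader: &mut R) -> io::Result<usize> {
        // When the buffer is full, double it; `resize` zero-fills the new space.
        if self.buffer.len() == self.cursor {
            self.buffer.resize(self.cursor * 2, 0);
        }
        // Only the free region is passed to `read`, so buffered bytes are never overwritten.
        let n = reader.read(&mut self.buffer[self.cursor..])?;
        self.cursor += n;
        Ok(n)
    }

    /// The buffered bytes a parser would examine: `buffer[..cursor]`.
    fn filled(&self) -> &[u8] {
        &self.buffer[..self.cursor]
    }
}
```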
Because pairing a byte array with a cursor is very common, the bytes crate provides an abstraction representing a byte array and cursor. The Buf trait is implemented by types from which data can be read. The BufMut trait is implemented by types into which data can be written. When passing a T: BufMut to read_buf(), the buffer's internal cursor is automatically updated by read_buf. Because of this, in our version of read_frame, we do not need to manage our own cursor.
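The standard library contains a small example of the same idea, though it is not the bytes API. std::io::Cursor pairs a byte slice with a position that reads advance automatically, and Vec<u8> implements std::io::Write by appending. In this std-only sketch (read_prefix and write_pieces are hypothetical helpers), neither side needs a hand-maintained cursor.

```rust
use std::io::{Cursor, Read, Write};

/// Hypothetical helper: reads `n` bytes and reports the cursor position afterwards.
fn read_prefix(cursor: &mut Cursor<&[u8]>, n: usize) -> (Vec<u8>, u64) {
    let mut out = vec![0u8; n];
    // `read_exact` advances the cursor's internal position by `n` on success.
    cursor.read_exact(&mut out).expect("not enough data");
    (out, cursor.position())
}

/// Hypothetical helper: each write to a Vec<u8> appends after the previous one.
fn write_pieces(first: &[u8], second: &[u8]) -> Vec<u8> {
    let mut sink = Vec::new();
    sink.write_all(first).unwrap();
    sink.write_all(second).unwrap();
    sink
}
```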
Additionally, when using Vec<u8>, the buffer must be initialized. vec![0; 4096] allocates an array of 4096 bytes and writes zero to every entry. When resizing the buffer, the new capacity must also be initialized with zeros. The initialization process is not free. When working with BytesMut and BufMut, capacity is uninitialized. The BytesMut abstraction prevents us from reading the uninitialized memory. This lets us avoid the initialization step.
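Plain Vec<u8> shows a similar difference, offered here as an analogy rather than a description of how BytesMut is implemented. vec![0; n] produces n initialized (zeroed) bytes. Vec::with_capacity(n) only reserves space and has length 0, so bytes are written only as data actually arrives. The function names are hypothetical.

```rust
/// Returns (len, capacity) of a zero-filled buffer: every byte is written up front.
fn zeroed(n: usize) -> (usize, usize) {
    let buf: Vec<u8> = vec![0; n];
    (buf.len(), buf.capacity())
}

/// Reserves `n` bytes without initializing them, then appends `incoming`.
/// Only the bytes that actually arrive are written.
fn reserved_then_filled(n: usize, incoming: &[u8]) -> Vec<u8> {
    let mut buf: Vec<u8> = Vec::with_capacity(n);
    // Length is 0: the reserved space cannot be read through safe code.
    assert_eq!(buf.len(), 0);
    buf.extend_from_slice(incoming);
    buf
}
```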
Parsing
Now, let's look at the parse_frame() function. Parsing is done in two steps.
- Ensure a full frame is buffered and find the end index of the frame.
- Parse the frame.
The mini-redis crate provides a function for each of these steps: Frame::check and Frame::parse.
We will also reuse the Buf abstraction to help. A Buf is passed into Frame::check. As the check function iterates the passed-in buffer, the internal cursor is advanced. When check returns, the buffer's internal cursor points to the end of the frame.
For the Buf type, we will use std::io::Cursor<&[u8]>.
use mini_redis::{Frame, Result};
use mini_redis::frame::Error::Incomplete;
use bytes::Buf;
use std::io::Cursor;
fn parse_frame(&mut self)
-> Result<Option<Frame>>
{
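// Create the `T: Buf` type.
let mut buf = Cursor::new(&self.buffer[..]);
// Check whether a full frame is available
match Frame::check(&mut buf) {
Ok(_) => {
// Get the byte length of the frame
let len = buf.position() as usize;
// Reset the internal cursor for the
// call to `parse`.
buf.set_position(0);
// Parse the frame
let frame = Frame::parse(&mut buf)?;
// Discard the frame from the buffer
self.buffer.advance(len);
// Return the frame to the caller.
Ok(Some(frame))
}
// Not enough data has been buffered yet;
// `read_frame` will read more from the socket.
Err(Incomplete) => Ok(None),
// An error was encountered
Err(e) => Err(e.into()),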
}
}
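The following std-only sketch shows the check-then-parse pattern without the mini-redis crate. It is restricted to simple strings (+) and integers (:). check, parse, and parse_frame are hypothetical stand-ins for Frame::check, Frame::parse, and the method above, not their real implementations. The flow follows the code above: check advances the cursor, position() gives the frame length, the cursor is rewound, parse runs, and then the consumed bytes are discarded.

```rust
use std::io::Cursor;

/// Simplified frame type for this sketch: only simple strings and integers.
#[derive(Debug, PartialEq)]
enum Frame {
    Simple(String),
    Integer(u64),
}

/// The two failure kinds `parse_frame` distinguishes above.
#[derive(Debug, PartialEq)]
enum Error {
    Incomplete,      // more bytes are needed
    Invalid(String), // the bytes can never form a valid frame
}

/// Returns the byte at the cursor and advances the cursor by one.
fn get_u8(src: &mut Cursor<&[u8]>) -> Result<u8, Error> {
    let pos = src.position() as usize;
    let byte = *src.get_ref().get(pos).ok_or(Error::Incomplete)?;
    src.set_position(pos as u64 + 1);
    Ok(byte)
}

/// Returns the line at the cursor (without `\r\n`) and moves the cursor past it.
fn get_line<'a>(src: &mut Cursor<&'a [u8]>) -> Result<&'a [u8], Error> {
    let start = src.position() as usize;
    let data: &'a [u8] = *src.get_ref();
    let rest = data.get(start..).ok_or(Error::Incomplete)?;
    let len = rest
        .windows(2)
        .position(|w| w == b"\r\n")
        .ok_or(Error::Incomplete)?;
    src.set_position((start + len + 2) as u64);
    Ok(&rest[..len])
}

/// Hypothetical stand-in for `Frame::check`: only verifies a full frame is present.
fn check(src: &mut Cursor<&[u8]>) -> Result<(), Error> {
    match get_u8(src)? {
        b'+' | b':' => get_line(src).map(|_| ()),
        other => Err(Error::Invalid(format!("unknown frame type byte {}", other))),
    }
}

/// Hypothetical stand-in for `Frame::parse`: builds the frame value.
fn parse(src: &mut Cursor<&[u8]>) -> Result<Frame, Error> {
    match get_u8(src)? {
        b'+' => Ok(Frame::Simple(String::from_utf8_lossy(get_line(src)?).into_owned())),
        b':' => {
            let text = std::str::from_utf8(get_line(src)?)
                .map_err(|e| Error::Invalid(e.to_string()))?;
            text.parse::<u64>()
                .map(Frame::Integer)
                .map_err(|e| Error::Invalid(e.to_string()))
        }
        other => Err(Error::Invalid(format!("unknown frame type byte {}", other))),
    }
}

/// Mirrors `parse_frame` above; Ok(None) means "wait for more data".
fn parse_frame(buffer: &mut Vec<u8>) -> Result<Option<Frame>, Error> {
    let mut buf = Cursor::new(&buffer[..]);
    match check(&mut buf) {
        Ok(()) => {
            // The cursor sits just past the frame, so its position is the frame length.
            let len = buf.position() as usize;
            // Rewind and parse for real.
            buf.set_position(0);
            let frame = parse(&mut buf)?;
            // Discard the parsed bytes; any following frame stays buffered.
            buffer.drain(..len);
            Ok(Some(frame))
        }
        Err(Error::Incomplete) => Ok(None),
        Err(e) => Err(e),
    }
}
```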
The full Frame::check function can be found here. We will not cover it in its entirety.
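The relevant thing to note is that Buf's "byte iterator" style APIs are used. These fetch data and advance the internal cursor. For example, to parse a frame, the first byte is checked to determine the type of the frame. The function used is Buf::get_u8. This fetches the byte at the current cursor position and advances the cursor by one.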
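The byte-iterator style can be imitated with std::io::Cursor. get_u8 below is a hypothetical std-only helper modelled on the behavior just described: it returns the byte at the cursor and advances by one. Buf::get_u8 panics when no bytes remain, so this version returns an Option instead. The hypothetical frame_kind function shows the dispatch on the first byte.

```rust
use std::io::Cursor;

/// Hypothetical helper modelled on Buf::get_u8: returns the byte at the cursor
/// and advances the cursor by one; returns None when no bytes remain.
fn get_u8(src: &mut Cursor<&[u8]>) -> Option<u8> {
    let pos = src.position() as usize;
    let byte = *src.get_ref().get(pos)?;
    src.set_position(pos as u64 + 1);
    Some(byte)
}

/// Names the RESP frame type from its first byte, consuming that byte.
fn frame_kind(src: &mut Cursor<&[u8]>) -> Option<&'static str> {
    let kind = match get_u8(src)? {
        b'+' => "simple",
        b'-' => "error",
        b':' => "integer",
        b'$' => "bulk",
        b'*' => "array",
        _ => "unknown",
    };
    Some(kind)
}
```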
There are more useful methods on the Buf trait. Check the API docs for more details.
Buffered writes
The other half of the framing API is the write_frame(frame) function. This function writes an entire frame to the socket. In order to minimize write syscalls, writes will be buffered. A write buffer is maintained and frames are encoded to this buffer before being written to the socket. However, unlike read_frame(), the entire frame is not always buffered to a byte array before writing to the socket.
Consider a bulk stream frame. The value being written is Frame::Bulk(Bytes). The wire format of a bulk frame is a frame head, which consists of the $ character followed by the data length in bytes. The majority of the frame is the contents of the Bytes value. If the data is large, copying it to an intermediate buffer would be costly.
To implement buffered writes, we will use the BufWriter struct. This struct is initialized with a T: AsyncWrite and implements AsyncWrite itself. When write is called on BufWriter, the write does not go directly to the inner writer, but to a buffer. When the buffer is full, the contents are flushed to the inner writer and the inner buffer is cleared. There are also optimizations that allow bypassing the buffer in certain cases.
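The standard library's std::io::BufWriter behaves in a similar way for blocking writers, so it can illustrate the idea without an async runtime. This is an analogy, not tokio's BufWriter. In the sketch, RecordingWriter is a hypothetical test double that records every write call reaching it, and write_bulk is a hypothetical helper. The small frame head stays in the buffer until flush. With current std versions, a payload larger than the buffer's capacity is typically passed straight to the inner writer, but the checks below rely only on the buffering and flushing behavior.

```rust
use std::io::{self, BufWriter, Write};

/// Hypothetical inner writer that records the size of every `write` call it receives.
#[derive(Default)]
struct RecordingWriter {
    calls: Vec<usize>,
    data: Vec<u8>,
}

impl Write for RecordingWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.calls.push(buf.len());
        self.data.extend_from_slice(buf);
        Ok(buf.len())
    }
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Hypothetical helper: writes a bulk frame (`$<len>\r\n<payload>\r\n`) through a
/// BufWriter and returns the inner writer so the caller can inspect what reached it.
fn write_bulk(payload: &[u8], capacity: usize) -> io::Result<RecordingWriter> {
    let mut w = BufWriter::with_capacity(capacity, RecordingWriter::default());
    // The small frame head goes into the buffer, not to the inner writer.
    write!(w, "${}\r\n", payload.len())?;
    w.write_all(payload)?;
    w.write_all(b"\r\n")?;
    // Push anything still buffered down to the inner writer.
    w.flush()?;
    w.into_inner().map_err(|e| e.into_error())
}
```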
We will not attempt a full implementation of write_frame() as part of the tutorial. See the full implementation here.
First, the Connection struct is updated:
use tokio::io::BufWriter;
use tokio::net::TcpStream;
use bytes::BytesMut;
pub struct Connection {
stream: BufWriter<TcpStream>,
buffer: BytesMut,
}
impl Connection {
pub fn new(stream: TcpStream) -> Connection {
Connection {
stream: BufWriter::new(stream),
buffer: BytesMut::with_capacity(4096),
}
}
}
Next, write_frame() is implemented.
use tokio::io::{self, AsyncWriteExt};
use mini_redis::Frame;
async fn write_frame(&mut self, frame: &Frame)
-> io::Result<()>
{
match frame {
Frame::Simple(val) => {
self.stream.write_u8(b'+').await?;
self.stream.write_all(val.as_bytes()).await?;
self.stream.write_all(b"\r\n").await?;
}
Frame::Error(val) => {
self.stream.write_u8(b'-').await?;
self.stream.write_all(val.as_bytes()).await?;
self.stream.write_all(b"\r\n").await?;
}
Frame::Integer(val) => {
self.stream.write_u8(b':').await?;
self.write_decimal(*val).await?;
}
Frame::Null => {
self.stream.write_all(b"$-1\r\n").await?;
}
Frame::Bulk(val) => {
let len = val.len();
self.stream.write_u8(b'$').await?;
self.write_decimal(len as u64).await?;
self.stream.write_all(val).await?;
self.stream.write_all(b"\r\n").await?;
}
Frame::Array(_val) => unimplemented!(),
}
self.stream.flush().await?;
Ok(())
}
The functions used here are provided by AsyncWriteExt. They are available on TcpStream as well, but it would not be advisable to issue single-byte writes without the intermediate buffer.
- write_u8 writes a single byte to the writer.
- write_all writes the entire slice to the writer.
- write_decimal is implemented by mini-redis.
The function ends with a call to self.stream.flush().await. Because BufWriter stores writes in an intermediate buffer, calls to write do not guarantee that the data is written to the socket. Before returning, we want the frame to be written to the socket. The call to flush() writes any data pending in the buffer to the socket.
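For reference, here is a synchronous std-only sketch of the same encoding and flush. It includes the Array case that the tutorial code leaves unimplemented: in RESP, an array is encoded as *<count>\r\n followed by each element. In this sketch, Frame uses Vec<u8> instead of Bytes. write_decimal is a hypothetical local helper standing in for the mini-redis one. It writes the number followed by \r\n, which matches how the code above uses it.

```rust
use std::io::{self, Write};

/// Mirrors the Frame enum; `Vec<u8>` stands in for `bytes::Bytes`.
#[derive(Debug)]
enum Frame {
    Simple(String),
    Error(String),
    Integer(u64),
    Bulk(Vec<u8>),
    Null,
    Array(Vec<Frame>),
}

/// Hypothetical helper: writes `val` in decimal followed by `\r\n`.
fn write_decimal<W: Write>(w: &mut W, val: u64) -> io::Result<()> {
    write!(w, "{}\r\n", val)
}

/// Encodes one frame (recursively for arrays) without flushing.
fn encode<W: Write>(w: &mut W, frame: &Frame) -> io::Result<()> {
    match frame {
        Frame::Simple(val) => {
            w.write_all(b"+")?;
            w.write_all(val.as_bytes())?;
            w.write_all(b"\r\n")
        }
        Frame::Error(val) => {
            w.write_all(b"-")?;
            w.write_all(val.as_bytes())?;
            w.write_all(b"\r\n")
        }
        Frame::Integer(val) => {
            w.write_all(b":")?;
            write_decimal(w, *val)
        }
        Frame::Null => w.write_all(b"$-1\r\n"),
        Frame::Bulk(val) => {
            w.write_all(b"$")?;
            write_decimal(w, val.len() as u64)?;
            w.write_all(val)?;
            w.write_all(b"\r\n")
        }
        Frame::Array(items) => {
            // Array head: `*` plus the element count, then each element in order.
            w.write_all(b"*")?;
            write_decimal(w, items.len() as u64)?;
            for item in items {
                encode(w, item)?;
            }
            Ok(())
        }
    }
}

/// Writes a whole frame and then flushes, like `write_frame` above.
fn write_frame<W: Write>(w: &mut W, frame: &Frame) -> io::Result<()> {
    encode(w, frame)?;
    w.flush()
}
```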
Another alternative would be to not call flush() in write_frame(), and instead provide a flush() function on Connection. This would allow the caller to queue multiple small frames in the write buffer and then write them all to the socket with one write syscall. Doing this complicates the Connection API. Simplicity is one of Mini-Redis' goals, so we decided to include the flush().await call in fn write_frame().
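The following sketch uses std's BufWriter to illustrate the trade-off. RecordingWriter is a hypothetical test double that counts the write calls reaching the socket side, and send_frames is a hypothetical helper. When the buffer is large enough for all the frames, a single flush at the end produces one inner write. Flushing after every frame produces one inner write per frame.

```rust
use std::io::{self, BufWriter, Write};

/// Hypothetical inner writer standing in for the socket; counts `write` calls.
#[derive(Default)]
struct RecordingWriter {
    calls: usize,
    data: Vec<u8>,
}

impl Write for RecordingWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.calls += 1;
        self.data.extend_from_slice(buf);
        Ok(buf.len())
    }
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Hypothetical helper: writes simple-string frames, flushing after each one if
/// `flush_each` is true, otherwise once at the end. Returns the inner write count.
fn send_frames(frames: &[&str], flush_each: bool) -> io::Result<usize> {
    let mut w = BufWriter::with_capacity(8 * 1024, RecordingWriter::default());
    for f in frames {
        write!(w, "+{}\r\n", f)?;
        if flush_each {
            w.flush()?;
        }
    }
    // Final flush pushes whatever is still queued (nothing, if already flushed).
    w.flush()?;
    Ok(w.get_ref().calls)
}
```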
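All translations in this article are for learning and exchange purposes only. When reposting, please credit the translator and the source, and include a link to this article.
Our translation work follows the CC license. If our work infringes on your rights, please contact us promptly.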