I/O

I/O in Tokio operates in much the same way as in std, except asynchronously. There is a trait for reading (AsyncRead) and a trait for writing (AsyncWrite). Specific types implement these traits as appropriate (TcpStream, File, Stdout). AsyncRead and AsyncWrite are also implemented by a number of data structures, such as Vec<u8> and &[u8]. This allows using byte arrays where a reader or writer is expected.

This chapter walks through basic I/O reading and writing with Tokio using a few examples; more in-depth examples are covered in the next chapter.

AsyncRead and AsyncWrite

These two traits provide the facilities to asynchronously read from and write to byte streams. Just as you usually don't call the poll method of the Future trait by hand, the methods on these traits are typically not invoked directly. Instead, you will use them through the utility methods provided by the AsyncReadExt and AsyncWriteExt extension traits.

Let's briefly look at a few of these methods. Note that all of them are asynchronous and must be used with .await.


async fn read()

AsyncReadExt::read provides an asynchronous method for reading data into a buffer, returning the number of bytes read.

Note: when read() returns Ok(0), this signifies that the stream is closed. Any further call to read() will complete immediately with Ok(0). With TcpStream instances, this signifies that the read half of the socket is closed.

use tokio::fs::File;
use tokio::io::{self, AsyncReadExt};

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut f = File::open("foo.txt").await?;
    let mut buffer = [0; 10];

    // read up to 10 bytes
    let n = f.read(&mut buffer[..]).await?;

    println!("The bytes: {:?}", &buffer[..n]);
    Ok(())
}

async fn read_to_end()

AsyncReadExt::read_to_end reads all bytes from the stream until EOF (end of file).

use tokio::io::{self, AsyncReadExt};
use tokio::fs::File;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut f = File::open("foo.txt").await?;
    let mut buffer = Vec::new();

    // read the whole file
    f.read_to_end(&mut buffer).await?;
    Ok(())
}

async fn write()

AsyncWriteExt::write writes a buffer into the writer, returning how many bytes were written.

use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut file = File::create("foo.txt").await?;

    // Writes some prefix of the byte string, but not necessarily all of it
    let n = file.write(b"some bytes").await?;

    println!("Wrote the first {} bytes of 'some bytes'.", n);
    Ok(())
}

async fn write_all()

AsyncWriteExt::write_all writes the entire buffer into the writer.

use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut file = File::create("foo.txt").await?;

    file.write_all(b"some bytes").await?;
    Ok(())
}

Both traits include a number of other helpful methods. See the API docs for a comprehensive list.

Helper functions

Additionally, just like std, the tokio::io module contains a number of helpful utility functions as well as APIs for working with standard input, standard output, and standard error. For example, tokio::io::copy asynchronously copies the entire contents of a reader into a writer.

use tokio::fs::File;
use tokio::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut reader: &[u8] = b"hello";
    let mut file = File::create("foo.txt").await?;

    io::copy(&mut reader, &mut file).await?;
    Ok(())
}

Note that this works because byte arrays also implement AsyncRead.

Echo server

Let's practice some asynchronous I/O. We will be writing an echo server.

The echo server binds a TcpListener and accepts inbound connections in a loop. For each inbound connection, data is read from the socket and written immediately back to the socket. The client sends data to the server and receives the exact same data back.


We will implement the echo server twice, using slightly different strategies.

Using io::copy()

To start, we will implement the echo logic using the io::copy utility.

You can write up this code in a new binary file:

touch src/bin/echo-server-copy.rs

That you can launch (or just check the compilation) with:

cargo run --bin echo-server-copy

You will be able to try the server using a standard command-line tool such as telnet, or by writing a simple client like the one found in the documentation for tokio::net::TcpStream.

This is a TCP server and needs an accept loop. A new task is spawned to process each accepted socket.

use tokio::io;
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:6142").await?;

    loop {
        let (mut socket, _) = listener.accept().await?;

        tokio::spawn(async move {
            // Copy data here
        });
    }
}

As seen earlier, this utility function takes a reader and a writer and copies data from one to the other. However, we only have a single TcpStream. This single value implements both AsyncRead and AsyncWrite. Because io::copy requires &mut for both the reader and the writer, the socket cannot be used for both arguments.

// This fails to compile
io::copy(&mut socket, &mut socket).await

Splitting a reader + writer

To work around this problem, we must split the socket into a reader handle and a writer handle. The best way to split a reader/writer combo depends on the specific type.

Any reader + writer type can be split using the io::split utility. This function takes a single value and returns separate reader and writer handles. These two handles can be used independently, including from separate tasks.

For example, the echo client could handle concurrent reads and writes like this:

use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> io::Result<()> {
    let socket = TcpStream::connect("127.0.0.1:6142").await?;
    let (mut rd, mut wr) = io::split(socket);

    // Write data in the background
    tokio::spawn(async move {
        wr.write_all(b"hello\r\n").await?;
        wr.write_all(b"world\r\n").await?;

        // Sometimes, the rust type inferencer needs
        // a little help
        Ok::<_, io::Error>(())
    });

    let mut buf = vec![0; 128];

    loop {
        let n = rd.read(&mut buf).await?;

        if n == 0 {
            break;
        }

        println!("GOT {:?}", &buf[..n]);
    }

    Ok(())
}

Because io::split supports any value that implements AsyncRead + AsyncWrite and returns independent handles, internally io::split uses an Arc and a Mutex. This overhead can be avoided with TcpStream. TcpStream offers two specialized split functions.

TcpStream::split takes a reference to the stream and returns a reader and writer handle. Because a reference is used, both handles must stay on the same task that split() was called from. This specialized split is zero-cost. There is no Arc or Mutex needed. TcpStream also provides into_split which supports handles that can move across tasks at the cost of only an Arc.

Because io::copy() is called on the same task that owns the TcpStream, we can use TcpStream::split. The task that processes the echo logic in the server becomes:

tokio::spawn(async move {
    let (mut rd, mut wr) = socket.split();

    if io::copy(&mut rd, &mut wr).await.is_err() {
        eprintln!("failed to copy");
    }
});

You can find the entire code here.

Manual copying

Now let's look at how we would write the echo server by copying the data manually. To do this, we use AsyncReadExt::read and AsyncWriteExt::write_all.

The full echo server is as follows:

use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:6142").await?;

    loop {
        let (mut socket, _) = listener.accept().await?;

        tokio::spawn(async move {
            let mut buf = vec![0; 1024];

            loop {
                match socket.read(&mut buf).await {
                    // Return value of `Ok(0)` signifies that the remote has
                    // closed
                    Ok(0) => return,
                    Ok(n) => {
                        // Copy the data back to socket
                        if socket.write_all(&buf[..n]).await.is_err() {
                            // Unexpected socket error. There isn't much we can
                            // do here so just stop processing.
                            return;
                        }
                    }
                    Err(_) => {
                        // Unexpected socket error. There isn't much we can do
                        // here so just stop processing.
                        return;
                    }
                }
            }
        });
    }
}

(You can put this code into src/bin/echo-server.rs and launch it with cargo run --bin echo-server).

Let's break it down. First, since the AsyncRead and AsyncWrite utilities are used, the extension traits must be brought into scope.

use tokio::io::{self, AsyncReadExt, AsyncWriteExt};

Allocating a buffer

The strategy is to read some data from the socket into a buffer then write the contents of the buffer back to the socket.

let mut buf = vec![0; 1024];

A stack buffer is explicitly avoided. Recall from earlier, we noted that all task data that lives across calls to .await must be stored by the task. In this case, buf is used across .await calls. All task data is stored in a single allocation. You can think of it as an enum where each variant is the data that needs to be stored for a specific call to .await.

If the buffer is represented by a stack array, the internal structure for tasks spawned per accepted socket might look something like:

struct Task {
    // internal task fields here
    task: enum {
        AwaitingRead {
            socket: TcpStream,
            buf: [BufferType],
        },
        AwaitingWriteAll {
            socket: TcpStream,
            buf: [BufferType],
        }

    }
}

If a stack array is used as the buffer type, it will be stored inline in the task structure. This will make the task structure very big. Additionally, buffer sizes are often page sized. This will, in turn, make Task an awkward size: $page-size + a-few-bytes.

The compiler optimizes the layout of async blocks further than a basic enum. In practice, variables are not moved around between variants as would be required with an enum. However, the task struct size is at least as big as the largest variable.

Because of this, it is usually more efficient to use a dedicated allocation for the buffer.

Handling EOF

When the read half of the TCP stream is shut down, a call to read() returns Ok(0). It is important to exit the read loop at this point. Forgetting to break from the read loop on EOF is a common source of bugs.

loop {
    match socket.read(&mut buf).await {
        // Return value of `Ok(0)` signifies that the remote has
        // closed
        Ok(0) => return,
        // ... other cases handled here
    }
}

Forgetting to break from the read loop usually results in a 100% CPU infinite loop situation. As the socket is closed, socket.read() returns immediately. The loop then repeats forever.

Full code can be found here.

This article was first published on LearnKu.com.
