翻译进度
2
分块数量
2
参与人数

3.3.接收消息

这是一篇协同翻译的文章,你可以点击『我来翻译』按钮来参与翻译。

原文链接:book.async.rs/


接收消息

让我们来实现协议接收的部分.
我们需要:

  1. \n 来分割 TcpStream 输入并且以 utf-8编码解码字节
  2. 以第一行作为一个登录来解释
  3. 将其它的行解析为一个 login: message
# extern crate async_std;
# use async_std::{
#     net::{TcpListener, ToSocketAddrs},
#     prelude::*,
#     task,
# };
#
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
#
use async_std::{
    io::BufReader,
    net::TcpStream,
};

async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
    let listener = TcpListener::bind(addr).await?;
    let mut incoming = listener.incoming();
    while let Some(stream) = incoming.next().await {
        let stream = stream?;
        println!("Accepting from: {}", stream.peer_addr()?);
        let _handle = task::spawn(connection_loop(stream)); // 1
    }
    Ok(())
}

async fn connection_loop(stream: TcpStream) -> Result<()> {
    let reader = BufReader::new(&stream); // 2
    let mut lines = reader.lines();

    let name = match lines.next().await { // 3
        None => Err("peer disconnected immediately")?,
        Some(line) => line?,
    };
    println!("name = {}", name);

    while let Some(line) = lines.next().await { // 4
        let line = line?;
        let (dest, msg) = match line.find(':') { // 5
            None => continue,
            Some(idx) => (&line[..idx], line[idx + 1 ..].trim()),
        };
        let dest: Vec<String> = dest.split(',').map(|name| name.trim().to_string()).collect();
        let msg: String = msg.to_string();
    }
    Ok(())
}
  1. 我们使用 task::spawn 函数为每一个工作的客户端开启一个独立的任务.
    也就是说,在接受客户端后,accept_loop 立即开始等待下一个.
    这是事件驱动架构的核心优势: 我们可以同时为许多客户端提供服务,而无需花费更的多硬线程(译者注: 即操作系统管理的线程资源).

  2. 幸运的是, 以 "行来分割字节流"的函数功能已经实现.
    .lines() 的调用将返回一个 String 格式的流.

  3. 我们得到第一行作为 -- 登录

  4. 并且, 再次, 我们手动实现了一个异步循环.

  5. 最后, 我们将每一行解析为一个目标登录名和消息本身的列表.

dslchd 翻译于 3年前

Managing Errors

One serious problem in the above solution is that, while we correctly propagate errors in the connection_loop, we just drop the error on the floor afterwards!
That is, task::spawn does not return an error immediately (it can't, it needs to run the future to completion first), only after it is joined.
We can "fix" it by waiting for the task to be joined, like this:

# #![feature(async_closure)]
# extern crate async_std;
# use async_std::{
#     io::BufReader,
#     net::{TcpListener, TcpStream, ToSocketAddrs},
#     prelude::*,
#     task,
# };
#
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
#
# async fn connection_loop(stream: TcpStream) -> Result<()> {
#     let reader = BufReader::new(&stream); // 2
#     let mut lines = reader.lines();
#
#     let name = match lines.next().await { // 3
#         None => Err("peer disconnected immediately")?,
#         Some(line) => line?,
#     };
#     println!("name = {}", name);
#
#     while let Some(line) = lines.next().await { // 4
#         let line = line?;
#         let (dest, msg) = match line.find(':') { // 5
#             None => continue,
#             Some(idx) => (&line[..idx], line[idx + 1 ..].trim()),
#         };
#         let dest: Vec<String> = dest.split(',').map(|name| name.trim().to_string()).collect();
#         let msg: String = msg.trim().to_string();
#     }
#     Ok(())
# }
#
# async move |stream| {
let handle = task::spawn(connection_loop(stream));
handle.await
# };

The .await waits until the client finishes, and ? propagates the result.

There are two problems with this solution however!
First, because we immediately await the client, we can only handle one client at time, and that completely defeats the purpose of async!
Second, if a client encounters an IO error, the whole server immediately exits.
That is, a flaky internet connection of one peer brings down the whole chat room!

A correct way to handle client errors in this case is log them, and continue serving other clients.
So let's use a helper function for this:

# extern crate async_std;
# use async_std::{
#     io,
#     prelude::*,
#     task,
# };
fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
where
    F: Future<Output = Result<()>> + Send + 'static,
{
    task::spawn(async move {
        if let Err(e) = fut.await {
            eprintln!("{}", e)
        }
    })
}

本文章首发在 LearnKu.com 网站上。

本文中的所有译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接
我们的翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。

贡献者:2
讨论数量: 0
发起讨论 只看当前版本


暂无话题~