翻译进度
2
分块数量
2
参与人数

3.5.连接 Readers 和 Writers

这是一篇协同翻译的文章,你可以点击『我来翻译』按钮来参与翻译。

原文链接:book.async.rs/


Connecting Readers and Writers

So how do we make sure that messages read in connection_loop flow into the relevant connection_writer_loop?
We should somehow maintain a peers: HashMap<String, Sender<String>> map which allows a client to find destination channels.
However, this map would be a bit of shared mutable state, so we'll have to wrap an RwLock over it and answer tough questions of what should happen if the client joins at the same moment as it receives a message.

One trick to make reasoning about state simpler comes from the actor model.
We can create a dedicated broker task which owns the peers map and communicates with other tasks using channels.
By hiding peers inside such an "actor" task, we remove the need for mutexes and also make the serialization point explicit.
The order of events "Bob sends message to Alice" and "Alice joins" is determined by the order of the corresponding events in the broker's event queue.

# extern crate async_std;
# extern crate futures;
# use async_std::{
#     net::TcpStream,
#     prelude::*,
#     task,
# };
# use futures::channel::mpsc;
# use futures::sink::SinkExt;
# use std::sync::Arc;
#
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
# type Sender<T> = mpsc::UnboundedSender<T>;
# type Receiver<T> = mpsc::UnboundedReceiver<T>;
#
# async fn connection_writer_loop(
#     mut messages: Receiver<String>,
#     stream: Arc<TcpStream>,
# ) -> Result<()> {
#     let mut stream = &*stream;
#     while let Some(msg) = messages.next().await {
#         stream.write_all(msg.as_bytes()).await?;
#     }
#     Ok(())
# }
#
# fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
# where
#     F: Future<Output = Result<()>> + Send + 'static,
# {
#     task::spawn(async move {
#         if let Err(e) = fut.await {
#             eprintln!("{}", e)
#         }
#     })
# }
#
use std::collections::hash_map::{Entry, HashMap};

#[derive(Debug)]
enum Event { // 1
    NewPeer {
        name: String,
        stream: Arc<TcpStream>,
    },
    Message {
        from: String,
        to: Vec<String>,
        msg: String,
    },
}

async fn broker_loop(mut events: Receiver<Event>) -> Result<()> {
    let mut peers: HashMap<String, Sender<String>> = HashMap::new(); // 2

    while let Some(event) = events.next().await {
        match event {
            Event::Message { from, to, msg } => {  // 3
                for addr in to {
                    if let Some(peer) = peers.get_mut(&addr) {
                        let msg = format!("from {}: {}\n", from, msg);
                        peer.send(msg).await?
                    }
                }
            }
            Event::NewPeer { name, stream } => {
                match peers.entry(name) {
                    Entry::Occupied(..) => (),
                    Entry::Vacant(entry) => {
                        let (client_sender, client_receiver) = mpsc::unbounded();
                        entry.insert(client_sender); // 4
                        spawn_and_log_error(connection_writer_loop(client_receiver, stream)); // 5
                    }
                }
            }
        }
    }
    Ok(())
}
  1. 代理task应该处理两种类型的事件: 消息或者新的对等链接.
  2. 代理的内部状态是一个 HashMap.
    请注意, 这里我们没有使用 Mutex (排它锁) 并且可以很自信是说 , 在代理循环的每次迭代中, 都可以知道当前的对等链接是什么.
  3. 为了处理一个消息, 我们通过通道(channel)将其发送每一个目的地.
  4. 为了处理每一个新的对等链接, 首先要在一个对等链接的map中注册它 ...
  5. ... 然后生成一个专门的 task 将消息实际写入socket(套接字)中.
dslchd 翻译于 3年前

本文章首发在 LearnKu.com 网站上。

本文中的所有译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接
我们的翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。

贡献者:2
讨论数量: 0
发起讨论 只看当前版本


暂无话题~