3.6. All Together
Original text: book.async.rs/
At this point, we only need to start the broker to get a fully-functioning (in the happy case!) chat:
use async_std::{
    io::BufReader,
    net::{TcpListener, TcpStream, ToSocketAddrs},
    prelude::*,
    task,
};
use futures::channel::mpsc;
use futures::sink::SinkExt;
use std::{
    collections::hash_map::{Entry, HashMap},
    sync::Arc,
};

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
type Sender<T> = mpsc::UnboundedSender<T>;
type Receiver<T> = mpsc::UnboundedReceiver<T>;

// main
fn run() -> Result<()> {
    task::block_on(accept_loop("127.0.0.1:8080"))
}

fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
where
    F: Future<Output = Result<()>> + Send + 'static,
{
    task::spawn(async move {
        if let Err(e) = fut.await {
            eprintln!("{}", e)
        }
    })
}

async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
    let listener = TcpListener::bind(addr).await?;

    let (broker_sender, broker_receiver) = mpsc::unbounded(); // 1
    let _broker_handle = task::spawn(broker_loop(broker_receiver));
    let mut incoming = listener.incoming();
    while let Some(stream) = incoming.next().await {
        let stream = stream?;
        println!("Accepting from: {}", stream.peer_addr()?);
        spawn_and_log_error(connection_loop(broker_sender.clone(), stream));
    }
    Ok(())
}

async fn connection_loop(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
    let stream = Arc::new(stream); // 2
    let reader = BufReader::new(&*stream);
    let mut lines = reader.lines();

    let name = match lines.next().await {
        None => Err("peer disconnected immediately")?,
        Some(line) => line?,
    };
    broker.send(Event::NewPeer { name: name.clone(), stream: Arc::clone(&stream) }).await // 3
        .unwrap();

    while let Some(line) = lines.next().await {
        let line = line?;
        let (dest, msg) = match line.find(':') {
            None => continue,
            Some(idx) => (&line[..idx], line[idx + 1 ..].trim()),
        };
        let dest: Vec<String> = dest.split(',').map(|name| name.trim().to_string()).collect();
        let msg: String = msg.to_string();

        broker.send(Event::Message { // 4
            from: name.clone(),
            to: dest,
            msg,
        }).await.unwrap();
    }
    Ok(())
}

async fn connection_writer_loop(
    mut messages: Receiver<String>,
    stream: Arc<TcpStream>,
) -> Result<()> {
    let mut stream = &*stream;
    while let Some(msg) = messages.next().await {
        stream.write_all(msg.as_bytes()).await?;
    }
    Ok(())
}

#[derive(Debug)]
enum Event {
    NewPeer {
        name: String,
        stream: Arc<TcpStream>,
    },
    Message {
        from: String,
        to: Vec<String>,
        msg: String,
    },
}

async fn broker_loop(mut events: Receiver<Event>) -> Result<()> {
    let mut peers: HashMap<String, Sender<String>> = HashMap::new();

    while let Some(event) = events.next().await {
        match event {
            Event::Message { from, to, msg } => {
                // Deliver the message to every addressed peer that is currently known.
                for addr in to {
                    if let Some(peer) = peers.get_mut(&addr) {
                        let msg = format!("from {}: {}\n", from, msg);
                        peer.send(msg).await?
                    }
                }
            }
            Event::NewPeer { name, stream } => {
                match peers.entry(name) {
                    // A peer with this name is already registered: ignore the login.
                    Entry::Occupied(..) => (),
                    Entry::Vacant(entry) => {
                        let (client_sender, client_receiver) = mpsc::unbounded();
                        // Remember the peer's channel, then start its writer task.
                        entry.insert(client_sender);
                        spawn_and_log_error(connection_writer_loop(client_receiver, stream));
                    }
                }
            }
        }
    }
    Ok(())
}
1. Inside the accept_loop, we create the broker's channel and task.
2. Inside connection_loop, we need to wrap TcpStream into an Arc, to be able to share it with the connection_writer_loop.
3. On login, we notify the broker. Note that we .unwrap on send: broker should outlive all the clients and if that's not the case the broker probably panicked, so we can escalate the panic as well.
4. Similarly, we forward parsed messages to the broker, assuming that it is alive.
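The parsing step that connection_loop performs before forwarding a message can be looked at in isolation. The sketch below is only an illustration: parse_line is a hypothetical helper name that does not appear in the chapter, and it uses only the standard library so it can be tried without async-std. It applies the same rule as the loop above: everything before the first ':' is a comma-separated list of recipients, everything after it is the message, and a line without ':' is skipped.

```rust
/// Hypothetical helper (not part of the chapter's code): splits a chat line
/// of the form `dest1, dest2: message` into recipient names and message text.
/// Returns `None` when the line contains no `:` separator, which corresponds
/// to the `continue` branch in `connection_loop`.
fn parse_line(line: &str) -> Option<(Vec<String>, String)> {
    // Locate the first ':'; only the first one separates names from the message.
    let idx = line.find(':')?;
    let (dest, msg) = (&line[..idx], line[idx + 1..].trim());
    // Recipient names are comma-separated; surrounding whitespace is dropped.
    let dest: Vec<String> = dest
        .split(',')
        .map(|name| name.trim().to_string())
        .collect();
    Some((dest, msg.to_string()))
}
```

With such a helper, the body of the read loop could become a match on parse_line(&line), sending an Event::Message on Some and continuing on None. Whether to extract the helper is a design choice; the chapter keeps the parsing inline.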