3.7.安全停机

未匹配的标注

优雅关闭程序

当前面临的一个问题是,没有处理好如何优雅地关闭程序。如果程序从accept循环中意外退出,其他还没有处理或正在处理的任务将会被丢弃。

关闭程序正确的顺序应该是:

  1. 停止接收客户端
  2. 分发所有没有处理的任务,并等待处理完成
  3. 退出程序

即使可能会出现一些魔幻的手段,但优雅地关闭一个基于通道通信模式的程序是很容易的。在Rust中,当所有的发送端关闭了发送通道,接收通道也会随着关闭。所以,当生产者退出或关闭发送通道,程序很自然就会自动退出。

async_std可以理解为下面两条规则:

  1. 确保通道没有环路
  2. 等待系统中间层(broker层)处理完所有的任务

a-chat程序中,我们已经有一个单向的数据流处理逻辑: reader -> broker -> writer。然而,我们并没有等待brokerwriter处理完就退出程序,可能会造成任务丢失。

下面让我们添加等待逻辑

# extern crate async_std;
# extern crate futures;
# use async_std::{
#     io::{self, BufReader},
#     net::{TcpListener, TcpStream, ToSocketAddrs},
#     prelude::*,
#     task,
# };
# use futures::channel::mpsc;
# use futures::sink::SinkExt;
# use std::{
#     collections::hash_map::{HashMap, Entry},
#     sync::Arc,
# };
#
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
# type Sender<T> = mpsc::UnboundedSender<T>;
# type Receiver<T> = mpsc::UnboundedReceiver<T>;
#
# fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
# where
#     F: Future<Output = Result<()>> + Send + 'static,
# {
#     task::spawn(async move {
#         if let Err(e) = fut.await {
#             eprintln!("{}", e)
#         }
#     })
# }
#
#
# async fn connection_loop(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
#     let stream = Arc::new(stream); // 2
#     let reader = BufReader::new(&*stream);
#     let mut lines = reader.lines();
#
#     let name = match lines.next().await {
#         None => Err("peer disconnected immediately")?,
#         Some(line) => line?,
#     };
#     broker.send(Event::NewPeer { name: name.clone(), stream: Arc::clone(&stream) }).await // 3
#         .unwrap();
#
#     while let Some(line) = lines.next().await {
#         let line = line?;
#         let (dest, msg) = match line.find(':') {
#             None => continue,
#             Some(idx) => (&line[..idx], line[idx + 1 ..].trim()),
#         };
#         let dest: Vec<String> = dest.split(',').map(|name| name.trim().to_string()).collect();
#         let msg: String = msg.trim().to_string();
#
#         broker.send(Event::Message { // 4
#             from: name.clone(),
#             to: dest,
#             msg,
#         }).await.unwrap();
#     }
#     Ok(())
# }
#
# async fn connection_writer_loop(
#     mut messages: Receiver<String>,
#     stream: Arc<TcpStream>,
# ) -> Result<()> {
#     let mut stream = &*stream;
#     while let Some(msg) = messages.next().await {
#         stream.write_all(msg.as_bytes()).await?;
#     }
#     Ok(())
# }
#
# #[derive(Debug)]
# enum Event {
#     NewPeer {
#         name: String,
#         stream: Arc<TcpStream>,
#     },
#     Message {
#         from: String,
#         to: Vec<String>,
#         msg: String,
#     },
# }
#
# async fn broker_loop(mut events: Receiver<Event>) -> Result<()> {
#     let mut peers: HashMap<String, Sender<String>> = HashMap::new();
#
#     while let Some(event) = events.next().await {
#         match event {
#             Event::Message { from, to, msg } => {
#                 for addr in to {
#                     if let Some(peer) = peers.get_mut(&addr) {
#                         let msg = format!("from {}: {}\n", from, msg);
#                         peer.send(msg).await?
#                     }
#                 }
#             }
#             Event::NewPeer { name, stream} => {
#                 match peers.entry(name) {
#                     Entry::Occupied(..) => (),
#                     Entry::Vacant(entry) => {
#                         let (client_sender, client_receiver) = mpsc::unbounded();
#                         entry.insert(client_sender); // 4
#                         spawn_and_log_error(connection_writer_loop(client_receiver, stream)); // 5
#                     }
#                 }
#             }
#         }
#     }
#     Ok(())
# }
#
async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
    let listener = TcpListener::bind(addr).await?;

    let (broker_sender, broker_receiver) = mpsc::unbounded();
    let broker_handle = task::spawn(broker_loop(broker_receiver));
    let mut incoming = listener.incoming();
    while let Some(stream) = incoming.next().await {
        let stream = stream?;
        println!("Accepting from: {}", stream.peer_addr()?);
        spawn_and_log_error(connection_loop(broker_sender.clone(), stream));
    }
    drop(broker_sender); // 1
    broker_handle.await?; // 5
    Ok(())
}

添加一个代理(中介层)

# extern crate async_std;
# extern crate futures;
# use async_std::{
#     io::{self, BufReader},
#     net::{TcpListener, TcpStream, ToSocketAddrs},
#     prelude::*,
#     task,
# };
# use futures::channel::mpsc;
# use futures::sink::SinkExt;
# use std::{
#     collections::hash_map::{HashMap, Entry},
#     sync::Arc,
# };
#
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
# type Sender<T> = mpsc::UnboundedSender<T>;
# type Receiver<T> = mpsc::UnboundedReceiver<T>;
#
# async fn connection_writer_loop(
#     mut messages: Receiver<String>,
#     stream: Arc<TcpStream>,
# ) -> Result<()> {
#     let mut stream = &*stream;
#     while let Some(msg) = messages.next().await {
#         stream.write_all(msg.as_bytes()).await?;
#     }
#     Ok(())
# }
#
# fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
# where
#     F: Future<Output = Result<()>> + Send + 'static,
# {
#     task::spawn(async move {
#         if let Err(e) = fut.await {
#             eprintln!("{}", e)
#         }
#     })
# }
#
# #[derive(Debug)]
# enum Event {
#     NewPeer {
#         name: String,
#         stream: Arc<TcpStream>,
#     },
#     Message {
#         from: String,
#         to: Vec<String>,
#         msg: String,
#     },
# }
#
async fn broker_loop(mut events: Receiver<Event>) -> Result<()> {
    let mut writers = Vec::new();
    let mut peers: HashMap<String, Sender<String>> = HashMap::new();
    while let Some(event) = events.next().await { // 2
        match event {
            Event::Message { from, to, msg } => {
                for addr in to {
                    if let Some(peer) = peers.get_mut(&addr) {
                        let msg = format!("from {}: {}\n", from, msg);
                        peer.send(msg).await?
                    }
                }
            }
            Event::NewPeer { name, stream} => {
                match peers.entry(name) {
                    Entry::Occupied(..) => (),
                    Entry::Vacant(entry) => {
                        let (client_sender, client_receiver) = mpsc::unbounded();
                        entry.insert(client_sender);
                        let handle = spawn_and_log_error(connection_writer_loop(client_receiver, stream));
                        writers.push(handle); // 4
                    }
                }
            }
        }
    }
    drop(peers); // 3
    for writer in writers { // 4
        writer.await;
    }
    Ok(())
}

你应该注意到,当程序退出accept循环时,所有的通道将会如何变化。

  1. 我们首先关闭broker发送通道
    accept循环退出后,将不会再有数据写入broker的发送通道broker的发送通道也会关闭。
  2. 接着, broker接收通道也会随着关闭。 代码:while let Some(event) = events.next().await
  3. 随后,我们释放peer map,该操作会释放所有的writer 关联的发送通道
  4. 现在,broker等待所有writer返回。
  5. 最后,accept 循环等待broker返回。

这样,所有的任务都能正常处理完,程序也可以优雅退出了。

原文链接:book.async.rs/

本文章首发在 LearnKu.com 网站上。

本译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接
我们的翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。

原文地址:https://learnku.com/docs/rust-async-std/...

译文地址:https://learnku.com/docs/rust-async-std/...

上一篇 下一篇
贡献者:2
讨论数量: 0
发起讨论 只看当前版本


暂无话题~