4.2.可用于生产环境的 Accept 循环
这是一篇协同翻译的文章,你可以点击『我来翻译』按钮来参与翻译。
原文链接:book.async.rs/
可用于生产环境的 Accept 循环
一个可用于生产环境的 Accept 循环需要遵循下面两条规则:
- 处理错误
- 限制并发连接数量,为了防止Dos攻击
错误处理
在accept
循环中有两种错误需要处理:
- 每个新连接都可能出现的错误。当新连接在接收队列中,但对端关闭套接字,此时会触发该错误。操作系统通过这个错误通知用户进程。然而,后续的新连接也会加入到接收队列中,此时我们应该处理该错误,并立即接收其他新连接。
- 系统资源耗尽。当我们遇到这种情况时,没有理由理解接收新连接,因为没有资源分配给这个新连接。此时应该让新连接保持在接收队列中,等待系统资源不那么紧缺时,再尝试接收新连接。
下面是每个新连接都可能面临的错误(在正常和调试模式下):
Error: Connection reset by peer (os error 104)
Error: Os { code: 104, kind: ConnectionReset, message: "Connection reset by peer" }
下面是系统资源短缺最常见的错误:
Error: Too many open files (os error 24)
Error: Os { code: 24, kind: Other, message: "Too many open files" }
Testing Application
To test your application for these errors try the following (this works
on unixes only).
Lower limits and start the application:
$ ulimit -n 100
$ cargo run --example your_app
Compiling your_app v0.1.0 (/work)
Finished dev [unoptimized + debuginfo] target(s) in 5.47s
Running `target/debug/examples/your_app`
Server is listening on: http://127.0.0.1:1234
Then in another console run the [wrk
] benchmark tool:
$ wrk -c 1000 http://127.0.0.1:1234
Running 10s test @ http://localhost:8080/
2 threads and 1000 connections
$ telnet localhost 1234
Trying ::1...
Connected to localhost.
Important is to check the following things:
- The application doesn't crash on error (but may log errors, see below)
- It's possible to connect to the application again once load is stopped
(few seconds afterwrk
). This is whattelnet
does in example above,
make sure it printsConnected to <hostname>
. - The
Too many open files
error is logged in the appropriate log. This
requires to set "maximum number of simultaneous connections" parameter (see
below) of your application to a value greater then100
for this example. - Check CPU usage of the app while doing a test. It should not occupy 100%
of a single CPU core (it's unlikely that you can exhaust CPU by 1000
connections in Rust, so this means error handling is not right).
Testing non-HTTP applications
If it's possible, use the appropriate benchmark tool and set the appropriate
number of connections. For example redis-benchmark
has a -c
parameter for
that, if you implement redis protocol.
Alternatively, can still use wrk
, just make sure that connection is not
immediately closed. If it is, put a temporary timeout before handing
the connection to the protocol handler, like this:
# extern crate async_std;
# use std::time::Duration;
# use async_std::{
# net::{TcpListener, ToSocketAddrs},
# prelude::*,
# };
#
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
#
#async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
# let listener = TcpListener::bind(addr).await?;
# let mut incoming = listener.incoming();
while let Some(stream) = incoming.next().await {
task::spawn(async {
task::sleep(Duration::from_secs(10)).await; // 1
connection_loop(stream).await;
});
}
# Ok(())
# }
- Make sure the sleep coroutine is inside the spawned task, not in the loop.
Handling Errors Manually
Here is how basic accept loop could look like:
# extern crate async_std;
# use std::time::Duration;
# use async_std::{
# net::{TcpListener, ToSocketAddrs},
# prelude::*,
# };
#
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
#
async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
let listener = TcpListener::bind(addr).await?;
let mut incoming = listener.incoming();
while let Some(result) = incoming.next().await {
let stream = match stream {
Err(ref e) if is_connection_error(e) => continue, // 1
Err(e) => {
eprintln!("Error: {}. Pausing for 500ms."); // 3
task::sleep(Duration::from_millis(500)).await; // 2
continue;
}
Ok(s) => s,
};
// body
}
Ok(())
}
- Ignore per-connection errors.
- Sleep and continue on resource shortage.
- It's important to log the message, because these errors commonly mean the
misconfiguration of the system and are helpful for operations people running
the application.
Be sure to test your application.
External Crates
The crate async-listen
has a helper to achieve this task:
# extern crate async_std;
# extern crate async_listen;
# use std::time::Duration;
# use async_std::{
# net::{TcpListener, ToSocketAddrs},
# prelude::*,
# };
#
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
#
use async_listen::{ListenExt, error_hint};
async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
let listener = TcpListener::bind(addr).await?;
let mut incoming = listener
.incoming()
.log_warnings(log_accept_error) // 1
.handle_errors(Duration::from_millis(500));
while let Some(socket) = incoming.next().await { // 2
// body
}
Ok(())
}
fn log_accept_error(e: &io::Error) {
eprintln!("Error: {}. Listener paused for 0.5s. {}", e, error_hint(e)) // 3
}
- Logs resource shortages (
async-listen
calls them warnings). If you uselog
crate or any other in your app this should go to the log. - Stream yields sockets without
Result
wrapper afterhandle_errors
because
all errors are already handled. - Together with the error we print a hint, which explains some errors for end
users. For example, it recommends increasing open file limit and gives
a link.
Be sure to test your application.
Connections Limit
Even if you've applied everything described in
Handling Errors section, there is still a problem.
Let's imagine you have a server that needs to open a file to process
client request. At some point, you might encounter the following situation:
- There are as many client connection as max file descriptors allowed for
the application. - Listener gets
Too many open files
error so it sleeps. - Some client sends a request via the previously open connection.
- Opening a file to serve request fails, because of the same
Too many open files
error, until some other client drops a connection.
There are many more possible situations, this is just a small illustation that
limiting number of connections is very useful. Generally, it's one of the ways
to control resources used by a server and avoiding some kinds of deny of
service (DoS) attacks.
async-listen
crate
Limiting maximum number of simultaneous connections with [async-listen
]
looks like the following:
# extern crate async_std;
# extern crate async_listen;
# use std::time::Duration;
# use async_std::{
# net::{TcpListener, TcpStream, ToSocketAddrs},
# prelude::*,
# };
#
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
#
use async_listen::{ListenExt, Token, error_hint};
async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
let listener = TcpListener::bind(addr).await?;
let mut incoming = listener
.incoming()
.log_warnings(log_accept_error)
.handle_errors(Duration::from_millis(500)) // 1
.backpressure(100);
while let Some((token, socket)) = incoming.next().await { // 2
task::spawn(async move {
connection_loop(&token, stream).await; // 3
});
}
Ok(())
}
async fn connection_loop(_token: &Token, stream: TcpStream) { // 4
// ...
}
# fn log_accept_error(e: &io::Error) {
# eprintln!("Error: {}. Listener paused for 0.5s. {}", e, error_hint(e));
# }
- We need to handle errors first, because
backpressure
helper expects
stream ofTcpStream
rather thanResult
. - The token yielded by a new stream is what is counted by backpressure helper.
I.e. if you drop a token, new connection can be established. - We give the connection loop a reference to token to bind token's lifetime to
the lifetime of the connection. - The token itsellf in the function can be ignored, hence
_token
Be sure to test this behavior.
本文中的所有译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接
我们的翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。