用 Rust 封装 HTTP Client
HTTP 是日常开发中,应用最广泛的协议。即使是服务端开发,也已经要使用 HTTP 协议和第三方进行交互,比如微信公众号、支付、回调等场景。
但是大部分时候,我们只是简单的封装一个 HTTP Client,然后发送请求、完成功能而已,并没有考虑封装一个更为全面、健壮的 Client,比如为其加入重试、指数退避等能力。
这篇文章描述了如何使用 Rust 封装一个强大、易用的 Http Client。
从 Hello World 开始
我最近在筹划用 Rust 来写一个微服务版本的 Hello World,目前有一个 demo。我们就来 Rust 来请求它吧。
1. 安装依赖
在开始之前,首先需要安装实验相关的依赖, tokio 和 reqwest:
[dependencies]
tokio = { version = "1.44.2", features = ["full"] }
reqwest = { version = "0.12.15", features = ["json"] }
关于 tokio 是 Rust 中一个异步运行时,而 reqwest 则是一个强大的 Http Client。本文的目标是封装一个 Http Client,但不意味着要造轮子复刻一个 reqwest,而是说基于他进行二次封装。
2. 使用 reqwest 发送请求
在日常的工作中,我们可能只是简单地发送一个请求。牛马般的开发就像高速公路的建造一样,追求里程数,质量可以靠边了。
我们先从一个简单的例子开始,然后逐步扩展能力。一个基础的发送请求的实例如下:
#[tokio::main]
async fn main() {
let result = reqwest::get("https://rust.demo.testing.icu").await;
match result {
Ok(response) => {
println!("{}", response.text().await.unwrap());
},
Err(error) => {
println!("{}", error.to_string());
}
}
}
如果我的服务正常的话,会输出Hello World
,你也可以创建自己的 Mock 服务来做测试,这并不是一件困难的事情。
reqwest 这个库的使用非常简单,但是你有没有考虑过,如果因为网络或其他问题导致请求失败,是否需要重试呢?
重试失败的请求
当我们和第三方通过 HTTP 进行交互的时候,请求失败是常有的事情。有时候是因为对方服务异常、有时候是因为网路波动、有时候可能是因为近期太阳黑子活动剧烈,导致电子设备元件不可靠……这时候,我们可以采取对失败的请求进行重试,说不定再一次请求就成功了呢。
reqwest 是一个轻量级的 HTTP Client, 当中并不包含失败重试的功能。需要我们自行封装一个中间件(Middleware)来处理重试。
1. 定义 Middleware
我们来定义一个中间件,它只有一个字段 Config
:
pub struct RetryMiddleware {
pub config: RetryConfig,
}
2. 定义中间件的配置
而 RetryConfig
支持间隔固定时间的进行重试:
#[derive(Debug, Clone)]
pub struct RetryConfig {
pub max_retries: u32,
pub retry_interval: u64,
}
为了构建方便,实现 Default
trait:
impl Default for RetryConfig {
fn default() -> Self {
Self {
max_retries: 3,
retry_interval: 1000,
}
}
}
这里的配置的含义是,最多重试 3 次,每次固定间隔 1000 毫秒。
3. 实现 Middleware 发送请求
我们先抛开重试这件事情,先完成使用它来发送一个正常的请求:
impl RetryMiddleware {
pub fn new(config: RetryConfig) -> Self {
Self { config }
}
pub fn default() -> Self {
Self::new(RetryConfig::default())
}
pub async fn execute(&self, client: &Client, request: Request) -> Result<String, Error> {
match client.execute(request).await {
Ok(response) => Ok(response.text().await?),
Err(err) => Err(err),
}
}
}
RetryMiddleware
接收了从调用者传递的 client
, 然后调用 client.execute()
方法。虽然,到目前为止,这个封装没有什么意义,也暂时没有实现重试的能力,但是为之后打下基础。先来测试一下,是否能正常请求:
use reqwest::{Client, Error, Request};
#[tokio::main]
async fn main() {
let client = Client::new();
let request = client.get("https://rust.demo.testing.icu").build().unwrap();
let result = RetryMiddleware::default().execute(&client, request).await;
match result {
Ok(response) => {
println!("{}", response);
}
Err(error) => {
println!("{}", error.to_string());
}
}
}
4. 实现重试机制
接下来就要实现重试机制了,如果请求失败,则按照固定的间隔和最大重试次数进行重试,超过最大重试次数则返回错误:
pub async fn execute(&self, client: &Client, mut request: Request) -> Result<String, Error> {
let mut attempt: u32 = 0;
loop {
let req_clone = request.try_clone().expect("Failed to clone request");
let result = client.execute(request).await;
if result.is_ok() {
return Ok(result.unwrap().text().await?);
}
if attempt >= self.config.max_retries {
return Err(result.unwrap_err());
}
attempt += 1;
tokio::time::sleep(Duration::from_millis(self.config.retry_interval)).await;
// 准备下一次重试
request = req_clone;
}
通过 loop
来进行重试,每次检查是否返回错误,如果返回错误则进行重试。但是,你有没有想到,是实际的生产中,是否重试的判断可能没有那么简单,除了服务端返回的 HTTP 状态码外,还返回自定义的状态码,或者具体的业务值来综合判断是否需要重试。
所以,是否重试这个逻辑判断按道理应该交给调用者来决定,而不是服务自己拍脑子决定。
5. 自定义重试逻辑
接下来,我们来完成自定义重试的逻辑。调用房可以传入一个匿名函数并返回一个布尔值,来决定是否重试。函数类型的定义如下:
pub type RetryPredicate = Box<dyn Fn(&RequestResult) -> bool + Send + Sync>;
这个函数接收一个 RequestResult
类型,表示请求的结果。所以我们需要对之前的代码稍加改造:
// 创建一个枚举类,表示请求结果
pub enum RequestResult {
Response(String),
Error(Error),
}
RetryMiddleware
支持传入一个判断是否重试的函数,当然别忘了,new
方法也要更改:
pub struct RetryMiddleware {
pub config: RetryConfig,
pub predicate: Option<RetryPredicate>,
}
pub fn new(config: RetryConfig) -> Self {
Self { config, predicate: None, }
}
然后,新增一个方法 withPredicate
,可以让调用者传入这个函数:
pub fn with_retry_predicate<F>(mut self, predicate: F) -> Self
where
F: Fn(&RequestResult) -> bool + Send + Sync + 'static,
{
self.predicate = Some(Box::new(move |result| predicate(result)));
self
}
接着,新增一个方法来判断是否重试:
async fn should_retry(&self, result: &RequestResult) -> bool {
if let Some(predicate) = &self.predicate {
return predicate(result);
}
// 如果没有指定重试的逻辑,则判断是否为服务端错误、超时、连接错误,来决定是否重试
match result {
RequestResult::Response(_response) => false,
RequestResult::Error(err) => {
if let Some(status) = err.status() {
status.is_server_error()
} else {
err.is_timeout() || err.is_connect()
}
}
}
最后,改造 execute
的逻辑,支持调用者自定义重试:
pub async fn execute(&self, client: &Client, mut request: Request) -> Result<String, Error> {
let mut attempt: u32 = 0;
loop {
let req_clone = request.try_clone().expect("Failed to clone request");
let result = match client.execute(request).await {
Ok(response) => RequestResult::Response(response.text().await?),
Err(err) => RequestResult::Error(err),
};
if !self.should_retry(&result).await || attempt >= self.config.max_retries {
return match result {
RequestResult::Response(response) => Ok(response),
RequestResult::Error(err) => Err(err),
};
}
attempt += 1;
tokio::time::sleep(Duration::from_millis(self.config.retry_interval)).await;
// 准备下一次重试
request = req_clone;
}
}
然后自定义重试逻辑,测试如下:
let client = Client::new();
let request = client.get("https://rust.demo.testing.icu").build().unwrap();
let result = RetryMiddleware::default().
with_retry_predicate(|result| match result {
// 如果不包含 `Hello` 则重试
RequestResult::Response(response) => !response.contains("Hello"),
RequestResult::Error(_) => true,
}).
execute(&client, request).await;
指数退避
上面只是重试的一种策略,按照固定时间重试,比如 1 秒、2秒、3秒。但是在实际生产中,更倾向于使用指数退避(Exponential Backoff) 策略来降低服务器的压力,比如 0.5秒、1.5秒、3.5秒……直到最大尝试次数之后,每次重试都会呈指数增长。
常规公式如下: Tbase * 2^n
,n
表示重试的次数,Tbase
表示一个常数,比如 1 秒,第一次就是 1 秒、第二次是 2 秒、第三次是 4 秒、第四次是 8 秒。我们来实现这个逻辑。
1. 实现基础的指数退避
首先对 RetryConfig
增加指数退避相关的配置项:
#[derive(Debug, Clone)]
pub struct RetryConfig {
pub max_retries: u32,
pub retry_interval: u64,
// 增加两个配置项
pub use_exponential_backoff: bool, // 是否使用指数退避策略
pub backoff_factor: f64, // 指数退避的基数,公式中的 Tbase
}
然后增加一个方法,来计算每次重试的时间间隔:
fn calculate_wait_time(&self, attempts: u32) -> Duration {
if self.config.use_exponential_backoff {
// 计算指数
let factor = self.config.backoff_factor.powi(attempts as i32);
// 基数乘指数
let wait_ms = (self.config.retry_interval as f64 * factor) as u64;
Duration::from_millis(wait_ms)
} else {
// 不使用指数退避,采用固定的时间
Duration::from_millis(self.config.retry_interval)
}
}
然后修改 execute()
方法,每次重试的等待时间通过上面这个方法来计算:
pub async fn execute(&self, client: &Client, mut request: Request) -> Result<String, Error> {
// ......没有变更的代码省略, 修改下面两行代码
let wait_time = self.calculate_wait_time(attempt);
tokio::time::sleep(wait_time).await
// 准备下一次重试
request = req_clone;
}
2. 惊群效应
如果按照上面的实现,对于客户端来说并没有太大问题,但是对于服务端来说可能会出现惊群效应(Thundering Herd Problem) 。所谓惊群效应指的是,当服务端出现网络波动、过载等问题的时候,多个客户端同时发起重试,进一步增加了服务端的压力,资源竞争加剧、网络更加堵塞。
为了预防出现惊群效应,在指数退避的基础上,每次重试增加一个随机数,可以有效预防或者缓解,修改如下:
use rand::Rng;
// 计算指数
let factor = self.config.backoff_factor.powi(attempts as i32);
// 基数乘指数
let wait_ms = (self.config.retry_interval as f64 * factor) as u64;
// 计算抖动值,例如 [-50%, +50%] 的抖动
let jitter_range = (self.config.retry_interval / 2) as i64;
let mut rng = rand::rng();
let jitter = rng.random_range(-jitter_range..=jitter_range); // 生成随机抖动值
// 重试的等待时间加入抖动值,并确保等待的时间不小于 0
let wait_ms = (self.config.retry_interval as i64 + jitter).max(0) as u64;
Duration::from_millis(wait_ms)
总结
这篇文章介绍了如何用 Rust 封装一个强大且灵活的 HTTP 客户端,重点在于实现请求失败后的重试机制,包括固定间隔重试、自定义重试逻辑,以及更高效的指数退避策略(Exponential Backoff)。此外,为避免惊群效应,还在指数退避中加入了随机抖动(Jitter),以分散重试请求的时间,降低服务端压力。
本作品采用《CC 协议》,转载必须注明作者和本文链接
推荐文章: