用 Rust 封装 HTTP Client

HTTP 是日常开发中,应用最广泛的协议。即使是服务端开发,也已经要使用 HTTP 协议和第三方进行交互,比如微信公众号、支付、回调等场景。

但是大部分时候,我们只是简单的封装一个 HTTP Client,然后发送请求、完成功能而已,并没有考虑封装一个更为全面、健壮的 Client,比如为其加入重试、指数退避等能力。

这篇文章描述了如何使用 Rust 封装一个强大、易用的 Http Client。

从 Hello World 开始

我最近在筹划用 Rust 来写一个微服务版本的 Hello World,目前有一个 demo。我们就来 Rust 来请求它吧。

1. 安装依赖

在开始之前,首先需要安装实验相关的依赖, tokio 和 reqwest:

[dependencies]
tokio = { version = "1.44.2", features = ["full"] }
reqwest = { version = "0.12.15", features = ["json"] }

关于 tokio 是 Rust 中一个异步运行时,而 reqwest 则是一个强大的 Http Client。本文的目标是封装一个 Http Client,但不意味着要造轮子复刻一个 reqwest,而是说基于他进行二次封装。

2. 使用 reqwest 发送请求

在日常的工作中,我们可能只是简单地发送一个请求。牛马般的开发就像高速公路的建造一样,追求里程数,质量可以靠边了。

我们先从一个简单的例子开始,然后逐步扩展能力。一个基础的发送请求的实例如下:

#[tokio::main]
async fn main() {
    let result = reqwest::get("https://rust.demo.testing.icu").await;
    match result {
        Ok(response) => {
            println!("{}", response.text().await.unwrap());
        },
        Err(error) => {
            println!("{}", error.to_string());
        }
    }
}

如果我的服务正常的话,会输出Hello World ,你也可以创建自己的 Mock 服务来做测试,这并不是一件困难的事情。

reqwest 这个库的使用非常简单,但是你有没有考虑过,如果因为网络或其他问题导致请求失败,是否需要重试呢?

重试失败的请求

当我们和第三方通过 HTTP 进行交互的时候,请求失败是常有的事情。有时候是因为对方服务异常、有时候是因为网路波动、有时候可能是因为近期太阳黑子活动剧烈,导致电子设备元件不可靠……这时候,我们可以采取对失败的请求进行重试,说不定再一次请求就成功了呢。

reqwest 是一个轻量级的 HTTP Client, 当中并不包含失败重试的功能。需要我们自行封装一个中间件(Middleware)来处理重试。

1. 定义 Middleware

我们来定义一个中间件,它只有一个字段 Config:

pub struct RetryMiddleware {
    pub config: RetryConfig,
}

2. 定义中间件的配置

RetryConfig 支持间隔固定时间的进行重试:

#[derive(Debug, Clone)]
pub struct RetryConfig {
    pub max_retries: u32,
    pub retry_interval: u64, 
}

为了构建方便,实现 Default trait:

impl Default for RetryConfig {
    fn default() -> Self {
        Self {
            max_retries: 3,
            retry_interval: 1000,
        }
    }
}

这里的配置的含义是,最多重试 3 次,每次固定间隔 1000 毫秒。

3. 实现 Middleware 发送请求

我们先抛开重试这件事情,先完成使用它来发送一个正常的请求:

impl RetryMiddleware {
    pub fn new(config: RetryConfig) -> Self {
        Self { config }
    }
    pub fn default() -> Self {
        Self::new(RetryConfig::default())
    }
    pub async fn execute(&self, client: &Client, request: Request) -> Result<String, Error> {
        match client.execute(request).await {
            Ok(response) => Ok(response.text().await?),
            Err(err) => Err(err),
        }
    }
}

RetryMiddleware 接收了从调用者传递的 client , 然后调用 client.execute() 方法。虽然,到目前为止,这个封装没有什么意义,也暂时没有实现重试的能力,但是为之后打下基础。先来测试一下,是否能正常请求:

use reqwest::{Client, Error, Request};
​
#[tokio::main]
async fn main() {
    let client = Client::new();
    let request = client.get("https://rust.demo.testing.icu").build().unwrap();
    let result = RetryMiddleware::default().execute(&client, request).await;
    match result {
        Ok(response) => {
            println!("{}", response);
        }
        Err(error) => {
            println!("{}", error.to_string());
        }
    }
}

4. 实现重试机制

接下来就要实现重试机制了,如果请求失败,则按照固定的间隔和最大重试次数进行重试,超过最大重试次数则返回错误:

pub async fn execute(&self, client: &Client, mut request: Request) -> Result<String, Error> {
  let mut attempt: u32 = 0;
  loop {
    let req_clone = request.try_clone().expect("Failed to clone request");
    let result = client.execute(request).await;
    if result.is_ok() {
      return Ok(result.unwrap().text().await?);
    }
    if attempt >= self.config.max_retries {
      return Err(result.unwrap_err());
    }
    attempt += 1;
    tokio::time::sleep(Duration::from_millis(self.config.retry_interval)).await;
    // 准备下一次重试
    request = req_clone;
}

通过 loop 来进行重试,每次检查是否返回错误,如果返回错误则进行重试。但是,你有没有想到,是实际的生产中,是否重试的判断可能没有那么简单,除了服务端返回的 HTTP 状态码外,还返回自定义的状态码,或者具体的业务值来综合判断是否需要重试。

所以,是否重试这个逻辑判断按道理应该交给调用者来决定,而不是服务自己拍脑子决定。

5. 自定义重试逻辑

接下来,我们来完成自定义重试的逻辑。调用房可以传入一个匿名函数并返回一个布尔值,来决定是否重试。函数类型的定义如下:

pub type RetryPredicate = Box<dyn Fn(&RequestResult) -> bool + Send + Sync>;

这个函数接收一个 RequestResult 类型,表示请求的结果。所以我们需要对之前的代码稍加改造:

// 创建一个枚举类,表示请求结果
pub enum RequestResult {
    Response(String),
    Error(Error),
}

RetryMiddleware 支持传入一个判断是否重试的函数,当然别忘了,new 方法也要更改:

pub struct RetryMiddleware {
    pub config: RetryConfig,
    pub predicate: Option<RetryPredicate>,
}
pub fn new(config: RetryConfig) -> Self {
    Self { config, predicate: None, }
}

然后,新增一个方法 withPredicate ,可以让调用者传入这个函数:

pub fn with_retry_predicate<F>(mut self, predicate: F) -> Self
where
  F: Fn(&RequestResult) -> bool + Send + Sync + 'static,
{
    self.predicate = Some(Box::new(move |result| predicate(result)));
    self
}

接着,新增一个方法来判断是否重试:

async fn should_retry(&self, result: &RequestResult) -> bool {
  if let Some(predicate) = &self.predicate {
    return predicate(result);
  }
  // 如果没有指定重试的逻辑,则判断是否为服务端错误、超时、连接错误,来决定是否重试
  match result {
    RequestResult::Response(_response) => false,
    RequestResult::Error(err) => {
      if let Some(status) = err.status() {
        status.is_server_error()
      } else {
        err.is_timeout() || err.is_connect()
      }
  }
}

最后,改造 execute 的逻辑,支持调用者自定义重试:

pub async fn execute(&self, client: &Client, mut request: Request) -> Result<String, Error> {
    let mut attempt: u32 = 0;
​
    loop {
        let req_clone = request.try_clone().expect("Failed to clone request");
        let result = match client.execute(request).await {
            Ok(response) => RequestResult::Response(response.text().await?),
            Err(err) => RequestResult::Error(err),
        };if !self.should_retry(&result).await || attempt >= self.config.max_retries {
            return match result {
                RequestResult::Response(response) => Ok(response),
                RequestResult::Error(err) => Err(err),
            };
        }
​
        attempt += 1;
        tokio::time::sleep(Duration::from_millis(self.config.retry_interval)).await;
        // 准备下一次重试
        request = req_clone;
    }
}

然后自定义重试逻辑,测试如下:

let client = Client::new();
let request = client.get("https://rust.demo.testing.icu").build().unwrap();
let result = RetryMiddleware::default().
    with_retry_predicate(|result| match result {
        // 如果不包含 `Hello` 则重试
        RequestResult::Response(response) => !response.contains("Hello"),
        RequestResult::Error(_) => true,
    }).
    execute(&client, request).await;

指数退避

上面只是重试的一种策略,按照固定时间重试,比如 1 秒、2秒、3秒。但是在实际生产中,更倾向于使用指数退避(Exponential Backoff) 策略来降低服务器的压力,比如 0.5秒、1.5秒、3.5秒……直到最大尝试次数之后,每次重试都会呈指数增长。

常规公式如下: Tbase * 2^nn 表示重试的次数,Tbase 表示一个常数,比如 1 秒,第一次就是 1 秒、第二次是 2 秒、第三次是 4 秒、第四次是 8 秒。我们来实现这个逻辑。

1. 实现基础的指数退避

首先对 RetryConfig 增加指数退避相关的配置项:

#[derive(Debug, Clone)]
pub struct RetryConfig {
    pub max_retries: u32,
    pub retry_interval: u64,
    // 增加两个配置项
    pub use_exponential_backoff: bool, // 是否使用指数退避策略
    pub backoff_factor: f64, // 指数退避的基数,公式中的 Tbase
}

然后增加一个方法,来计算每次重试的时间间隔:

fn calculate_wait_time(&self, attempts: u32) -> Duration {
    if self.config.use_exponential_backoff {
        // 计算指数
        let factor = self.config.backoff_factor.powi(attempts as i32);
        // 基数乘指数
        let wait_ms = (self.config.retry_interval as f64 * factor) as u64;
        Duration::from_millis(wait_ms)
    } else {
        // 不使用指数退避,采用固定的时间
        Duration::from_millis(self.config.retry_interval)
    }
}

然后修改 execute() 方法,每次重试的等待时间通过上面这个方法来计算:

pub async fn execute(&self, client: &Client, mut request: Request) -> Result<String, Error> {
    // ......没有变更的代码省略, 修改下面两行代码
    let wait_time = self.calculate_wait_time(attempt);
    tokio::time::sleep(wait_time).await
    // 准备下一次重试
    request = req_clone;
}

2. 惊群效应

如果按照上面的实现,对于客户端来说并没有太大问题,但是对于服务端来说可能会出现惊群效应(Thundering Herd Problem) 。所谓惊群效应指的是,当服务端出现网络波动、过载等问题的时候,多个客户端同时发起重试,进一步增加了服务端的压力,资源竞争加剧、网络更加堵塞。

为了预防出现惊群效应,在指数退避的基础上,每次重试增加一个随机数,可以有效预防或者缓解,修改如下:

use rand::Rng;// 计算指数
let factor = self.config.backoff_factor.powi(attempts as i32);
// 基数乘指数
let wait_ms = (self.config.retry_interval as f64 * factor) as u64;
// 计算抖动值,例如 [-50%, +50%] 的抖动
let jitter_range = (self.config.retry_interval / 2) as i64;
let mut rng = rand::rng();
let jitter = rng.random_range(-jitter_range..=jitter_range); // 生成随机抖动值
// 重试的等待时间加入抖动值,并确保等待的时间不小于 0
let wait_ms = (self.config.retry_interval as i64 + jitter).max(0) as u64;
Duration::from_millis(wait_ms)

总结

这篇文章介绍了如何用 Rust 封装一个强大且灵活的 HTTP 客户端,重点在于实现请求失败后的重试机制,包括固定间隔重试、自定义重试逻辑,以及更高效的指数退避策略(Exponential Backoff)。此外,为避免惊群效应,还在指数退避中加入了随机抖动(Jitter),以分散重试请求的时间,降低服务端压力。

本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!