月半谈(二)基于 Redis 的限流器-简单计数器

说到限流器,大家可能脑袋中浮现的就是三种方案,计数器-基于redis zset 结构的滑动窗口,思路类似但实现相反的漏斗与令牌桶 算法。

我先说需求,就是实现类似于微信公众号每日2000的获取 access_token 的调用限制,无需统计与清空~

当然这种需求,第一反应就是 加个计时器,到 2000 的时候我就不让调用了,基于这种朴质的观念,就有了以下的简单实现

class TestLimit
{
    private $redis;

    public function __construct()
    {
        $this->redis = new Redis();
        $this->redis->connect("192.168.0.111", 16379, 5, null, 3, 5);
    }

    function testTimesLimit(string $user, string $resource, int $maxCount)
    {
        $key = "{$user}:{$resource}";

        $expiredTime = strtotime(date('Y-m-d 23:59:59')) - time();
        $expiredTime = $expiredTime > 0 ? $expiredTime : 1;

        // 不存在时去设置
        if($this->redis->set($key, 1, ['NX', 'EX' => $expiredTime])){
            return true;
        }

        $limitTimes = (int)$this->redis->get($key);
        if ($limitTimes < $maxCount) {
            $this->redis->incr($key);
            return true;
        }
        return false;
    }
}

$obj = new  TestLimit();

$success = 0;
$err     = 0;
// 单进程测试
for ($i = 0; $i < 20; $i++) {
    $obj->TestTimesLimit("test_man", "test_resource", 10) ? $success++ : $err++;
}
echo "success{$success} times; fail {$err} times" . PHP_EOL;

看上去好像没问题对吧,但是看过我上篇文章的朋友都知道,这样写在并发请求下绝对会超出限制,但是,你用 单进程测不出来,我们开多进程试试?

// 多进程测试
if (extension_loaded("pcntl")) {

    $pids = [];// 父级进程 Id
    // 创建 20个进程,同时去 跑并发测试
    for ($i = 0; $i < 20; $i++) {

        $pid = pcntl_fork();

        if ($pid === -1) {
            echo "failed to fork!" . PHP_EOL;
            exit;
        }

        if ($pid) {
            $pids[] = $pid;
        } else {
            $obj     = new  TestLimit();
            $success = 0;
            $err     = 0;
            // 子进程测试
            for ($t = 0; $t < 20; $t++) {
                $obj->TestTimesLimit("test_man", "test_resource", 150) ? $success++ : $err++;
            }
            $pid = posix_getpid();
            echo microtime(true) . "cid {$pid} " . "success {$success} times; fail {$err} times" . PHP_EOL;

            exit(); // 执行完要结束,不然就会走进创建子进程的死循环
        }
    }

    foreach ($pids as $pid) {
        pcntl_waitpid($pid, $status);// 等子进程测试完毕
    }

}

根据上图的显示 我们 花了大约230毫秒 跑完了 整个测试,结果就是 150的 限制 被轻易突破了。

并发那就加🔒吧

调整过后的代码如下

class TestLimit
{
    private $redis;

    public function __construct()
    {
        $this->redis = new Redis();
        $this->redis->connect("192.168.0.111", 16379, 5, null, 3, 5);
    }

    function testTimesLimit(string $user, string $resource, int $maxCount)
    {
        $key = "{$user}:{$resource}";

        $expiredTime = strtotime(date('Y-m-d 23:59:59')) - time();
        $expiredTime = $expiredTime > 0 ? $expiredTime : 1;

        // 不存在时去设置
        if ($this->redis->set($key, 1, ['NX', 'EX' => $expiredTime])) {
            return true;
        }

        $lockKey         = "testTimesLimit";
        $lockUniqueValue = time() . mt_rand(100000, 999999);
        try {
            if ($this->lock($lockKey, $lockUniqueValue,3,100)) {
                $limitTimes = (int)$this->redis->get($key);
                if ($limitTimes < $maxCount) {
                    $this->redis->incr($key);
                    return true;
                }
                return false;
            }
            return false;
        } catch (\Throwable $throwable) {
//             log exception info ...
            return false;
        } finally {
            $this->unLock($lockKey, $lockUniqueValue);
        }
    }

    /**
     * redis 锁
     *
     * @param $key
     * @param $uniqueValue
     * @param int $times    尝试获取🔒的次数
     * @param int $time     每多少ms去获取一次🔒
     * @return bool
     */
    function lock($key, $uniqueValue, $times = 3, $time = 100): bool
    {
        while ($times > 0) {
            if ($this->redis->set($key, $uniqueValue, ['NX', 'EX' => 10])) {
                return true;
            }
            $times--;
            usleep(1000 * $time); // $time ms 后继续 尝试获取锁
        }
        return false;
    }

    /**
     * redis 解锁
     *
     * @param $key
     * @param $uniqueValue
     * @return bool
     */
    function unLock($key, $uniqueValue): bool
    {
        $script = 'if redis.call("get",KEYS[1]) == ARGV[1]
                  then
                      return redis.call("del",KEYS[1])
                  else
                      return 0
                  end';
        return $this->redis->eval($script, [$key, $uniqueValue], 1) ? true:false;
    }

}

加锁之后,我们再进行尝试,

虽然确实是没有超出限制,但是这个效率 1480ms 差距差不多有10倍,而且后续的进程因为重试时间获取到🔒的几率也很不均衡,很显然这是因为🔒的尝试次数跟尝试时间设置不合理导致的,当我们将尝试次数设置为1,尝试时间设置为30ms 时候 耗时320ms ,效率对比之前 已经有了很大的提升。

以上就是关于计数器更为简易版本的实现,当然简易也不简单,控制加锁之后的尝试时间跟尝试次数可以将突发流量转成平滑流量,🔒保证了数据精确度的需要。对时间上进行调整,其实我们这个也可以设置成滑动窗口的模式,可能粒度上会更大一些,对比与zset 每次都要插入与删除不在窗口内的数据,这些也都能接受。

当然我们今天不详细的对比另外三种,下次碰到有需要的时候,再进行对比。因为拖了一天,本周再写一篇新的文章,内容大概是关于 trace 的设计~

这是月半谈的第二篇,定时更新确实有点难,有收获的话,请👍 支持下吧~

本作品采用《CC 协议》,转载必须注明作者和本文链接
《L03 构架 API 服务器》
你将学到如 RESTFul 设计风格、PostMan 的使用、OAuth 流程,JWT 概念及使用 和 API 开发相关的进阶知识。
《G01 Go 实战入门》
从零开始带你一步步开发一个 Go 博客项目,让你在最短的时间内学会使用 Go 进行编码。项目结构很大程度上参考了 Laravel。
讨论数量: 3
$this->redis->incr($key); //直接用key自增,如果并发场景下,刚好过期,可能会导致incr 的key 过期时间变成-1。
3年前 评论
snakelis (作者) 3年前
李铭昕 3年前
李铭昕 3年前
李铭昕

不用加锁啊。。。

        $limitTimes = (int)$this->redis->get($key);
        if ($limitTimes < $maxCount) {
            $this->redis->incr($key);
            return true;
        }

上述代码改一下。。。

        if ($this->redis->incr($key) <= $maxCount) {
            return true;
        }

这样还少一次 get 调用。。

3年前 评论
漫天风雨下西楼 (楼主) 3年前

先不谈用不用加锁,这锁的设计就很不正常,使用sleep去尝试获取锁的都是不正确的姿势,1不平滑2没性能,因为这种设计没有高性能可谈,又怎么用在高并发去?

3年前 评论
漫天风雨下西楼 (楼主) 3年前

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!