月半谈(二)基于 Redis 的限流器-简单计数器
说到限流器,大家可能脑袋中浮现的就是三种方案,计数器-基于redis zset
结构的滑动窗口,思路类似但实现相反的漏斗与令牌桶 算法。
我先说需求,就是实现类似于微信公众号每日2000的获取 access_token
的调用限制,无需统计与清空~
当然这种需求,第一反应就是 加个计时器,到 2000 的时候我就不让调用了,基于这种朴质的观念,就有了以下的简单实现
class TestLimit
{
private $redis;
public function __construct()
{
$this->redis = new Redis();
$this->redis->connect("192.168.0.111", 16379, 5, null, 3, 5);
}
function testTimesLimit(string $user, string $resource, int $maxCount)
{
$key = "{$user}:{$resource}";
$expiredTime = strtotime(date('Y-m-d 23:59:59')) - time();
$expiredTime = $expiredTime > 0 ? $expiredTime : 1;
// 不存在时去设置
if($this->redis->set($key, 1, ['NX', 'EX' => $expiredTime])){
return true;
}
$limitTimes = (int)$this->redis->get($key);
if ($limitTimes < $maxCount) {
$this->redis->incr($key);
return true;
}
return false;
}
}
$obj = new TestLimit();
$success = 0;
$err = 0;
// 单进程测试
for ($i = 0; $i < 20; $i++) {
$obj->TestTimesLimit("test_man", "test_resource", 10) ? $success++ : $err++;
}
echo "success{$success} times; fail {$err} times" . PHP_EOL;
看上去好像没问题对吧,但是看过我上篇文章的朋友都知道,这样写在并发请求下绝对会超出限制,但是,你用 单进程测不出来,我们开多进程试试?
// 多进程测试
if (extension_loaded("pcntl")) {
$pids = [];// 父级进程 Id
// 创建 20个进程,同时去 跑并发测试
for ($i = 0; $i < 20; $i++) {
$pid = pcntl_fork();
if ($pid === -1) {
echo "failed to fork!" . PHP_EOL;
exit;
}
if ($pid) {
$pids[] = $pid;
} else {
$obj = new TestLimit();
$success = 0;
$err = 0;
// 子进程测试
for ($t = 0; $t < 20; $t++) {
$obj->TestTimesLimit("test_man", "test_resource", 150) ? $success++ : $err++;
}
$pid = posix_getpid();
echo microtime(true) . "cid {$pid} " . "success {$success} times; fail {$err} times" . PHP_EOL;
exit(); // 执行完要结束,不然就会走进创建子进程的死循环
}
}
foreach ($pids as $pid) {
pcntl_waitpid($pid, $status);// 等子进程测试完毕
}
}
根据上图的显示 我们 花了大约230毫秒 跑完了 整个测试,结果就是 150的 限制 被轻易突破了。
并发那就加🔒吧
调整过后的代码如下
class TestLimit
{
private $redis;
public function __construct()
{
$this->redis = new Redis();
$this->redis->connect("192.168.0.111", 16379, 5, null, 3, 5);
}
function testTimesLimit(string $user, string $resource, int $maxCount)
{
$key = "{$user}:{$resource}";
$expiredTime = strtotime(date('Y-m-d 23:59:59')) - time();
$expiredTime = $expiredTime > 0 ? $expiredTime : 1;
// 不存在时去设置
if ($this->redis->set($key, 1, ['NX', 'EX' => $expiredTime])) {
return true;
}
$lockKey = "testTimesLimit";
$lockUniqueValue = time() . mt_rand(100000, 999999);
try {
if ($this->lock($lockKey, $lockUniqueValue,3,100)) {
$limitTimes = (int)$this->redis->get($key);
if ($limitTimes < $maxCount) {
$this->redis->incr($key);
return true;
}
return false;
}
return false;
} catch (\Throwable $throwable) {
// log exception info ...
return false;
} finally {
$this->unLock($lockKey, $lockUniqueValue);
}
}
/**
* redis 锁
*
* @param $key
* @param $uniqueValue
* @param int $times 尝试获取🔒的次数
* @param int $time 每多少ms去获取一次🔒
* @return bool
*/
function lock($key, $uniqueValue, $times = 3, $time = 100): bool
{
while ($times > 0) {
if ($this->redis->set($key, $uniqueValue, ['NX', 'EX' => 10])) {
return true;
}
$times--;
usleep(1000 * $time); // $time ms 后继续 尝试获取锁
}
return false;
}
/**
* redis 解锁
*
* @param $key
* @param $uniqueValue
* @return bool
*/
function unLock($key, $uniqueValue): bool
{
$script = 'if redis.call("get",KEYS[1]) == ARGV[1]
then
return redis.call("del",KEYS[1])
else
return 0
end';
return $this->redis->eval($script, [$key, $uniqueValue], 1) ? true:false;
}
}
加锁之后,我们再进行尝试,
虽然确实是没有超出限制,但是这个效率 1480ms 差距差不多有10倍,而且后续的进程因为重试时间获取到🔒的几率也很不均衡,很显然这是因为🔒的尝试次数跟尝试时间设置不合理导致的,当我们将尝试次数设置为1,尝试时间设置为30ms 时候 耗时320ms ,效率对比之前 已经有了很大的提升。
以上就是关于计数器更为简易版本的实现,当然简易也不简单,控制加锁之后的尝试时间跟尝试次数可以将突发流量转成平滑流量,🔒保证了数据精确度的需要。对时间上进行调整,其实我们这个也可以设置成滑动窗口的模式,可能粒度上会更大一些,对比与zset
每次都要插入与删除不在窗口内的数据,这些也都能接受。
当然我们今天不详细的对比另外三种,下次碰到有需要的时候,再进行对比。因为拖了一天,本周再写一篇新的文章,内容大概是关于 trace 的设计~
这是月半谈的第二篇,定时更新确实有点难,有收获的话,请👍 支持下吧~
本作品采用《CC 协议》,转载必须注明作者和本文链接
推荐文章: