redis应用系列三:延迟消息队列正确实现姿势
[TOC]
延迟队列和异步队列说明
why
- 1、有效期:限时活动、拼团…
- 2、超时处理:取消超时未支付订单、超时自动确认收货…
- 3、延迟处理:机器人点赞/观看数/评论/关注、等待依赖条件…
- 4、重试:网络异常重试、打车派单、依赖条件未满足重试…
- 5、定时任务:智能设备定时启动…
what
- 延迟
- 队列(其实不能称为队列,更贴切的应该是不重复的元素集)
when
- 延迟任务
where
- 数据量大
- 实时性要求高
who
- 时间轮
- RabbitMQ死性队列
- 基于redis的zset实现
How feel
项目 | 比较 |
---|---|
复杂性 | redis < RabbitMQ < 时间轮 |
安全性 | redis < RabbitMQ <= 时间轮 |
性能 | redis > 时间轮 > RabbitMQ |
鉴于redis实现延迟队列是一种理解起来较为直观,可以快速落地的方案,使用 Redis 集群来支持高并发和高可用,并且可以依赖redis自身持久化可以实现安全性保证,是一种不错的延迟队列的实现方案,以下实现方案研究均基于redis
how
基于key的过期监听
实现思路:
1.起一个线程,开启对任务key(如:未支付订单单号)的watch
;
2.一旦key过期(订单未支付超时时间),触发event
进行业务逻辑处理(取消订单)。
注:这种方法处理起来简单,但是在数据量较大的情况下,这种方案对性能影响较大基于zset集合
实现思路:
1.利用zadd
向集合中插入元素,以元素(订单)的时间戳(超时时间)作为score
2.利用zrangebyscore
以【0-当前时间戳】进行获取需要处理的元素(即为超时待处理订单)
注:这种方法相较于上面方法,实现较为复杂,但是对性能保证较好,因此是应用最广泛的延迟队列实现方案
下面是基于redis的zset实现延迟队列的具体示例
生产者
/**
* 生产
*/
public function producer()
{
$orderMap = [
'orderNo.1' => time() + 10,//订单1需要10s之后进行消费处理
'orderNo.2' => time() + 100,//订单2需要100s之后进行消费处理
'orderNo.3' => time() + 1000,//订单3需要1000s之后进行消费处理
];
Redis::connection()->zadd(self::$delayKey, $orderMap);
}
生产者:
- 利用
zadd
,以延迟时间戳作为score,元素唯一标志(订单号不重复)作为value,向需要处理的key中插入元素即可注意:因为是集合,因此需要保证集合元素的唯一性(score可以相同)
完成生产者,那么如何消费呢?常见的消费有以下几种
消费者
示例1
/**
* 消费
*/
public function consumer1()
{
/**
* 循环执行消费
*/
while (true) {
$msg = Redis::connection()->zrangebyscore(self::$delayKey, 0, time(), ['limit' => ['offset' => 0, 'count' => 1]]);
if ($msg) {
// 删除当前处理消息
if (Redis::connection()->zrem(self::$delayKey, $msg)) {
// TODO 业务处理并返回处理结果
}
}
}
}
示例1中:
1.利用zrangebyscore
按照score(此处为元素对应超时时间戳)进行获取满足条件的集合元素(超时待取消订单);
2.当有满足的条件的元素(超时待取消订单),先删除该元素(保证不被其他进程取到),再进行业务逻辑处理;
3.此处为了考虑性能,建议每次只处理1(count=1)条消息数据,offset=0,
思考:
在常规处理中,该实现已经能够满足业务需求,
- 但是这种实现有没有什么问题?
when延迟队列满足条件的元素为空(或者集合为空)时候:
进程会频繁不断向redis服务获取满足条件元素,这样会造成redis服务资源占用和浪费,- 怎么办呢?
可以在没有取到满足的条件时候让程序阻塞一段时间unsleep(100000);
这种方法实际上就是用时间换取资源,注意控制阻塞时间长短,不宜太短,也不宜太长(影响即时性),完整代码见示例2
示例2
/**
* 消费
*/
public function consumer2()
{
/**
* 循环执行消费
*/
while (true) {
$msg = Redis::connection()->zrangebyscore(self::$delayKey, 0, time() + 1500, ['limit' => ['offset' => 0, 'count' => 1]]);
if ($msg) {
// 删除当前处理消息
if (Redis::connection()->zrem(self::$delayKey, $msg)) {
// TODO 业务处理并返回处理结果
}
} else {
/**
* 阻塞1s,减少redis服务资源浪费
*/
usleep(100000);
}
}
}
示例2:
思考:
这种实现看起来已经完全没有问题(单例部署架构中)
- 在分布式架构中呢?
有可能出现zrangebyscore
和zrem
非同一个客户端的问题,即原子性问题- 怎么办呢?
采用lua脚本解决,见示例3
示例3
/**
* 消费
* 没有备份机制的lua脚本
*/
public function consumer3()
{
$luaScript = <<<EOF
local delayKey = KEYS[1]
local start = ARGV[1]
local endTime = ARGV[2]
local limitNum = ARGV[3]
local result = redis.call('zrangebyscore',delayKey,start,endTime,'limit',0, limitNum)
if next(result) ~= nil
then
local res = redis.call('zrem',delayKey,unpack(result))
if res > 0
then
return result
end
else
return {}
end
EOF;
/**
* 循环执行消费
*/
while (true) {
$startTime = 0;
$endTime = time();
$limit = 1;
/**
* 为了防止并发,保证原子性操作
* 利用lua脚本获取满足条件的消息,并删除满足条件消息
* 返回满足条件的消息
*/
$msg = Redis::connection()->eval($luaScript, 1, self::$delayKey, $startTime, $endTime + 9999999, $limit);
if ($msg) {
// TODO 业务处理并返回处理结果
} else {
/**
* 阻塞1s,减少redis服务资源浪费
*/
usleep(100000);
}
}
}
总结:方案3中解决上面存在的问题,是一种最优解
本作品采用《CC 协议》,转载必须注明作者和本文链接
推荐文章: