redis应用系列三:延迟消息队列正确实现姿势 
                                                    
                        
                    
                    
  
                    
                    [TOC]
延迟队列和异步队列说明
why
- 1、有效期:限时活动、拼团…
 - 2、超时处理:取消超时未支付订单、超时自动确认收货…
 - 3、延迟处理:机器人点赞/观看数/评论/关注、等待依赖条件…
 - 4、重试:网络异常重试、打车派单、依赖条件未满足重试…
 - 5、定时任务:智能设备定时启动…
 
what
- 延迟
 - 队列(其实不能称为队列,更贴切的应该是不重复的元素集)
 
when
- 延迟任务
 
where
- 数据量大
 - 实时性要求高
 
who
- 时间轮
 - RabbitMQ死性队列
 - 基于redis的zset实现
 
How feel
| 项目 | 比较 | 
|---|---|
| 复杂性 | redis < RabbitMQ < 时间轮 | 
| 安全性 | redis < RabbitMQ <= 时间轮 | 
| 性能 | redis > 时间轮 > RabbitMQ | 
鉴于redis实现延迟队列是一种理解起来较为直观,可以快速落地的方案,使用 Redis 集群来支持高并发和高可用,并且可以依赖redis自身持久化可以实现安全性保证,是一种不错的延迟队列的实现方案,以下实现方案研究均基于redis
how
基于key的过期监听
实现思路:
1.起一个线程,开启对任务key(如:未支付订单单号)的watch;
2.一旦key过期(订单未支付超时时间),触发event进行业务逻辑处理(取消订单)。
注:这种方法处理起来简单,但是在数据量较大的情况下,这种方案对性能影响较大基于zset集合
实现思路:
1.利用zadd向集合中插入元素,以元素(订单)的时间戳(超时时间)作为score
2.利用zrangebyscore以【0-当前时间戳】进行获取需要处理的元素(即为超时待处理订单)
注:这种方法相较于上面方法,实现较为复杂,但是对性能保证较好,因此是应用最广泛的延迟队列实现方案
下面是基于redis的zset实现延迟队列的具体示例
生产者
    /**
     * 生产
     */
    public function producer()
    {
        $orderMap = [
            'orderNo.1' => time() + 10,//订单1需要10s之后进行消费处理
            'orderNo.2' => time() + 100,//订单2需要100s之后进行消费处理
            'orderNo.3' => time() + 1000,//订单3需要1000s之后进行消费处理
        ];
        Redis::connection()->zadd(self::$delayKey, $orderMap);
    }
生产者:
- 利用
 zadd,以延迟时间戳作为score,元素唯一标志(订单号不重复)作为value,向需要处理的key中插入元素即可注意:因为是集合,因此需要保证集合元素的唯一性(score可以相同)
完成生产者,那么如何消费呢?常见的消费有以下几种
消费者
示例1
    /**
     * 消费
     */
    public function consumer1()
    {
        /**
         * 循环执行消费
         */
        while (true) {
            $msg = Redis::connection()->zrangebyscore(self::$delayKey, 0, time(), ['limit' => ['offset' => 0, 'count' => 1]]);
            if ($msg) {
                // 删除当前处理消息
                if (Redis::connection()->zrem(self::$delayKey, $msg)) {
                    // TODO 业务处理并返回处理结果
                }
            }
        }
    }
示例1中:
1.利用zrangebyscore按照score(此处为元素对应超时时间戳)进行获取满足条件的集合元素(超时待取消订单);
2.当有满足的条件的元素(超时待取消订单),先删除该元素(保证不被其他进程取到),再进行业务逻辑处理;
3.此处为了考虑性能,建议每次只处理1(count=1)条消息数据,offset=0,
思考:
在常规处理中,该实现已经能够满足业务需求,
- 但是这种实现有没有什么问题?
 
when延迟队列满足条件的元素为空(或者集合为空)时候:
进程会频繁不断向redis服务获取满足条件元素,这样会造成redis服务资源占用和浪费,- 怎么办呢?
 
可以在没有取到满足的条件时候让程序阻塞一段时间unsleep(100000);
这种方法实际上就是用时间换取资源,注意控制阻塞时间长短,不宜太短,也不宜太长(影响即时性),完整代码见示例2
示例2
    /**
     * 消费
     */
    public function consumer2()
    {
        /**
         * 循环执行消费
         */
        while (true) {
            $msg = Redis::connection()->zrangebyscore(self::$delayKey, 0, time() + 1500, ['limit' => ['offset' => 0, 'count' => 1]]);
            if ($msg) {
                // 删除当前处理消息
                if (Redis::connection()->zrem(self::$delayKey, $msg)) {
                    // TODO 业务处理并返回处理结果
                }
            } else {
                /**
                 * 阻塞1s,减少redis服务资源浪费
                 */
                usleep(100000);
            }
        }
    }
示例2:
思考:
这种实现看起来已经完全没有问题(单例部署架构中)
- 在分布式架构中呢?
 
有可能出现zrangebyscore和zrem非同一个客户端的问题,即原子性问题- 怎么办呢?
 
采用lua脚本解决,见示例3
示例3
    /**
     * 消费
     * 没有备份机制的lua脚本
     */
    public function consumer3()
    {
        $luaScript = <<<EOF
local delayKey = KEYS[1]
local start = ARGV[1]
local endTime = ARGV[2]
local limitNum = ARGV[3]
local result = redis.call('zrangebyscore',delayKey,start,endTime,'limit',0, limitNum)
if next(result) ~= nil  
then 
    local res = redis.call('zrem',delayKey,unpack(result))
    if res > 0
    then
        return result
    end 
else
    return {}
end
EOF;
        /**
         * 循环执行消费
         */
        while (true) {
            $startTime = 0;
            $endTime   = time();
            $limit     = 1;
            /**
             * 为了防止并发,保证原子性操作
             * 利用lua脚本获取满足条件的消息,并删除满足条件消息
             * 返回满足条件的消息
             */
            $msg = Redis::connection()->eval($luaScript, 1, self::$delayKey, $startTime, $endTime + 9999999, $limit);
            if ($msg) {
                // TODO 业务处理并返回处理结果
            } else {
                /**
                 * 阻塞1s,减少redis服务资源浪费
                 */
                usleep(100000);
            }
        }
    }
总结:方案3中解决上面存在的问题,是一种最优解
本作品采用《CC 协议》,转载必须注明作者和本文链接
          
                    
                    
          
          
                关于 LearnKu
              
                    
                    
                    
 
推荐文章: