redis应用系列三:延迟消息队列正确实现姿势

[TOC]
延迟队列和异步队列说明

why

  • 1、有效期:限时活动、拼团…
  • 2、超时处理:取消超时未支付订单、超时自动确认收货…
  • 3、延迟处理:机器人点赞/观看数/评论/关注、等待依赖条件…
  • 4、重试:网络异常重试、打车派单、依赖条件未满足重试…
  • 5、定时任务:智能设备定时启动…

what

  • 延迟
  • 队列(其实不能称为队列,更贴切的应该是不重复的元素集)

when

  • 延迟任务

where

  • 数据量大
  • 实时性要求高

who

  • 时间轮
  • RabbitMQ死性队列
  • 基于redis的zset实现

How feel

项目 比较
复杂性 redis < RabbitMQ < 时间轮
安全性 redis < RabbitMQ <= 时间轮
性能 redis > 时间轮 > RabbitMQ

鉴于redis实现延迟队列是一种理解起来较为直观,可以快速落地的方案,使用 Redis 集群来支持高并发和高可用,并且可以依赖redis自身持久化可以实现安全性保证,是一种不错的延迟队列的实现方案,以下实现方案研究均基于redis

how

  • 基于key的过期监听

    实现思路:
    1.起一个线程,开启对任务key(如:未支付订单单号)的watch;
    2.一旦key过期(订单未支付超时时间),触发event进行业务逻辑处理(取消订单)。
    注:这种方法处理起来简单,但是在数据量较大的情况下,这种方案对性能影响较大

  • 基于zset集合

    实现思路:
    1.利用zadd向集合中插入元素,以元素(订单)的时间戳(超时时间)作为score
    2.利用zrangebyscore以【0-当前时间戳】进行获取需要处理的元素(即为超时待处理订单)
    注:这种方法相较于上面方法,实现较为复杂,但是对性能保证较好,因此是应用最广泛的延迟队列实现方案

下面是基于redis的zset实现延迟队列的具体示例

生产者

    /**
     * 生产
     */
    public function producer()
    {
        $orderMap = [
            'orderNo.1' => time() + 10,//订单1需要10s之后进行消费处理
            'orderNo.2' => time() + 100,//订单2需要100s之后进行消费处理
            'orderNo.3' => time() + 1000,//订单3需要1000s之后进行消费处理
        ];
        Redis::connection()->zadd(self::$delayKey, $orderMap);
    }

生产者:

  1. 利用zadd,以延迟时间戳作为score,元素唯一标志(订单号不重复)作为value,向需要处理的key中插入元素即可

注意:因为是集合,因此需要保证集合元素的唯一性(score可以相同)

完成生产者,那么如何消费呢?常见的消费有以下几种

消费者

示例1

    /**
     * 消费
     */
    public function consumer1()
    {
        /**
         * 循环执行消费
         */
        while (true) {
            $msg = Redis::connection()->zrangebyscore(self::$delayKey, 0, time(), ['limit' => ['offset' => 0, 'count' => 1]]);
            if ($msg) {
                // 删除当前处理消息
                if (Redis::connection()->zrem(self::$delayKey, $msg)) {
                    // TODO 业务处理并返回处理结果
                }
            }
        }
    }

示例1中:
1.利用zrangebyscore按照score(此处为元素对应超时时间戳)进行获取满足条件的集合元素(超时待取消订单);
2.当有满足的条件的元素(超时待取消订单),先删除该元素(保证不被其他进程取到),再进行业务逻辑处理;
3.此处为了考虑性能,建议每次只处理1(count=1)条消息数据,offset=0,

思考:
在常规处理中,该实现已经能够满足业务需求,

  • 但是这种实现有没有什么问题?
    when延迟队列满足条件的元素为空(或者集合为空)时候:
    进程会频繁不断向redis服务获取满足条件元素,这样会造成redis服务资源占用和浪费,
  • 怎么办呢?
    可以在没有取到满足的条件时候让程序阻塞一段时间unsleep(100000);
    这种方法实际上就是用时间换取资源,注意控制阻塞时间长短,不宜太短,也不宜太长(影响即时性),完整代码见示例2

示例2

    /**
     * 消费
     */
    public function consumer2()
    {
        /**
         * 循环执行消费
         */
        while (true) {
            $msg = Redis::connection()->zrangebyscore(self::$delayKey, 0, time() + 1500, ['limit' => ['offset' => 0, 'count' => 1]]);
            if ($msg) {
                // 删除当前处理消息
                if (Redis::connection()->zrem(self::$delayKey, $msg)) {
                    // TODO 业务处理并返回处理结果
                }

            } else {
                /**
                 * 阻塞1s,减少redis服务资源浪费
                 */
                usleep(100000);
            }
        }
    }

示例2:
思考:
这种实现看起来已经完全没有问题(单例部署架构中)

  • 在分布式架构中呢?
    有可能出现zrangebyscorezrem非同一个客户端的问题,即原子性问题
  • 怎么办呢?
    采用lua脚本解决,见示例3

示例3


    /**
     * 消费
     * 没有备份机制的lua脚本
     */
    public function consumer3()
    {
        $luaScript = <<<EOF
local delayKey = KEYS[1]
local start = ARGV[1]
local endTime = ARGV[2]
local limitNum = ARGV[3]
local result = redis.call('zrangebyscore',delayKey,start,endTime,'limit',0, limitNum)

if next(result) ~= nil  
then 
    local res = redis.call('zrem',delayKey,unpack(result))
    if res > 0
    then
        return result
    end 
else
    return {}
end
EOF;
        /**
         * 循环执行消费
         */
        while (true) {
            $startTime = 0;
            $endTime   = time();
            $limit     = 1;
            /**
             * 为了防止并发,保证原子性操作
             * 利用lua脚本获取满足条件的消息,并删除满足条件消息
             * 返回满足条件的消息
             */
            $msg = Redis::connection()->eval($luaScript, 1, self::$delayKey, $startTime, $endTime + 9999999, $limit);
            if ($msg) {
                // TODO 业务处理并返回处理结果
            } else {
                /**
                 * 阻塞1s,减少redis服务资源浪费
                 */
                usleep(100000);
            }
        }
    }

总结:方案3中解决上面存在的问题,是一种最优解

本作品采用《CC 协议》,转载必须注明作者和本文链接
《L01 基础入门》
我们将带你从零开发一个项目并部署到线上,本课程教授 Web 开发中专业、实用的技能,如 Git 工作流、Laravel Mix 前端工作流等。
《L02 从零构建论坛系统》
以构建论坛项目 LaraBBS 为线索,展开对 Laravel 框架的全面学习。应用程序架构思路贴近 Laravel 框架的设计哲学。
讨论数量: 7
GDDD

有人和我一样无聊3秒之内看完了吗?既然求简单,在创建订单后,直接搞个订单的队列任务去处理,然后延迟分发15分钟,15分钟后还能通知用户支付,然后任务再延迟15分钟,最后再取消,这样不是很舒服?

2年前 评论

@GDDD 哈哈,关于你的问题,我说两点 1.这篇博文我花了不止3个小时才完成 2.此处你说的是延迟队列的在laravel里面使用,业务实现你这么干完全没有毛病,而我本文说的的是延迟队列实现思路

2年前 评论

你这个 zset 和 lua 跟我最近写的一个差不多,哈哈哈哈

2年前 评论

@largezhou 看来我长进了 :stuck_out_tongue_winking_eye:,有链接吗?我去膜拜一下

2年前 评论
largezhou 2年前

unsleep(100000);这个是不是多写了个n,应该是usleep() :grin:

1年前 评论
笨小孩 (楼主) 1年前

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!