redis应用系列二:异步消息队列:生产/消费模式实现及优化

[TOC]
面试中关于redis中经常会被如何实现异步队列?以及存在什么问题,怎么改进,鉴于次今天进行异步队列实现和优化
说明:

  • 异步消息队列是什么?
  • 异步消息队列能解决什么问题?
  • 什么时候用?
  • 什么地方用?

以上问题请参考 消息队列
基于list实现的生产/消费模式队列,在应用中使用场景最为广泛,以下是具体的常见实现过程以及分析

How

redis应用系列二:异步消息队列安全性实现和过程分析

生产/消费模式有三个基本的元素

  • 生产者(producer):用于组装消息,并将组装的消息通过lpush(左进)的方式压入队列,供消费者消费此消息
  • 队列:用于承载消息的载体,在队列里面,被生产者压入的消息按照一定顺序进行存放,保证有序性
  • 消费者(consumer):用于将队列里面的消息按照先进先出(此处采用rpop)的原则弹出队列容器中

生产者实现

    /**
     * 准备处理消息key
     */
    static $todoKey = 'TO_SEND_MSG';
    /**
     * 处理中消息key
     */
    static $doingKey = 'SENDING_MSG';
    /**
     * 无限制阻塞
     */
    static $timeout = 0;

    /**
     * 生产
     */
    public function producer()
    {

        $userARr = [
            'user1@mail1.com',
            'user2@mail2.com',
            'user3@mail3.com',
        ];
        Redis::lpush(self::$todoKey, $userARr);
    }

生产者producer用于生产队列,利用redis的list结构中lpush从左向队列中压入(可以批量)需要处理的消息,此消息在list中按照顺序进行存放,入队相对简单,至此完成了队列入队。
有了队列,怎么去消费这个队列数据呢?这就是下面的消费触发方式

如何消费

消费触发方式 特点
死循环方式读取 易实现,故障时无法及时恢复(比较适合做秒杀,比较集中,运维集中维护)
定时任务 压力均分,有处理上限;(需要合理设置时间间隔,不要等上一个任务没有完成下一个任务又开始了)
守护进程 类似于php-fpm 和php-cg,需要shell基础,实现成本较高

鉴于此我们采用死循环的方式进行消费,具体消费过程设计如下对比

消费者

示例1

   /**
     * 消费
     */
    public function consumer1()
    {
        /**
         * 循环执行消费
         */
        while (true) {
            $msg = Redis::rpop(self::$todoKey);
            if ($msg) {
                // TODO 发送消息业务
            }
        }
    }

这种实现方式也是很多人会采用的实现方式,但是存在问题:

  • 当待处理的消息self::$todoKey为空的情况下:消费服务将会密集的向redis服务重复执行’rpop’命令,将造成资源严重浪费,对系统性能有严重折损,因此并不可取
  • 改进:可以考虑当消息为空的时候,进行sleep代码如下:

示例2

/**
     * 消费
     */
    public function consumer2()
    {
        /**
         * 循环执行消费
         */
        while (true) {
            $msg = Redis::rpop(self::$todoKey);
            if ($msg) {
                // TODO 发送消息业务
            } else {
                // 等待10s
                sleep(10);
            }
        }
    }

加入sleep后,解决了队列为空数据时候造成的服务端资源浪费问题,但是又引入新问题:

  • 如果有消息入队的情况下,程序阻塞在sleep处,程序将会无法及时响应消息,消息及时性会打折扣(排除业务允许延迟的的情况),到此一般业务已经能够满足,但是对于追求性能的业务场景并不尽人意,需要继续探索,请看下面示例:

示例3

    /**
     * 消费
     */
    public function consumer3()
    {
        /**
         * 循环执行消费
         */
        while (true) {
            // timeout=0 无限制阻塞式消费
            $msg = Redis::brpop(self::$todoKey, self::$timeout);
            if ($msg) {
                // TODO 发送消息业务
            }
        }
    }

幸好redis中只需要一个命令:brpop搞定上面问题,brpop在队列有数据的时候进行出队操作,在队列没有数据的时候进行阻塞等待,知道队列中有数据,看起来到此一切完美,大功告成的样子,但是作为程序设计的我们还得考虑极端情况:

  • 如果消费服务端出现异常,由于服务端没有进行备份,那么将会出现消息丢失情况,这种情况的解决请看下面示例

示例4

    /**
     * 消费
     */
    public function consumer4()
    {
        while (true) {
            do {
                // 循环阻阻塞执行,消费端取到消息的同时,原子性的把该消息放入一个正在处理中的$doingKey列表(进行备份)
                $msg = Redis::brpoplpush(self::$todoKey, self::$doingKey, self::$timeout);
                try {
                    // TODO 发送消息业务
                    // 业务未出现异常,处理完业务后,则从正在处理的list中删除当前处理的消息,直到处理中list消息为空
                    Redis::lrem(self::$doingKey, 1, $msg);
                } catch (\Exception $e) {
                    // TODO 异常
                    /**
                     * 业务出现异常,处理异常,
                     * 通常处理异常有两种方法
                     * 1.将异常消息重新放到待消费$todoKey的list中
                     * 2.单独处理异常消息(如发邮件通知,人工处理)
                     * 3.单独起一个脚本程序,处理长时间存在于处理中$doingKey->list消息
                     */

                }
            } while (Redis::llen(self::$todoKey) || Redis::llen(self::$doingKey));
        }
        /**
         * 到此时,队列已经满足了安全性和性能(高可用)要求
         * 但是,对业务的处理上面此种方法并不完美
         * 为了尽可能地完善,还需要写一个服务端定时脚本
         * 此脚本用于监测和处理长时间存在于处理中$doingKey->list消息
         */
    }

上面代码中利用brpoplpush在消费端取到消息的同时原子性的把该消息放入一个正在处理中的$doingKey列表(进行备份):

  • 如果处理业务没有出现异常,那么业务处理完成后从正在处理中的$doingKey列表lrem删除当前已经处理ok的消息
  • 但是如果处理消息业务出现异常,不用慌,我们已经在处理中的$doingKey列表中备份了异常消息,对于异常消息处理,得根据具体业务具体实现:常见有两种:见上面说明。

到此:我们可以说已经兼顾了队列的安全性,高效性,但是感觉在处理异常上面增加了复杂度,那么有没有更简单的实现方法呢?见下面示例:

示例5

    /**
     * 消费
     */
    public function consumer5()
    {
        /**
         * 利用旋转列表功能
         * 重置处理中消息list的key 和 待处理消息list为同一个key
         */
        while (true) {
            self::$doingKey = self::$todoKey;
            do {
                // 循环阻阻塞执行,消费端取到消息的同时,原子性的把该消息放入一个正在处理中的列表(进行备份)
                // 此处由于处理中和待处理list为同一个,利用brpoplpush旋转列表
                $msg = Redis::brpoplpush(self::$todoKey, self::$doingKey, self::$timeout);
                try {
                    // TODO 发送消息业务
                    // 业务未出现异常,处理完业务后,则删除当前处理的消息,直到处理中list消息为空
                    Redis::lrem(self::$doingKey, 1, $msg);
                } catch (\Exception $e) {
                    // 业务出现异常,继续循环,直到
                }
            } while (Redis::llen(self::$todoKey));
        }

    }

此段代码跟示例4唯一的不同就是:self::$doingKey = self::$todoKey;

  • 1.利用list实现旋转列表功能,在同一个队列中,从尾部出队的同时,从头部入队,如果没有异常则删除头部入队消息,如果出现异常,那么一直循环处理,当然这种处理有局限性:
    A.勿删的可能性
    B.如果一个异常消息一直得不到正确处理,就会一直占用资源
    c.此种方法慎用

综合考虑:示例4就是一种最优解
但是:萝卜白菜各有所爱,不同服务业务场景选择不同实现,我们就是得搞清楚他们是萝卜还是白菜

本作品采用《CC 协议》,转载必须注明作者和本文链接
本帖由系统于 1年前 自动加精
《L01 基础入门》
我们将带你从零开发一个项目并部署到线上,本课程教授 Web 开发中专业、实用的技能,如 Git 工作流、Laravel Mix 前端工作流等。
《G01 Go 实战入门》
从零开始带你一步步开发一个 Go 博客项目,让你在最短的时间内学会使用 Go 进行编码。项目结构很大程度上参考了 Laravel。
讨论数量: 4

redis的发布订阅生产环境有用过吗 发布订阅可以一对多

2年前 评论

@yzbfeng 这个实战还没有过,正在计划研究学习发布订阅模式(一对多)和延迟队列,敬请指导哈 :pray:

2年前 评论

延迟队列用有序集合就可以做。

2年前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!