redis应用系列二:异步消息队列:生产/消费模式实现及优化
[TOC]
面试中关于redis中经常会被如何实现异步队列?以及存在什么问题,怎么改进,鉴于次今天进行异步队列实现和优化
说明:
- 异步消息队列是什么?
- 异步消息队列能解决什么问题?
- 什么时候用?
- 什么地方用?
以上问题请参考 消息队列
基于list实现的生产/消费模式队列,在应用中使用场景最为广泛,以下是具体的常见实现过程以及分析
How
生产/消费模式有三个基本的元素
- 生产者(producer):用于组装消息,并将组装的消息通过lpush(左进)的方式压入队列,供消费者消费此消息
- 队列:用于承载消息的载体,在队列里面,被生产者压入的消息按照一定顺序进行存放,保证有序性
- 消费者(consumer):用于将队列里面的消息按照
先进先出
(此处采用rpop
)的原则弹出队列容器中
生产者实现
/**
* 准备处理消息key
*/
static $todoKey = 'TO_SEND_MSG';
/**
* 处理中消息key
*/
static $doingKey = 'SENDING_MSG';
/**
* 无限制阻塞
*/
static $timeout = 0;
/**
* 生产
*/
public function producer()
{
$userARr = [
'user1@mail1.com',
'user2@mail2.com',
'user3@mail3.com',
];
Redis::lpush(self::$todoKey, $userARr);
}
生产者producer用于生产队列,利用redis的list结构中
lpush
从左向队列中压入(可以批量)需要处理的消息,此消息在list中按照顺序进行存放,入队相对简单,至此完成了队列入队。
有了队列,怎么去消费这个队列数据呢?这就是下面的消费触发方式
如何消费
消费触发方式 | 特点 |
---|---|
死循环方式读取 | 易实现,故障时无法及时恢复(比较适合做秒杀,比较集中,运维集中维护) |
定时任务 | 压力均分,有处理上限;(需要合理设置时间间隔,不要等上一个任务没有完成下一个任务又开始了) |
守护进程 | 类似于php-fpm 和php-cg,需要shell基础,实现成本较高 |
鉴于此我们采用死循环的方式进行消费,具体消费过程设计如下对比
消费者
示例1
/**
* 消费
*/
public function consumer1()
{
/**
* 循环执行消费
*/
while (true) {
$msg = Redis::rpop(self::$todoKey);
if ($msg) {
// TODO 发送消息业务
}
}
}
这种实现方式也是很多人会采用的实现方式,但是存在问题:
- 当待处理的消息
self::$todoKey
为空的情况下:消费服务将会密集的向redis服务重复执行’rpop’命令,将造成资源严重浪费,对系统性能有严重折损,因此并不可取- 改进:可以考虑当消息为空的时候,进行
sleep
代码如下:
示例2
/**
* 消费
*/
public function consumer2()
{
/**
* 循环执行消费
*/
while (true) {
$msg = Redis::rpop(self::$todoKey);
if ($msg) {
// TODO 发送消息业务
} else {
// 等待10s
sleep(10);
}
}
}
加入sleep后,解决了队列为空数据时候造成的服务端资源浪费问题,但是又引入新问题:
- 如果有消息入队的情况下,程序阻塞在sleep处,程序将会无法及时响应消息,消息及时性会打折扣(排除业务允许延迟的的情况),到此一般业务已经能够满足,但是对于追求性能的业务场景并不尽人意,需要继续探索,请看下面示例:
示例3
/**
* 消费
*/
public function consumer3()
{
/**
* 循环执行消费
*/
while (true) {
// timeout=0 无限制阻塞式消费
$msg = Redis::brpop(self::$todoKey, self::$timeout);
if ($msg) {
// TODO 发送消息业务
}
}
}
幸好redis中只需要一个命令:
brpop
搞定上面问题,brpop
在队列有数据的时候进行出队操作,在队列没有数据的时候进行阻塞等待,知道队列中有数据,看起来到此一切完美,大功告成的样子,但是作为程序设计的我们还得考虑极端情况:
- 如果消费服务端出现异常,由于服务端没有进行备份,那么将会出现消息丢失情况,这种情况的解决请看下面示例
示例4
/**
* 消费
*/
public function consumer4()
{
while (true) {
do {
// 循环阻阻塞执行,消费端取到消息的同时,原子性的把该消息放入一个正在处理中的$doingKey列表(进行备份)
$msg = Redis::brpoplpush(self::$todoKey, self::$doingKey, self::$timeout);
try {
// TODO 发送消息业务
// 业务未出现异常,处理完业务后,则从正在处理的list中删除当前处理的消息,直到处理中list消息为空
Redis::lrem(self::$doingKey, 1, $msg);
} catch (\Exception $e) {
// TODO 异常
/**
* 业务出现异常,处理异常,
* 通常处理异常有两种方法
* 1.将异常消息重新放到待消费$todoKey的list中
* 2.单独处理异常消息(如发邮件通知,人工处理)
* 3.单独起一个脚本程序,处理长时间存在于处理中$doingKey->list消息
*/
}
} while (Redis::llen(self::$todoKey) || Redis::llen(self::$doingKey));
}
/**
* 到此时,队列已经满足了安全性和性能(高可用)要求
* 但是,对业务的处理上面此种方法并不完美
* 为了尽可能地完善,还需要写一个服务端定时脚本
* 此脚本用于监测和处理长时间存在于处理中$doingKey->list消息
*/
}
上面代码中利用
brpoplpush
在消费端取到消息的同时原子性的把该消息放入一个正在处理中的$doingKey列表(进行备份):
- 如果处理业务没有出现异常,那么业务处理完成后从正在处理中的$doingKey列表
lrem
删除当前已经处理ok的消息- 但是如果处理消息业务出现异常,不用慌,我们已经在处理中的$doingKey列表中备份了异常消息,对于异常消息处理,得根据具体业务具体实现:常见有两种:见上面说明。
到此:我们可以说已经兼顾了队列的安全性,高效性,但是感觉在处理异常上面增加了复杂度,那么有没有更简单的实现方法呢?见下面示例:
示例5
/**
* 消费
*/
public function consumer5()
{
/**
* 利用旋转列表功能
* 重置处理中消息list的key 和 待处理消息list为同一个key
*/
while (true) {
self::$doingKey = self::$todoKey;
do {
// 循环阻阻塞执行,消费端取到消息的同时,原子性的把该消息放入一个正在处理中的列表(进行备份)
// 此处由于处理中和待处理list为同一个,利用brpoplpush旋转列表
$msg = Redis::brpoplpush(self::$todoKey, self::$doingKey, self::$timeout);
try {
// TODO 发送消息业务
// 业务未出现异常,处理完业务后,则删除当前处理的消息,直到处理中list消息为空
Redis::lrem(self::$doingKey, 1, $msg);
} catch (\Exception $e) {
// 业务出现异常,继续循环,直到
}
} while (Redis::llen(self::$todoKey));
}
}
此段代码跟示例4唯一的不同就是:
self::$doingKey = self::$todoKey;
- 1.利用list实现旋转列表功能,在同一个队列中,从尾部出队的同时,从头部入队,如果没有异常则删除头部入队消息,如果出现异常,那么一直循环处理,当然这种处理有局限性:
A.勿删的可能性
B.如果一个异常消息一直得不到正确处理,就会一直占用资源
c.此种方法慎用
综合考虑:示例4就是一种最优解
但是:萝卜白菜各有所爱,不同服务业务场景选择不同实现,我们就是得搞清楚他们是萝卜还是白菜
本作品采用《CC 协议》,转载必须注明作者和本文链接
redis的发布订阅生产环境有用过吗 发布订阅可以一对多
@yzbfeng 这个实战还没有过,正在计划研究学习发布订阅模式(一对多)和延迟队列,敬请指导哈 :pray:
延迟队列用有序集合就可以做。
@郎中航 是的,这是延迟队列队列的实现,请指导