Redis 应用-异步消息队列与延时队列

系列文章

异步消息队列#

说道消息队列,你肯定会想到 KafkaRabbitmq 等消息中间件,这些专业的消息中间件提供了很多功能特性,当然他的部署使用维护都是比较麻烦的。如果你对消息队列没那么高要求,想要轻量级的,使用 Redis 就没错啦。

Redis 通过 list 数据结构来实现消息队列。主要使用到如下命令:

  • lpush 和 rpush 入队列
  • lpop 和 rpop 出队列
  • blpop 和 brpop 阻塞式出队列

image

废话补不多说上代码:

$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

//发送消息
$redis->lPush($list, $value);

//消费消息
while (true) {
    try {
        $msg = $redis->rPop($list);
        if (!$msg) {
            sleep(1);
        }
        //业务处理

    } catch (Exception $e) {
        echo $e->getMessage();
    }
}

上面代码会有个问题,如果队列长时间是空的,那 rpop 操作就会不断的循环执行,这样会导致 Redis 的 QPS 升高,影响性能。所以我们使用 sleep 来解决,当没有消息的时候阻塞一段时间。但其实这样还会带来另一个问题,就是 sleep 会导致消息的处理延迟增加。这个问题我们可以通过 blpop/brpop 来阻塞读取队列。

blpop/brpop 在队列没有数据的时候,会立即进入休眠状态,一旦数据到来,则立刻醒过来。消息的延迟几乎为零。用 blpop/brpop 替代前面的 lpop/rpop,就完美解决了上面的问题。

还有一个需要注意的点是我们需要是用 try/catch 来进行异常捕获,如果一直阻塞在那里,Redis 服务器一般会主动断开掉空链接,来减少闲置资源的占用。

延迟队列#

你是否在做电商项目的时候会遇到如下场景:

  • 订单下单后超过一小时用户未支付,需要关闭订单
  • 订单的评论如果 7 天未评价,系统需要自动产生一条评论

这个时候我们就需要用到延时队列了,顾名思义就是需要延迟一段时间后执行。Redis 可通过 zset 来实现。我们可以将有序集合的 value 设置为我们的消息任务,把 value 的 score 设置为消息的到期时间,然后轮询获取有序集合的中的到期消息进行处理。

实现代码如下:

$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

$redis->zAdd($delayQueue,$tts, $value);

while(true) {
    try{
        $msg = $redis->zRangeByScore($delayQueue,0,time(),0,1);
        if(!$msg){
            continue;
        }
        //删除消息
        $ok = $redis.zrem($delayQueue,$msg);
        if($ok){
            //业务处理
        }
    } catch(\Exception $e) {

    }
}

这里又产生了一个问题,同一个任务可能会被多个进程取到之后再使用 zrem 进行争抢,那些没抢到的进程都是白取了一次任务,这是浪费。解决办法:将 zrangebyscorezrem 使用 lua 脚本进行原子化操作,这样多个进程之间争抢任务时就不会出现这种浪费了。

本文亦在微信公众号【小道资讯】发布,欢迎扫码关注!
image

本作品采用《CC 协议》,转载必须注明作者和本文链接
本帖由系统于 5年前 自动加精
《L03 构架 API 服务器》
你将学到如 RESTFul 设计风格、PostMan 的使用、OAuth 流程,JWT 概念及使用 和 API 开发相关的进阶知识。
《L01 基础入门》
我们将带你从零开发一个项目并部署到线上,本课程教授 Web 开发中专业、实用的技能,如 Git 工作流、Laravel Mix 前端工作流等。
讨论数量: 10

这样吧

 if(!$msg){
            continue;
        }
6年前 评论
Gundy (楼主) 6年前

消费失败如何处理?

5年前 评论

@yankewei 这个就看你具体业务需求了,是重试 还是记录

5年前 评论
  • 上面代码会有个问题如果队列长时间是空的,那 pop 就不会不断的循环,这样会导致 redis 的 QPS 升高,影响性能。

这句话好像不太通顺 :joy:

5年前 评论

学习 受教, 内容非常棒。鼓掌.................

5年前 评论

$len = $redis->llen (' 队列名称 ');
if ( $len <= 0) {
break;
}

5年前 评论

$ok = $redis.zrem ($delayQueue,$msg); 不应该是点吧,应该是这样吧 $ok = $redis->zrem ($delayQueue,$msg);

4年前 评论

$redis->lPush ($list, $value); 列如发送消息中的 $list

$redis->zAdd ($delayQueue,$tts, $value); $delayQueue 这两个需要先定义吗??这个应该是局部变量还是全局呢??

3年前 评论