Laravel 之队列使用浅析

最近有朋友有朋友问laravel队列的实现原理和经验,刚好用过所以整理了一下分享给大家。

laravel队列配置参见:http://learnku.com/docs/laravel/5.1/queues

原理分析

创建分发任务方法

class TestController extends Controller
{
    //其他方法

    //发送消息
    public function SendMessage(Request $request){
        ...
        $this->dispatch((new SendMessage($sendParams))->onQueue('snail:SendMessage'));
    }
}

内部实现:
这里写图片描述

创建消费任务

命令行运行如下命令:

/home/niuyufu/php/bin/php /home/niuyufu/webroot/snail_api/artisan queue:work --queue=snail:SendMessage --tries=3 --memory=512 --daemon

内部实现:
这里写图片描述

队列消息分析:

监控redis对应队列消息,具体产生的消息操作,如下:

tail -f | redis-cli -h 10.94.120.13 -p 6380 monitor | grep "queues:snail"

1492446053.406282 [0 10.95.117.155:57132] "WATCH" "queues:snail:SendMessage:delayed"
1492446053.406452 [0 10.95.117.155:57132] "ZRANGEBYSCORE" "queues:snail:SendMessage:delayed" "-inf" "1492446053"
1492446053.406754 [0 10.95.117.155:57132] "WATCH" "queues:snail:SendMessage:reserved"
1492446053.406842 [0 10.95.117.155:57132] "ZRANGEBYSCORE" "queues:snail:SendMessage:reserved" "-inf" "1492446053"
1492446053.407029 [0 10.95.117.155:57132] "LPOP" "queues:snail:SendMessage"
1492446053.407700 [0 10.95.117.155:57132] "ZADD" "queues:snail:SendMessage:reserved" "1492446113" "{job}"
1492446053.463953 [0 10.95.117.155:57132] "ZREM" "queues:snail:SendMessage:reserved" "{job}"

PS:如果你的redis是codis的话,注意了,因为codis禁用方法列表

KEYS, MOVE, OBJECT, RENAME, RENAMENX, SORT, SCAN, BITOP,MSETNX, BLPOP, BRPOP, BRPOPLPUSH, PSUBSCRIBE,PUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, DISCARD, EXEC, MULTI, UNWATCH, WATCH, SCRIPT EXISTS, SCRIPT FLUSH, SCRIPT KILL, SCRIPT LOAD, AUTH, ECHO, SELECT, BGREWRITEAOF, BGSAVE, CLIENT KILL, CLIENT LIST, CONFIG GET, CONFIG SET, CONFIG RESETSTAT, DBSIZE, DEBUG OBJECT, DEBUG SEGFAULT, FLUSHALL, FLUSHDB, INFO, LASTSAVE, MONITOR, SAVE, SHUTDOWN, SLAVEOF, SLOWLOG, SYNC, TIME

所以执行消费任务会有以下错误:

[Predis\Connection\ConnectionException]                               
Error while reading line from the server. [tcp://100.90.154.39:3000] 

解决方法为修改config/queue.php

'redis' => [
    'driver' => 'redis',
    'connection' => 'default',
    'queue' => 'default',
    'expire' => null,   //禁用即可
],

优化日志处理:

如果你的系统有切割文件日志操作。会发现虽然日志被切分了,但程序却没有往新文件里写入。如,2017-04-18 13:50启动的项目,日志会一直打到 snail.log.2017041813上。

改进方案:
app/Jobs/Job.php文件中添加如下方法:

public function releaseLoggerFile(){
    $handles=\Log::getMonolog()->getHandlers();
    if(!is_array($handles) || empty($handles)){
        return;
    }
    foreach($handles as $handle){
        if(method_exists($handle, "close")){
            $handle->close();
        }
    }
    return;
}

在具体job实现类中的handle方法结尾添加:

public function handle()
{
    ...
    $this->releaseLoggerFile(); //释放log文件
}

线上部署

创建任务shell

#!/bin/sh
day=$(date +'%y%m%d')
now=$(date +"%F %T")
php_command="/home/niuyufu/php/bin/php"
command="/home/niuyufu/webroot/snail_api/artisan"
log="/home/niuyufu/webroot/log/wave"
queue_name_arr=("snail:SendMessage" "snail:SendMessageToApp")

case $1 in
"run")
    for queue_name in ${queue_name_arr[*]}
    do
        count=$(ps aux | grep artisan |grep queue=${queue_name} | grep -v grep | wc -l)
        if [ ${count} -lt 1 ];then
            echo "${now}:${queue_name} process has exit ${count}\n";
            nohup $php_command $command queue:work --queue=${queue_name} --tries=3 --memory=512 --daemon >> $log/queueMonitor.log 2>&1 &
        else
            echo "${now}:${queue_name} process is runing";
        fi
    done
    ;;

"stop")
    kill -9 $(ps -ef | grep "queue:work" | grep -v grep | awk '{print $2}' | tr -s '\n' ' ')
    echo ${now}."Queue process all stop";
    ;;
"list")
    ps -ef | grep "queue:work" | grep -v grep
    ;;
*)
    echo "
Usage: QueueMonitorCommandShell.sh [run|stop|list]
    "
    ;;
esac

总结:

laravel这边的延迟队列使用了三个队列。

queue:default:delayed // 存储延迟任务
queue:default // 存储“生”任务,就是未处理任务
queue:default:reserved // 存储待处理任务
任务在三个队列中进行轮转,最后一定进入到queue:default:reserved,并且成功后把任务从这个队列中删除。

laravel5.1 使用了watch来控制队列的原子操作,但由于codis本身不支持 watch 方法。所以使用codis不能完全体验队列功能:延迟队列不支持、不支持数据重跑,对线上数据比较严格操作谨慎使用。

laravel5.3 之后redis队列 开始使用lua脚本支持的队列原子操作,它没有使用 watch multi等操作,所以如果线上codis 支持lua的话,可以完整体验到队列功能。

参考文档:
博客:Laravel 的消息队列剖析
http://laravelacademy.org/post/2012.html

本作品采用《CC 协议》,转载必须注明作者和本文链接
本帖由系统于 6年前 自动加精
《L04 微信小程序从零到发布》
从小程序个人账户申请开始,带你一步步进行开发一个微信小程序,直到提交微信控制台上线发布。
《L03 构架 API 服务器》
你将学到如 RESTFul 设计风格、PostMan 的使用、OAuth 流程,JWT 概念及使用 和 API 开发相关的进阶知识。
讨论数量: 4
Gecco

有两个问题,烦请:
1、在某处看到这个任务队列 最大同时处理数在5000+,如果多开worker,机器配置够,为什么会有这种说法(redis驱动)?
2、有大量延时任务推送到队列,并不立即执行,延时任务产生速度大于消费速度,堆积过多任务,是否存在问题(不同队列驱动)?

6年前 评论

@Gecco
1、5000并发处理已经很高了,这除了跟redis、机器配置之外还与db写入、磁盘io瓶颈等有关。
2、可以设置个最大队列数检测,超过直接抛出错误。虽然是队列,不可能让你无限制的写入的

6年前 评论

一直以为日志没有按照配置文件切割是因为supervisor....
这就去试一下文里的方法
谢谢🙏

5年前 评论

@Gecco 我测试过, 一次写入2万条队列 , 之后等这2万执行完, 时间过了很久了, 这样的异步好现太慢了。

4年前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!