使用 Laravel Queue 不得不明白的知识

file

背景

首先说一下我写这篇文章的初衷,在我们打算使用 Laravel Queue 的时候,你的首选应该是去看文档,但是无奈 Laravel 的文档很多地方写得太简单,有时候想了解一个深入的问题,不得不去看源码,但是看源码确实费一些时间。

所以我打算写一篇文章,把我在使用 Laravel Queue 过程中的方方面面都写一下,方便新手学习、老司机温习。

因为 Redis Queue 是比较简单也很常用的一种队列,所以以下内容我都基于 Redis Queue。

为什么使用队列?

虽然这个问题不是今天文章的重点,但是我还要说一下,一般来说使用队列是为了:

  1. 异步
  2. 重试

也许你还有其他的理由使用队列,但是这应该是最基本的两个原因。

什么情况使用队列?

了解了为什么使用队列,那么一般有这么几类任务使用队列:

  1. 耗时比较久的,比如上传一个文件后进行一些格式的转化等。
  2. 需要保证送达率的,比如发送短信,因为要调用别人的 api,总会有几率失败,那么为了保证送达,重试就必不可少了。

使用队列的时候一定要想明白一个问题,这个任务到底是不是可以异步,如果因为异步会导致问题,那么就要放弃使用队列。

一些小技巧

  1. 在开发环境我们想测试的时候,可以把 Queue driver 设置成为 sync,这样队列就变成了同步执行,方便调试队列里面的任务。
  2. Job 里面的 handle 方法是可以注入别的 class 的,就像在 Controller action 里面也可以注入一样。

问答

问:什么时候使用 queue:listen 什么时候使用 queue:work

答:Laravel 5.3 的文档已经不写 queue:listen这个指令怎么用了,所以你可以看出来可能官方已经不怎么建议使用 queue:listen了,但是在本地调试的时候要使用 queue:listen,因为 queue:work在启动后,代码修改,queue:work不会再 Load 上下文,但是 queue:listen仍然会重新 Load 新代码。

其余情况全部使用 queue:work吧,因为效率更高。

命令讲解

以下是常用的指令,我讲解一下

php artisan queue:work --daemon --quiet --queue=default --delay=3 --sleep=3 --tries=3

--daemon

The queue:work Artisan command includes a --daemon option for forcing the queue worker to continue processing jobs without ever re-booting the framework. This results in a significant reduction of CPU usage when compared to the queue:listen command

总体来说,在 supervisor 中一般要加这个 option,可以节省 CPU 使用。

--quiet

不输出任何内容

--delay=3

一个任务失败后,延迟多长时间后再重试,单位是秒。这个值的设定我个人建议不要太短,因为一个任务失败(比如网络原因),重试时间太短可能会出现连续失败的情况。

--sleep=3

去 Redis 中拿任务的时候,发现没有任务,休息多长时间,单位是秒。这个值的设定要看你的任务是否紧急,如果是那种非常紧急的任务,不能等待太长时间。

--tries=3

定义失败任务最多重试次数。这个值的设定根据任务的重要程度来确定,一般 3 次比较适合。

Redis 中发生了什么事情

dispatch(new ExampleJob());

如果一个任务进入 default 队列,会发生:

127.0.0.1:6379>monitor

"RPUSH" 
"queues:default"
"{\"job\":\"Illuminate\\\\Queue\\\\CallQueuedHandler@call\",\"data\":{\"commandName\":\"App\\\\Jobs\\\\ExampleJob\",\"command\":\"O:19:\\\"App\\\\Jobs\\\\ExampleJob\\\":7:{s:17:\\\"\\u0000*\\u0000userIdentifier\\\";N;s:9:\\\"\\u0000*\\u0000realIp\\\";N;s:12:\\\"\\u0000*\\u0000requestId\\\";N;s:6:\\\"\\u0000*\\u0000job\\\";N;s:10:\\\"connection\\\";N;s:5:\\\"queue\\\";N;s:5:\\\"delay\\\";N;}\"},\"id\":\"bwA7ICPqnjYiM0ErjRBNwn0kVWF6KeAs\",\"attempts\":1}"

redis 中会出现如下内容:

127.0.0.1:6379> keys queue*
1) "queues:default"

如果执行命令:

php artisan queue:work --daemon --quiet --queue=default --delay=3 --sleep=3 --tries=3

Redis 会发生什么事情?

第一步:查看是否需要重启,如果 laravel:illuminate:queue:restart 存在,就重启队列(代码更新后,一定要重启队列,否则队列不会读取最新代码)。

"GET"
"laravel:illuminate:queue:restart"

第二步:查看zset queues:default:delayed ,注意这里的事务

"WATCH"
"queues:default:delayed"

"ZRANGEBYSCORE"
"queues:default:delayed"
"-inf"
"1485386782"

"UNWATCH"

第三步:查看 zset queues:default:reserved,注意这里的事务

"WATCH"
"queues:default:reserved"

"ZRANGEBYSCORE"
"queues:default:reserved"
"-inf"
"1485386782"

"UNWATCH"

第四步:从 queue:default list 中取任务,如果有任务,要把任务先暂存到 queues:default:reserved 中(过期时间60秒,Redis Queue 里面写一个任务最多执行60秒)。

任务执行结束会把 queues:default:reserved 中的任务删除,如果任务报错(Throw exception),也会把queues:default:reserved 中的任务删除,然后把任务扔进 queues:default:delay,delay 的秒数是 3 秒(因为我们上面参数配置的是 --delay=3)。

"LPOP"
"queues:default"

#取出任务后,先要放到 queues:default:reserved zset 中

"ZADD"
"queues:default:reserved"
"1485386842" 
"{\"job\":\"Illuminate\\\\Queue\\\\CallQueuedHandler@call\",\"data\":{\"commandName\":\"App\\\\Jobs\\\\ExampleJob\",\"command\":\"O:19:\\\"App\\\\Jobs\\\\ExampleJob\\\":7:{s:17:\\\"\\u0000*\\u0000userIdentifier\\\";N;s:9:\\\"\\u0000*\\u0000realIp\\\";N;s:12:\\\"\\u0000*\\u0000requestId\\\";N;s:6:\\\"\\u0000*\\u0000job\\\";N;s:10:\\\"connection\\\";N;s:5:\\\"queue\\\";N;s:5:\\\"delay\\\";N;}\"},\"id\":\"bwA7ICPqnjYiM0ErjRBNwn0kVWF6KeAs\",\"attempts\":1}"

# 任务执行完毕后, 从 queues:default:reserved zset 中删除

"ZREM"
"queues:default:reserved"
"{\"job\":\"Illuminate\\\\Queue\\\\CallQueuedHandler@call\",\"data\":{\"commandName\":\"App\\\\Jobs\\\\ExampleJob\",\"command\":\"O:19:\\\"App\\\\Jobs\\\\ExampleJob\\\":7:{s:17:\\\"\\u0000*\\u0000userIdentifier\\\";N;s:9:\\\"\\u0000*\\u0000realIp\\\";N;s:12:\\\"\\u0000*\\u0000requestId\\\";N;s:6:\\\"\\u0000*\\u0000job\\\";N;s:10:\\\"connection\\\";N;s:5:\\\"queue\\\";N;s:5:\\\"delay\\\";N;}\"},\"id\":\"bwA7ICPqnjYiM0ErjRBNwn0kVWF6KeAs\",\"attempts\":1}"

# 如果任务失败,会放到 queue:default:delay zset 中

"ZADD"
"queues:default:delayed"
"1485386783" 
"{\"job\":\"Illuminate\\\\Queue\\\\CallQueuedHandler@call\",\"data\":{\"commandName\":\"App\\\\Jobs\\\\ExampleJob\",\"command\":\"O:19:\\\"App\\\\Jobs\\\\ExampleJob\\\":7:{s:17:\\\"\\u0000*\\u0000userIdentifier\\\";N;s:9:\\\"\\u0000*\\u0000realIp\\\";N;s:12:\\\"\\u0000*\\u0000requestId\\\";N;s:6:\\\"\\u0000*\\u0000job\\\";N;s:10:\\\"connection\\\";N;s:5:\\\"queue\\\";N;s:5:\\\"delay\\\";N;}\"},\"id\":\"uuPBCq4QE9ocnw8UbkLhUl2Lh07yPm6M\",\"attempts\":1}"

Redis 中的数据结构和操作:

1. queue:default

数据结构:
List

操作:
LRANGE "queues:default" 0 -1 获取 List 里面的所有数据。

2. queue:default:reserved 和 queue:default:delay

数据结构:
Zset,时间是 zset 的 score,通过 score 来排序。

操作:
ZRANGE ”queues:default:reserved“ 0 -1 获取 zset 里面的所有数据。
ZRANGEBYSCORE queues:default:reserved -inf +inf 通过时间来排序获取所有数据。

注意

Redis 里面一个任务默认最多执行60秒,如果一个任务60秒没有执行完毕,会继续放回到队列中,循环执行,那酸爽(依稀记得那个加班的夜晚........)
file

原文链接:https://www.lijinma.com/blog/2017/01/31/la...

本作品采用《CC 协议》,转载必须注明作者和本文链接
写文字大部分时候是因为我希望能帮助到你,小部分时候是想做总结或做记录。我的微信是 lijinma,希望和你交朋友。 以下是我的公众账号,会分享我的学习和成长。
本帖由 Summer 于 7年前 加精
lijinma
《L04 微信小程序从零到发布》
从小程序个人账户申请开始,带你一步步进行开发一个微信小程序,直到提交微信控制台上线发布。
《L02 从零构建论坛系统》
以构建论坛项目 LaraBBS 为线索,展开对 Laravel 框架的全面学习。应用程序架构思路贴近 Laravel 框架的设计哲学。
讨论数量: 25
lijinma

@leo 谢谢 leo 提醒,我查看了 5.2 的代码

//Illuminate\Queue\RedisQueue
public function pop($queue = null)
{
    $original = $queue ?: $this->default;

    $queue = $this->getQueue($queue);

    if (! is_null($this->expire)) {
        $this->migrateAllExpiredJobs($queue);
    }

    $job = $this->getConnection()->lpop($queue);

    if (! is_null($job)) {
        //这里设置的过期时间。
        $this->getConnection()->zadd($queue.':reserved', $this->getTime() + $this->expire, $job);

        return new RedisJob($this->container, $this, $job, $original);
    }
}

queue:work 确实没有 timeout 的参数,但是 queue:work 在 RedisQueue 的时候可以设置 expire

//Illuminate\Queue\Connectors\RedisConnector
public function connect(array $config)
{
    $queue = new RedisQueue(
        $this->redis, $config['queue'], Arr::get($config, 'connection', $this->connection)
    );

    $queue->setExpire(Arr::get($config, 'expire', 60));

    return $queue;
}

可以在 queue.php 里面可以通过设置 expire 来延长一个任务的执行时间。

        'redis' => [
            'driver'     => 'redis',
            'connection' => 'default',
            'queue'      => 'default',
            'expire'     => 60
        ],

文章我已经修改。

7年前 评论
Summer

--tries=3 应该是:定义失败任务最多重试次数。

来自:https://learnku.com/docs/laravel/5.3/queue...

Redis 里面一个任务最多执行60秒(写死了) 这个牛 :smile_cat:

7年前 评论
lijinma

@Summer 谢谢 Summer 提醒,我测试了一下,果然 LPOP 了4次,多谢,已修改。

7年前 评论
lijinma

@anyuzhe 我暂时没有好的解决方案,一般是只把一些数据扔到队列里面,然后在 handle 的时候,初始化不能序列化的类。

7年前 评论
leo

追了一下5.2的相关代码,没有发现60秒写死的逻辑呀,只有使用queue:listen的时候可以通过--timeout选项指定单个任务超时

7年前 评论
lijinma

@leo 谢谢 leo 提醒,我查看了 5.2 的代码

//Illuminate\Queue\RedisQueue
public function pop($queue = null)
{
    $original = $queue ?: $this->default;

    $queue = $this->getQueue($queue);

    if (! is_null($this->expire)) {
        $this->migrateAllExpiredJobs($queue);
    }

    $job = $this->getConnection()->lpop($queue);

    if (! is_null($job)) {
        //这里设置的过期时间。
        $this->getConnection()->zadd($queue.':reserved', $this->getTime() + $this->expire, $job);

        return new RedisJob($this->container, $this, $job, $original);
    }
}

queue:work 确实没有 timeout 的参数,但是 queue:work 在 RedisQueue 的时候可以设置 expire

//Illuminate\Queue\Connectors\RedisConnector
public function connect(array $config)
{
    $queue = new RedisQueue(
        $this->redis, $config['queue'], Arr::get($config, 'connection', $this->connection)
    );

    $queue->setExpire(Arr::get($config, 'expire', 60));

    return $queue;
}

可以在 queue.php 里面可以通过设置 expire 来延长一个任务的执行时间。

        'redis' => [
            'driver'     => 'redis',
            'connection' => 'default',
            'queue'      => 'default',
            'expire'     => 60
        ],

文章我已经修改。

7年前 评论
lijinma

@leo 但是 timeoutexpire 不太一样,timeout 是控制子进程的过期时间,expire 是队列里面任务的一个控制时间。

7年前 评论

monitor命令好用

7年前 评论
lijinma

@mingyun 嘿嘿。

7年前 评论

为什么不使用 rabbitmq , zeromq 之流?

7年前 评论
lijinma

@qufo redis 比较轻,很快就玩起来。

7年前 评论

@anyuzhe Eloquent 模型类被序列化只记录主键的值,反序列话时根据主键值去查找模型,其非继承模型类的类或值,采用默认的序列化方式,是可以使用一般的类的

7年前 评论

指定任务推送到哪一个队列上,这里 onQueue 方法所选的任务分类是在哪个地方定义的?

7年前 评论
lijinma

@Jorly 随便定义,比如你定义了一个 sms,只要你在跑队列的时候,从指定的 sms 里面拿数据进行。比如:

php artisan queue:work --daemon --quiet --queue=sms --delay=3 --sleep=3 --tries=3
7年前 评论

@anyuzhe 对象的方法丢了是啥意思啊?我不太能明白你的意思。我没遇到过这种哦。能贴点代码示例吗?难道你发短信的对象有属性是闭包嘛?php是不能序列化闭包属性的啊。

7年前 评论

再job中,调用guzzlehttp,然后请求外部接口挂掉了,这个队列会自动重试(只要报异常,就重试?),需要我在代码里做什么处理吗?

6年前 评论
sushengbuhuo

@839891627 try catch 里重试

5年前 评论
sushengbuhuo

@anyuzhe 有代码发出来看看吗 没遇到过

5年前 评论

@Jorly 指定任务推送到哪一个队列上,这里 onQueue 方法所选的任务分类是在哪个地方定义的? 请问这个答案确定了吗

5年前 评论

遇到个,QUEUE_DRIVER=sync 时执行正常,当 QUEUE_DRIVER=database 或 redis 时,如:只要是 bModel.php 模型时,jobs 表一切正常,但是状态未改变,failed_jobs 表也未有执行未成功的 jobs,好像就是执行 jobs 时,里面的代码未执行一样。但是 aModel.php 模型时队列按期望的方式正常执行了。
测试把 aModel.php 模型的 job 与 bModel.php 的 job 合并成同一个,里面使用 if 判断来区分执行,经过测试,当模型是 aModel.php 是队列按期望正常执行,当是 bModel.php 时发生了同上述异常正常的情况。好怪异

5年前 评论
lijian81519180 3年前

Laravel 5.5 已经不使用 WATCH 来执行, 而是使用 Lua 脚本来确保操作的原子性.
有兴趣的可以看下这篇: https://segmentfault.com/a/119000001913559...

4年前 评论
hookover

超时90秒,发起新任务执行,这个怎么解决的?

4年前 评论
JerryMan

@Summer laravel5.3如果我只用了命令php artisan queue:work 在job里设置tries有效么

4年前 评论

我在写 Job 的逻辑时,都习惯在末尾加上 $this->job->delete(); :sweat_smile:

3年前 评论

file 记录一下 60s 超时(我用了sleep(120) 测试)

2年前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!