分析 Laravel 队列实现原理解决问题

问题

公司项目使用Laravel的开发的两个项目在同一个测试服务器部署,公用同一个redis。在使用laravel中的队列时,产生冲突干扰。

查找问题原因

在laravel 队列的操作类Illuminate\Queue\RedisQueue.php中可以看到pushRaw()方法:

// 将一任务推入队列中
public function pushRaw($payload, $queue = null, array $options = [])
    {
        $this->getConnection()->rpush($this->getQueue($queue), $payload);

        return Arr::get(json_decode($payload, true), 'id');
    }

从该方法中可以看出Lrarvel队列的redis实现是通过list结构实现的,rpush(key, value)是将value推入键值为key的redis队列,key的值则是通过$this->getQueue($queue) 获取到的

protected function getQueue($queue)
    {
        return 'queues:'.($queue ?: $this->default);
    }

所以的redis中list中的key是 'queues:'.($queue ?: $this->default);拼接的,$this->default 的值是 RedisQueue 实例化的时候从config\queue.php配置中加载的 'queue' => 'default'$queue 是添加队列时$this->dispatch( new jobClass()->onQueue($queue) )传入的。

// config\queue.php 文件中的redis配置部分
'redis' => [
            'driver'     => 'redis',
            'connection' => 'default',
            'queue'      => 'default',
            'expire'     => 60,
        ],

至此,两个项目的队列冲突原因就找到了。因为redis队列配置中 'queue' => 'default' 都使用的默认的default,所以当共用redis时,默认的队列list 都是'queue:default',所以导致了冲突。

因为队列监听 监听的队列名称是由 --queue参数决定的,如果不传就是我们上面设置的默认值,若传了就会根据传入的队列名从前往后优先依次处理,具体见代码Illuminate\Queue\Worker.php中:

protected function getNextJob($connection, $queue)
    {
        if (is_null($queue)) {
            return $connection->pop();
        }

        foreach (explode(',', $queue) as $queue) {
            if (! is_null($job = $connection->pop($queue))) {
                return $job;
            }
        }
    }

$queue就是--queue=传入的参数,当 $queue不存在是直接调用$connection->pop()当参数存在时会将参数解析,优先处理排在前面的队列名称,将队列名称传入pop($queue), pop()会尝试从指定队列或默认队列中获取队列任务

// Illuminate\Queue\RedisQueue.php
public function pop($queue = null)
    {
        $original = $queue ?: $this->default;

        $queue = $this->getQueue($queue);

        if (! is_null($this->expire)) {
            $this->migrateAllExpiredJobs($queue);
        }

        $job = $this->getConnection()->lpop($queue);

        if (! is_null($job)) {
            $this->getConnection()->zadd($queue.':reserved', $this->getTime() + $this->expire, $job);

            return new RedisJob($this->container, $this, $job, $original);
        }
    }

至此搞清了队列执行的原理。

解决方法

将queue的配置文件中默认队列修改为不同的名称,比如: 'queue' => laravel1','queue' => laravel2'。

队列监听 php artisan queue:listen redis --queue=laravel1,syncExpress

最后

欢迎关注我的博客 ^_^

本作品采用《CC 协议》,转载必须注明作者和本文链接
Dr點燃
本帖由 Summer 于 7年前 加精
《L05 电商实战》
从零开发一个电商项目,功能包括电商后台、商品 & SKU 管理、购物车、订单管理、支付宝支付、微信支付、订单退款流程、优惠券等
《L01 基础入门》
我们将带你从零开发一个项目并部署到线上,本课程教授 Web 开发中专业、实用的技能,如 Git 工作流、Laravel Mix 前端工作流等。
讨论数量: 7

这是消息队列生产和消费的过程,不是原理

7年前 评论

两个项目共用一个redis时一定要用 options 定义 prefix, 或者使用不同的 redis 数据库,否则你还会碰到别的问题。

redis貌似是有200多个数据库吧,从0开始编号。个人不喜欢这种方法,因为数据库不能重新命名只能用数字,很难维护。

7年前 评论

@sdpfoue 因为是测试环境所以就~~~ 回复的后半段我没看懂 O(∩_∩)O哈哈~

7年前 评论
liuqing_hu

@sdpfoue Redis 默认数据库从编号 0 到 15 共计 16 个数据库,但在理论上 Redis 的数据库个数不受限制。可以在配置文件 redis.conf 中修改 databases 配置。

7年前 评论
sushengbuhuo

php artisan queue:listen redis --queue=laravel1,syncExpress这是同时监听2个队列?

6年前 评论

@sushengbuhuo 对的,监听这两个指定队列,并且排前面的参数优先处理

6年前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!