自己实现Laravel队列驱动,如何起多个队列监听进程?

我正在实现一个 阿里云轻量消息队列 (以下简称smq)的laravel队列驱动

Laravel默认的队列是自己发布消息,自己消费消息,我已经完美实现了这一点

但是smq有队列模型和话题模型,现在存在了两个问题,话题模型插入队列,以及非laravel系统产生的队列,我要如何在laravel系统中消费,我想实现的效果类似webman的队列,www.workerman.net/doc/webman/queue...:

    //配置config/queue.php
    'connections' => [
        ...
        'smq1' => [
            'driver' => 'smq',
            'endpoint' => 'ccccccc',
            'access_id' => 'yyyyyyy',
            'access_key' => 'xxxxx',
            'queue' => env('MNS_QUEUE', 'myqueue'),
            // 仅消费类
            'jobs' => [
                \App\Jobs\SmqTestJob::class,
                \App\Jobs\SmqTest2Job::class,
            ],
        ],

    // SmqTestJob
    <?php
    namespace App\Jobs;

    class SmqTestJob
    {

        // 队列名  其他配置与smq1相同
        public $queue = 'myqueue222';

        // 自己解析和消费myqueue222队列的消息
        // 该消息可能由topic插入myqueue222队列,也可能是其他系统插入队列
        public function handle(): void
        {   

            echo  "消费xxxxxxxxxxxxxxxbbbbbbbxxxxxxxxxxxxxxxxxxxxxxxxx\n";
        }
    }

如果我想运行connections.smq1.jobs里的仅消费类,目前只能在smq驱动里这样做

<?php

use Illuminate\Contracts\Queue\Queue as QueueContract;
use Illuminate\Queue\Queue;

class SmqQueue extends Queue implements QueueContract
{

    public function pop($queue = null)
    {
        $jobs = $this->config['jobs'] ?? [];
        foreach ($jobs as $jobClass) {
            // 遍历jobs中的仅消费类
            $jobQueue = $jobClass::$queue;
            $message = $this->fetchMessageFromQueue($jobQueue);
            if ($message) {
                return new SmqJob($this, $message, $jobQueue, $jobClass);
            }
        }
        return null;
    }

    protected function fetchMessageFromQueue($queue)
    {
        // 拉取该仅消费类对应的队列的消息
        $queue_name = $queue->queue;
        return $this->getQueueRef($queue_name)->receiveMessage(30);
    }
}

但是这存在几个问题,比如我有10个仅消费类,前面9个都没有消息,每个都阻塞30秒的话,轮询到第十个,就会花270秒,这显然不符合需求,消息很可能有时效性,我需要每个仅消费类都有一个独立的进程互不影响

我参考了 github.com/vyuldashev/laravel-queu... 这个库,它是实现了这个功能的,但是…我没看懂它的原理和实现

方案2: 每个仅消费类在queue独立一个配置,但是这样就需要起很多个 artisan queue:work xx1, artisan queue:work xx2, artisan queue:work xx3 并且为他们做不同守护进程,这不太现实,因为可能有几十个仅消费类,这样做太繁琐

前面的方案如何能为每个仅消费类起一个独立的进程,并且通过 artisan queue:work smq1 这样一条命令来启停

:) wink
唐章明
《L04 微信小程序从零到发布》
从小程序个人账户申请开始,带你一步步进行开发一个微信小程序,直到提交微信控制台上线发布。
《L01 基础入门》
我们将带你从零开发一个项目并部署到线上,本课程教授 Web 开发中专业、实用的技能,如 Git 工作流、Laravel Mix 前端工作流等。
最佳答案

简单看了一下, 感觉你的理解有点问题. 不要在 pop 方法里实现多个队列 正确的逻辑是


  • connection=smq
  • 每个阿里 topic 定义一个队列消费 (q1, q2, q3, high)
  • 你的 pop 方法实现, 只管一个 queue 主题(使用ReceiveMessage方法), 一次只获取一个消息(不要从多个主题获取), (不要遍历所有jobs, 而是获取消息反序列化为你定义的某一个 class)

多主题消费应该由 laravel 去操作 php artisan queue:work --queue=high,q1,q2,q3

2周前 评论
唐章明 (楼主) 2周前
唐章明 (楼主) 2周前
讨论数量: 5
唐章明

好像开多个进程可以解决这个问题

2周前 评论

时效性队列和普通队列需要区分开。

webman 能那样配置是它有协程,可以同时监听配置,正常的laravel不行,需要每个时效性的队列都要做个监听

监听多个用守护程序配置监听就行

2周前 评论

简单看了一下, 感觉你的理解有点问题. 不要在 pop 方法里实现多个队列 正确的逻辑是


  • connection=smq
  • 每个阿里 topic 定义一个队列消费 (q1, q2, q3, high)
  • 你的 pop 方法实现, 只管一个 queue 主题(使用ReceiveMessage方法), 一次只获取一个消息(不要从多个主题获取), (不要遍历所有jobs, 而是获取消息反序列化为你定义的某一个 class)

多主题消费应该由 laravel 去操作 php artisan queue:work --queue=high,q1,q2,q3

2周前 评论
唐章明 (楼主) 2周前
唐章明 (楼主) 2周前

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!