Laravel5.6 整合 RabbitMQ 消息队列

1、Composer安装laravel-queue-rabbitmq
composer require vladimir-yuldashev/laravel-queue-rabbitmq
2、在config/app.php文件中,providers中添加:
VladimirYuldashev\LaravelQueueRabbitMQ\LaravelQueueRabbitMQServiceProvider::class,
3、在app/config/queue.php配置文件中的connections数组中加入以下配置

'rabbitmq' => [

            'driver' => 'rabbitmq',

            'dsn' => env('RABBITMQ_DSN', null),

            /*
             * Could be one a class that implements \Interop\Amqp\AmqpConnectionFactory for example:
             *  - \EnqueueAmqpExt\AmqpConnectionFactory if you install enqueue/amqp-ext
             *  - \EnqueueAmqpLib\AmqpConnectionFactory if you install enqueue/amqp-lib
             *  - \EnqueueAmqpBunny\AmqpConnectionFactory if you install enqueue/amqp-bunny
             */

            'factory_class' => Enqueue\AmqpLib\AmqpConnectionFactory::class,

            'host' => env('RABBITMQ_HOST', '127.0.0.1'),
            'port' => env('RABBITMQ_PORT', 5672),

            'vhost' => env('RABBITMQ_VHOST', '/'),
            'login' => env('RABBITMQ_LOGIN', 'guest'),
            'password' => env('RABBITMQ_PASSWORD', 'guest'),

            'queue' => env('RABBITMQ_QUEUE', 'default'),

            'options' => [

                'exchange' => [

                    'name' => env('RABBITMQ_EXCHANGE_NAME'),

                    /*
                     * Determine if exchange should be created if it does not exist.
                     */

                    'declare' => env('RABBITMQ_EXCHANGE_DECLARE', true),

                    /*
                     * Read more about possible values at https://www.rabbitmq.com/tutorials/amqp-concepts.html
                     */

                    'type' => env('RABBITMQ_EXCHANGE_TYPE', \Interop\Amqp\AmqpTopic::TYPE_DIRECT),
                    'passive' => env('RABBITMQ_EXCHANGE_PASSIVE', false),
                    'durable' => env('RABBITMQ_EXCHANGE_DURABLE', true),
                    'auto_delete' => env('RABBITMQ_EXCHANGE_AUTODELETE', false),
                    'arguments' => env('RABBITMQ_EXCHANGE_ARGUMENTS'),
                ],

                'queue' => [

                    /*
                     * Determine if queue should be created if it does not exist.
                     */

                    'declare' => env('RABBITMQ_QUEUE_DECLARE', true),

                    /*
                     * Determine if queue should be binded to the exchange created.
                     */

                    'bind' => env('RABBITMQ_QUEUE_DECLARE_BIND', true),

                    /*
                     * Read more about possible values at https://www.rabbitmq.com/tutorials/amqp-concepts.html
                     */

                    'passive' => env('RABBITMQ_QUEUE_PASSIVE', false),
                    'durable' => env('RABBITMQ_QUEUE_DURABLE', true),
                    'exclusive' => env('RABBITMQ_QUEUE_EXCLUSIVE', false),
                    'auto_delete' => env('RABBITMQ_QUEUE_AUTODELETE', false),
                    'arguments' => env('RABBITMQ_QUEUE_ARGUMENTS'),
                ],
            ],

            /*
             * Determine the number of seconds to sleep if there's an error communicating with rabbitmq
             * If set to false, it'll throw an exception rather than doing the sleep for X seconds.
             */

            'sleep_on_error' => env('RABBITMQ_ERROR_SLEEP', 5),

            /*
             * Optional SSL params if an SSL connection is used
             * Using an SSL connection will also require to configure your RabbitMQ to enable SSL. More details can be founds here: https://www.rabbitmq.com/ssl.html
             */

            'ssl_params' => [
                'ssl_on' => env('RABBITMQ_SSL', false),
                'cafile' => env('RABBITMQ_SSL_CAFILE', null),
                'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null),
                'local_key' => env('RABBITMQ_SSL_LOCALKEY', null),
                'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),
                'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null),
            ],

        ],

4、修改 .env 文件


QUEUE_CONNECTION=rabbitmq    #这个配置env一般会有先找到修改为这个

#以下是新增配置

RABBITMQ_HOST=rabbitmq  #mq的服务器地址,我这里用的是laradock,具体的就具体修改咯
RABBITMQ_PORT=5672  #mq的端口
RABBITMQ_VHOST=/
RABBITMQ_LOGIN=guest    #mq的登录名
RABBITMQ_PASSWORD=guest   #mq的密码
RABBITMQ_QUEUE=queue_name   #mq的队列名称

5、创建任务类
php artisan make:job Queue
执行之后会生成一个文件app/Jobs/Queue.php

例子:

<?php

namespace App\Jobs;

use App\Entities\Posts;
use Illuminate\Bus\Queueable;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;

class Queue  implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    private $data;


    /**
     * Queue constructor.
     * @param $data
     */
    public function __construct($data)
    {
        $this->data = $data;
    }

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {

        try{
            $insert = [
                'title'=>$this->data->title,
                'author_id'=>$this->data->author_id,
                'content'=>$this->data->content,
                'description'=>$this->data->description,
            ];
            $result = Posts::create($insert);
            echo json_encode(['code' => 200, 'msg' => $result]);
        }catch (\Exception $exception) {
            echo json_encode(['code'=>0,'msg'=>$exception->getMessage()]);
        }

    }
}

6、生产,把数据放进mq队列

<?php

namespace App\Http\Controllers;

use App\Entities\CostaNews;
use App\Jobs\Queue;

class IndexController extends Controller
{

    public function index()
    {
        $data = CostaNews::get();
        foreach ($data as $item) {
            $this->dispatch(new Queue($item));
        }
        return response()->json(['code'=>0, 'msg'=>"success"]);
    }

}

7、消费队列
执行命令进行消费:
php artisan queue:work rabbitmq
效果如下:

root@9e99cf9fba73:/var/www/blog# php artisan  queue:work rabbitmq
[2018-12-24 07:34:32][5c208bf66e63b3.56379160] Processing: App\Jobs\Queue
{"code":200,"msg":{"title":1,"author_id":2,"content":"\u5185\u5bb9","description":"\u63cf\u8ff0","updated_at":"2018-12-24 07:34:32","created_at":"2018-12-24 07:34:32","id":1}}[2018-12-24 07:34:32][5c208bf66e63b3.56379160] Processed:  App\Jobs\Queue
[2018-12-24 07:34:32][5c208bf66ff7c3.20969590] Processing: App\Jobs\Queue
{"code":200,"msg":{"title":2,"author_id":2,"content":"\u5185\u5bb92","description":"\u63cf\u8ff02","updated_at":"2018-12-24 07:34:32","created_at":"2018-12-24 07:34:32","id":2}}[2018-12-24 07:34:32][5c208bf66ff7c3.20969590] Processed:  App\Jobs\Queue
[2018-12-24 07:34:32][5c208bf6702695.93123122] Processing: App\Jobs\Queue
{"code":200,"msg":{"title":3,"author_id":2,"content":"\u5185\u5bb93","description":"\u63cf\u8ff03","updated_at":"2018-12-24 07:34:32","created_at":"2018-12-24 07:34:32","id":3}}[2018-12-24 07:34:32][5c208bf6702695.93123122] Processed:  App\Jobs\Queue
[2018-12-24 07:34:32][5c208bf6706e24.78015170] Processing: App\Jobs\Queue
{"code":200,"msg":{"title":4,"author_id":2,"content":"\u5185\u5bb94","description":"\u63cf\u8ff04","updated_at":"2018-12-24 07:34:32","created_at":"2018-12-24 07:34:32","id":4}}[2018-12-24 07:34:32][5c208bf6706e24.78015170] Processed:  App\Jobs\Queue
[2018-12-24 07:34:32][5c208bf6709be0.07998731] Processing: App\Jobs\Queue
{"code":200,"msg":{"title":5,"author_id":2,"content":"\u5185\u5bb95","description":"\u63cf\u8ff05","updated_at":"2018-12-24 07:34:32","created_at":"2018-12-24 07:34:32","id":5}}[2018-12-24 07:34:32][5c208bf6709be0.07998731] Processed:  App\Jobs\Queue

注意:使用这个laravel-queue-rabbitmq这个包需要开启sockets拓展,不然会报错

========此教程与2018年写的可能已过时,最新的请阅读包官方文档=========

本作品采用《CC 协议》,转载必须注明作者和本文链接
每天进步一点点,多年以后再回头,就会发现自己不知不觉走了很远很远
本帖由系统于 5年前 自动加精
《L03 构架 API 服务器》
你将学到如 RESTFul 设计风格、PostMan 的使用、OAuth 流程,JWT 概念及使用 和 API 开发相关的进阶知识。
《L02 从零构建论坛系统》
以构建论坛项目 LaraBBS 为线索,展开对 Laravel 框架的全面学习。应用程序架构思路贴近 Laravel 框架的设计哲学。
讨论数量: 44

AMQP error while attempting pushRaw: Undefined index: name 有谁遇到过这种错误吗

4年前 评论
xiaoksle 3年前
singhania 3年前
zero风来 (作者) 3年前
4年前 评论
xugege 4年前
-client 3年前

有没有使用腾讯云的tdmq 接入在lumen 或者laravel 里面作为消息队列的

2年前 评论

Class 'Interop\Amqp\AmqpTopic' not found

3年前 评论
木大大 2年前
haojiubujian2014 2年前
木大大 2年前
Sunn 1年前

这个不适用于生产环境

3年前 评论
一念沧海一念桑田 (楼主) 3年前

这个只是简单的队列啊!怎么实现工作队列啊

3年前 评论
一念沧海一念桑田 (楼主) 3年前

The connection timed out after 3 sec while awaiting incoming data出现这个问题是什么造成的呢

3年前 评论
一念沧海一念桑田 (楼主) 3年前
-client (作者) 3年前
一念沧海一念桑田 (楼主) 3年前

请问下大牛,rabbitmq 这个插件如何手动确认消息,而不是消费之后自动确认删除,queue当中的auto_delete已经是false了但消费了之后貌似也会自动删除,是我理解的不对还是,请指教,谢谢啦

4年前 评论
一念沧海一念桑田 (楼主) 3年前

想问一下,这个扩展的多队列如何配置啊,还有rabbitmq的路由键支持吗?

4年前 评论
一念沧海一念桑田 (楼主) 4年前
halt-dudu (作者) 4年前
xugege

local.ERROR: AMQP error while attempting pushRaw: The connection timed out after 3 sec while awaiting incoming data . 请问我报这个错误应该如何处理这个异常,3秒这个超时可以配置吗?如果可以的话,请问如何配置

4年前 评论

建议你先了解一下原生的rabbitmq怎么写吧, 先了解rabbitmq的机制先再来整合会比较清晰点, https://www.cnblogs.com/php-linux/p/799461... https://www.cnblogs.com/linkenpark/p/53936... 这里有个例子还是讲的很清楚的, 你先看看吧 。 看你这个截图数据好像没问题啊, 你执行出队了没有。执行 php artisan queue:work rabbitmq 之后, 在 handle 里面可以拿到正常的数据的

4年前 评论

我想问下,使用这个包,还需要安装PHP的amqp扩展了么 ?

5年前 评论
houmuxu

请问一下,你5.6的框架用的哪个版本的包,我5.5用最新的8.1的不好使

4年前 评论

我把RABBITMQ_ERROR_SLEEP配置成false的时候,偶尔就出现Error writing data to the connection with RabbitMQ这个错误,不设置的话,就消息没推送成功,但又没有提示。请问知道是什么原因吗?

4年前 评论

@lm604882597 你先理清楚什么是生产者什么是消费者。加入队列的时候叫做生产,出队列的时候叫做消费。本例中:生产就是生产数据,这时候把数据组装好,放进队列里面等待被消费。执行消费命令的时候,会执行出队操作也就是消费操作,这时候会把数据放进数据库

4年前 评论

上面这个例子是,加入队列,然后插入数据库?
后面的消费是什么意思?

4年前 评论

@Complicated 其实我也是小白, 就我个人而言,如果是方便的话,肯定是自带的使用起来方便, 但是如果mq已经搭建好了的, 我还是选择用mq, mq可以多语言公用, 还有支持持久化。还有其他的我不知道怎么描述,你百度看看mq的好处吧

4年前 评论
Complicated

问下小白问题,mq和laravel 自带队列,那个好用些啊

4年前 评论
董雷 4年前

@houmuxu 一样的道理呀,在其他服务器配置这个服务器的mq账号密码和地址就好了

5年前 评论
CrazyZard

@klgd 好的 我明白了 谢谢

5年前 评论

@CrazyZard 不能,这个扩展是一个 connection 对应一个 exchange,多个exchange就要配置多个connection
或者不指定 exchange name, 这样的话 exchange 就跟 queue name一样了

5年前 评论
CrazyZard

如果是使用消息分发机制,项目需要对进行不同频道的发送,那么exchange的值能否实时改变?

5年前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!