Laravel5.6 整合 RabbitMQ 消息队列

1、Composer安装laravel-queue-rabbitmq
composer require vladimir-yuldashev/laravel-queue-rabbitmq
2、在config/app.php文件中,providers中添加:
VladimirYuldashev\LaravelQueueRabbitMQ\LaravelQueueRabbitMQServiceProvider::class,
3、在app/config/queue.php配置文件中的connections数组中加入以下配置

'rabbitmq' => [

            'driver' => 'rabbitmq',

            'dsn' => env('RABBITMQ_DSN', null),

            /*
             * Could be one a class that implements \Interop\Amqp\AmqpConnectionFactory for example:
             *  - \EnqueueAmqpExt\AmqpConnectionFactory if you install enqueue/amqp-ext
             *  - \EnqueueAmqpLib\AmqpConnectionFactory if you install enqueue/amqp-lib
             *  - \EnqueueAmqpBunny\AmqpConnectionFactory if you install enqueue/amqp-bunny
             */

            'factory_class' => Enqueue\AmqpLib\AmqpConnectionFactory::class,

            'host' => env('RABBITMQ_HOST', '127.0.0.1'),
            'port' => env('RABBITMQ_PORT', 5672),

            'vhost' => env('RABBITMQ_VHOST', '/'),
            'login' => env('RABBITMQ_LOGIN', 'guest'),
            'password' => env('RABBITMQ_PASSWORD', 'guest'),

            'queue' => env('RABBITMQ_QUEUE', 'default'),

            'options' => [

                'exchange' => [

                    'name' => env('RABBITMQ_EXCHANGE_NAME'),

                    /*
                     * Determine if exchange should be created if it does not exist.
                     */

                    'declare' => env('RABBITMQ_EXCHANGE_DECLARE', true),

                    /*
                     * Read more about possible values at https://www.rabbitmq.com/tutorials/amqp-concepts.html
                     */

                    'type' => env('RABBITMQ_EXCHANGE_TYPE', \Interop\Amqp\AmqpTopic::TYPE_DIRECT),
                    'passive' => env('RABBITMQ_EXCHANGE_PASSIVE', false),
                    'durable' => env('RABBITMQ_EXCHANGE_DURABLE', true),
                    'auto_delete' => env('RABBITMQ_EXCHANGE_AUTODELETE', false),
                    'arguments' => env('RABBITMQ_EXCHANGE_ARGUMENTS'),
                ],

                'queue' => [

                    /*
                     * Determine if queue should be created if it does not exist.
                     */

                    'declare' => env('RABBITMQ_QUEUE_DECLARE', true),

                    /*
                     * Determine if queue should be binded to the exchange created.
                     */

                    'bind' => env('RABBITMQ_QUEUE_DECLARE_BIND', true),

                    /*
                     * Read more about possible values at https://www.rabbitmq.com/tutorials/amqp-concepts.html
                     */

                    'passive' => env('RABBITMQ_QUEUE_PASSIVE', false),
                    'durable' => env('RABBITMQ_QUEUE_DURABLE', true),
                    'exclusive' => env('RABBITMQ_QUEUE_EXCLUSIVE', false),
                    'auto_delete' => env('RABBITMQ_QUEUE_AUTODELETE', false),
                    'arguments' => env('RABBITMQ_QUEUE_ARGUMENTS'),
                ],
            ],

            /*
             * Determine the number of seconds to sleep if there's an error communicating with rabbitmq
             * If set to false, it'll throw an exception rather than doing the sleep for X seconds.
             */

            'sleep_on_error' => env('RABBITMQ_ERROR_SLEEP', 5),

            /*
             * Optional SSL params if an SSL connection is used
             * Using an SSL connection will also require to configure your RabbitMQ to enable SSL. More details can be founds here: https://www.rabbitmq.com/ssl.html
             */

            'ssl_params' => [
                'ssl_on' => env('RABBITMQ_SSL', false),
                'cafile' => env('RABBITMQ_SSL_CAFILE', null),
                'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null),
                'local_key' => env('RABBITMQ_SSL_LOCALKEY', null),
                'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),
                'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null),
            ],

        ],

4、修改 .env 文件


QUEUE_CONNECTION=rabbitmq    #这个配置env一般会有先找到修改为这个

#以下是新增配置

RABBITMQ_HOST=rabbitmq  #mq的服务器地址,我这里用的是laradock,具体的就具体修改咯
RABBITMQ_PORT=5672  #mq的端口
RABBITMQ_VHOST=/
RABBITMQ_LOGIN=guest    #mq的登录名
RABBITMQ_PASSWORD=guest   #mq的密码
RABBITMQ_QUEUE=queue_name   #mq的队列名称

5、创建任务类
php artisan make:job Queue
执行之后会生成一个文件app/Jobs/Queue.php

例子:

<?php

namespace App\Jobs;

use App\Entities\Posts;
use Illuminate\Bus\Queueable;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;

class Queue  implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    private $data;

    /**
     * Queue constructor.
     * @param $data
     */
    public function __construct($data)
    {
        $this->data = $data;
    }

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {

        try{
            $insert = [
                'title'=>$this->data->title,
                'author_id'=>$this->data->author_id,
                'content'=>$this->data->content,
                'description'=>$this->data->description,
            ];
            $result = Posts::create($insert);
            echo json_encode(['code' => 200, 'msg' => $result]);
        }catch (\Exception $exception) {
            echo json_encode(['code'=>0,'msg'=>$exception->getMessage()]);
        }

    }
}

6、生产,把数据放进mq队列

<?php

namespace App\Http\Controllers;

use App\Entities\CostaNews;
use App\Jobs\Queue;

class IndexController extends Controller
{

    public function index()
    {
        $data = CostaNews::get();
        foreach ($data as $item) {
            $this->dispatch(new Queue($item));
        }
        return response()->json(['code'=>0, 'msg'=>"success"]);
    }

}

7、消费队列
执行命令进行消费:
php artisan queue:work rabbitmq
效果如下:

root@9e99cf9fba73:/var/www/blog# php artisan  queue:work rabbitmq
[2018-12-24 07:34:32][5c208bf66e63b3.56379160] Processing: App\Jobs\Queue
{"code":200,"msg":{"title":1,"author_id":2,"content":"\u5185\u5bb9","description":"\u63cf\u8ff0","updated_at":"2018-12-24 07:34:32","created_at":"2018-12-24 07:34:32","id":1}}[2018-12-24 07:34:32][5c208bf66e63b3.56379160] Processed:  App\Jobs\Queue
[2018-12-24 07:34:32][5c208bf66ff7c3.20969590] Processing: App\Jobs\Queue
{"code":200,"msg":{"title":2,"author_id":2,"content":"\u5185\u5bb92","description":"\u63cf\u8ff02","updated_at":"2018-12-24 07:34:32","created_at":"2018-12-24 07:34:32","id":2}}[2018-12-24 07:34:32][5c208bf66ff7c3.20969590] Processed:  App\Jobs\Queue
[2018-12-24 07:34:32][5c208bf6702695.93123122] Processing: App\Jobs\Queue
{"code":200,"msg":{"title":3,"author_id":2,"content":"\u5185\u5bb93","description":"\u63cf\u8ff03","updated_at":"2018-12-24 07:34:32","created_at":"2018-12-24 07:34:32","id":3}}[2018-12-24 07:34:32][5c208bf6702695.93123122] Processed:  App\Jobs\Queue
[2018-12-24 07:34:32][5c208bf6706e24.78015170] Processing: App\Jobs\Queue
{"code":200,"msg":{"title":4,"author_id":2,"content":"\u5185\u5bb94","description":"\u63cf\u8ff04","updated_at":"2018-12-24 07:34:32","created_at":"2018-12-24 07:34:32","id":4}}[2018-12-24 07:34:32][5c208bf6706e24.78015170] Processed:  App\Jobs\Queue
[2018-12-24 07:34:32][5c208bf6709be0.07998731] Processing: App\Jobs\Queue
{"code":200,"msg":{"title":5,"author_id":2,"content":"\u5185\u5bb95","description":"\u63cf\u8ff05","updated_at":"2018-12-24 07:34:32","created_at":"2018-12-24 07:34:32","id":5}}[2018-12-24 07:34:32][5c208bf6709be0.07998731] Processed:  App\Jobs\Queue

注意:使用这个laravel-queue-rabbitmq这个包需要开启sockets拓展,不然会报错

本作品采用《CC 协议》,转载必须注明作者和本文链接
每天进步一点点,多年以后再回头,就会发现自己不知不觉走了很远很远
本帖由系统于 1年前 自动加精
《L04 微信小程序从零到发布》
从小程序个人账户申请开始,带你一步步进行开发一个微信小程序,直到提交微信控制台上线发布。
《L05 电商实战》
从零开发一个电商项目,功能包括电商后台、商品 & SKU 管理、购物车、订单管理、支付宝支付、微信支付、订单退款流程、优惠券等
讨论数量: 24

我想问下,使用这个包,还需要安装PHP的amqp扩展了么 ?

1年前 评论
CrazyZard

如果是使用消息分发机制,项目需要对进行不同频道的发送,那么exchange的值能否实时改变?

1年前 评论

@CrazyZard 不能,这个扩展是一个 connection 对应一个 exchange,多个exchange就要配置多个connection
或者不指定 exchange name, 这样的话 exchange 就跟 queue name一样了

1年前 评论
CrazyZard

@klgd 好的 我明白了 谢谢

1年前 评论

@houmuxu 一样的道理呀,在其他服务器配置这个服务器的mq账号密码和地址就好了

1年前 评论
Complicated

问下小白问题,mq和laravel 自带队列,那个好用些啊

1年前 评论
laosan123 1年前

@Complicated 其实我也是小白, 就我个人而言,如果是方便的话,肯定是自带的使用起来方便, 但是如果mq已经搭建好了的, 我还是选择用mq, mq可以多语言公用, 还有支持持久化。还有其他的我不知道怎么描述,你百度看看mq的好处吧

1年前 评论

上面这个例子是,加入队列,然后插入数据库?
后面的消费是什么意思?

1年前 评论

@lm604882597 你先理清楚什么是生产者什么是消费者。加入队列的时候叫做生产,出队列的时候叫做消费。本例中:生产就是生产数据,这时候把数据组装好,放进队列里面等待被消费。执行消费命令的时候,会执行出队操作也就是消费操作,这时候会把数据放进数据库

1年前 评论

我把RABBITMQ_ERROR_SLEEP配置成false的时候,偶尔就出现Error writing data to the connection with RabbitMQ这个错误,不设置的话,就消息没推送成功,但又没有提示。请问知道是什么原因吗?

1年前 评论
houmuxu

请问一下,你5.6的框架用的哪个版本的包,我5.5用最新的8.1的不好使

1年前 评论

建议你先了解一下原生的rabbitmq怎么写吧, 先了解rabbitmq的机制先再来整合会比较清晰点, https://www.cnblogs.com/php-linux/p/799461... https://www.cnblogs.com/linkenpark/p/53936... 这里有个例子还是讲的很清楚的, 你先看看吧 。 看你这个截图数据好像没问题啊, 你执行出队了没有。执行 php artisan queue:work rabbitmq 之后, 在 handle 里面可以拿到正常的数据的

1年前 评论
xugege

local.ERROR: AMQP error while attempting pushRaw: The connection timed out after 3 sec while awaiting incoming data . 请问我报这个错误应该如何处理这个异常,3秒这个超时可以配置吗?如果可以的话,请问如何配置

1年前 评论
1年前 评论
xugege 1年前
-client 5个月前

想问一下,这个扩展的多队列如何配置啊,还有rabbitmq的路由键支持吗?

10个月前 评论
一念沧海一念桑田 (楼主) 10个月前
halt-dudu (作者) 10个月前

请问下大牛,rabbitmq 这个插件如何手动确认消息,而不是消费之后自动确认删除,queue当中的auto_delete已经是false了但消费了之后貌似也会自动删除,是我理解的不对还是,请指教,谢谢啦

9个月前 评论
一念沧海一念桑田 (楼主) 5个月前

AMQP error while attempting pushRaw: Undefined index: name 有谁遇到过这种错误吗

7个月前 评论
xiaoksle 3个月前
singhania 2个月前
zero风来 (作者) 1个月前

The connection timed out after 3 sec while awaiting incoming data出现这个问题是什么造成的呢

5个月前 评论
一念沧海一念桑田 (楼主) 5个月前
-client (作者) 5个月前
一念沧海一念桑田 (楼主) 5个月前

这个只是简单的队列啊!怎么实现工作队列啊

3个月前 评论
一念沧海一念桑田 (楼主) 1个月前

这个不适用于生产环境

2个月前 评论
一念沧海一念桑田 (楼主) 1个月前

Class 'Interop\Amqp\AmqpTopic' not found

1个月前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!