Laravel5.6 整合 RabbitMQ 消息队列

1、Composer安装laravel-queue-rabbitmq
composer require vladimir-yuldashev/laravel-queue-rabbitmq
2、在config/app.php文件中,providers中添加:
VladimirYuldashev\LaravelQueueRabbitMQ\LaravelQueueRabbitMQServiceProvider::class,
3、在app/config/queue.php配置文件中的connections数组中加入以下配置

'rabbitmq' => [

            'driver' => 'rabbitmq',

            'dsn' => env('RABBITMQ_DSN', null),

            /*
             * Could be one a class that implements \Interop\Amqp\AmqpConnectionFactory for example:
             *  - \EnqueueAmqpExt\AmqpConnectionFactory if you install enqueue/amqp-ext
             *  - \EnqueueAmqpLib\AmqpConnectionFactory if you install enqueue/amqp-lib
             *  - \EnqueueAmqpBunny\AmqpConnectionFactory if you install enqueue/amqp-bunny
             */

            'factory_class' => Enqueue\AmqpLib\AmqpConnectionFactory::class,

            'host' => env('RABBITMQ_HOST', '127.0.0.1'),
            'port' => env('RABBITMQ_PORT', 5672),

            'vhost' => env('RABBITMQ_VHOST', '/'),
            'login' => env('RABBITMQ_LOGIN', 'guest'),
            'password' => env('RABBITMQ_PASSWORD', 'guest'),

            'queue' => env('RABBITMQ_QUEUE', 'default'),

            'options' => [

                'exchange' => [

                    'name' => env('RABBITMQ_EXCHANGE_NAME'),

                    /*
                     * Determine if exchange should be created if it does not exist.
                     */

                    'declare' => env('RABBITMQ_EXCHANGE_DECLARE', true),

                    /*
                     * Read more about possible values at https://www.rabbitmq.com/tutorials/amqp-concepts.html
                     */

                    'type' => env('RABBITMQ_EXCHANGE_TYPE', \Interop\Amqp\AmqpTopic::TYPE_DIRECT),
                    'passive' => env('RABBITMQ_EXCHANGE_PASSIVE', false),
                    'durable' => env('RABBITMQ_EXCHANGE_DURABLE', true),
                    'auto_delete' => env('RABBITMQ_EXCHANGE_AUTODELETE', false),
                    'arguments' => env('RABBITMQ_EXCHANGE_ARGUMENTS'),
                ],

                'queue' => [

                    /*
                     * Determine if queue should be created if it does not exist.
                     */

                    'declare' => env('RABBITMQ_QUEUE_DECLARE', true),

                    /*
                     * Determine if queue should be binded to the exchange created.
                     */

                    'bind' => env('RABBITMQ_QUEUE_DECLARE_BIND', true),

                    /*
                     * Read more about possible values at https://www.rabbitmq.com/tutorials/amqp-concepts.html
                     */

                    'passive' => env('RABBITMQ_QUEUE_PASSIVE', false),
                    'durable' => env('RABBITMQ_QUEUE_DURABLE', true),
                    'exclusive' => env('RABBITMQ_QUEUE_EXCLUSIVE', false),
                    'auto_delete' => env('RABBITMQ_QUEUE_AUTODELETE', false),
                    'arguments' => env('RABBITMQ_QUEUE_ARGUMENTS'),
                ],
            ],

            /*
             * Determine the number of seconds to sleep if there's an error communicating with rabbitmq
             * If set to false, it'll throw an exception rather than doing the sleep for X seconds.
             */

            'sleep_on_error' => env('RABBITMQ_ERROR_SLEEP', 5),

            /*
             * Optional SSL params if an SSL connection is used
             * Using an SSL connection will also require to configure your RabbitMQ to enable SSL. More details can be founds here: https://www.rabbitmq.com/ssl.html
             */

            'ssl_params' => [
                'ssl_on' => env('RABBITMQ_SSL', false),
                'cafile' => env('RABBITMQ_SSL_CAFILE', null),
                'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null),
                'local_key' => env('RABBITMQ_SSL_LOCALKEY', null),
                'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),
                'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null),
            ],

        ],

4、修改 .env 文件


QUEUE_CONNECTION=rabbitmq    #这个配置env一般会有先找到修改为这个

#以下是新增配置

RABBITMQ_HOST=rabbitmq  #mq的服务器地址,我这里用的是laradock,具体的就具体修改咯
RABBITMQ_PORT=5672  #mq的端口
RABBITMQ_VHOST=/
RABBITMQ_LOGIN=guest    #mq的登录名
RABBITMQ_PASSWORD=guest   #mq的密码
RABBITMQ_QUEUE=queue_name   #mq的队列名称

5、创建任务类
php artisan make:job Queue
执行之后会生成一个文件app/Jobs/Queue.php

例子:

<?php

namespace App\Jobs;

use App\Entities\Posts;
use Illuminate\Bus\Queueable;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;

class Queue  implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    private $data;

    /**
     * Queue constructor.
     * @param $data
     */
    public function __construct($data)
    {
        $this->data = $data;
    }

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {

        try{
            $insert = [
                'title'=>$this->data->title,
                'author_id'=>$this->data->author_id,
                'content'=>$this->data->content,
                'description'=>$this->data->description,
            ];
            $result = Posts::create($insert);
            echo json_encode(['code' => 200, 'msg' => $result]);
        }catch (\Exception $exception) {
            echo json_encode(['code'=>0,'msg'=>$exception->getMessage()]);
        }

    }
}

6、生产,把数据放进mq队列

<?php

namespace App\Http\Controllers;

use App\Entities\CostaNews;
use App\Jobs\Queue;

class IndexController extends Controller
{

    public function index()
    {
        $data = CostaNews::get();
        foreach ($data as $item) {
            $this->dispatch(new Queue($item));
        }
        return response()->json(['code'=>0, 'msg'=>"success"]);
    }

}

7、消费队列
执行命令进行消费:
php artisan queue:work rabbitmq
效果如下:

root@9e99cf9fba73:/var/www/blog# php artisan  queue:work rabbitmq
[2018-12-24 07:34:32][5c208bf66e63b3.56379160] Processing: App\Jobs\Queue
{"code":200,"msg":{"title":1,"author_id":2,"content":"\u5185\u5bb9","description":"\u63cf\u8ff0","updated_at":"2018-12-24 07:34:32","created_at":"2018-12-24 07:34:32","id":1}}[2018-12-24 07:34:32][5c208bf66e63b3.56379160] Processed:  App\Jobs\Queue
[2018-12-24 07:34:32][5c208bf66ff7c3.20969590] Processing: App\Jobs\Queue
{"code":200,"msg":{"title":2,"author_id":2,"content":"\u5185\u5bb92","description":"\u63cf\u8ff02","updated_at":"2018-12-24 07:34:32","created_at":"2018-12-24 07:34:32","id":2}}[2018-12-24 07:34:32][5c208bf66ff7c3.20969590] Processed:  App\Jobs\Queue
[2018-12-24 07:34:32][5c208bf6702695.93123122] Processing: App\Jobs\Queue
{"code":200,"msg":{"title":3,"author_id":2,"content":"\u5185\u5bb93","description":"\u63cf\u8ff03","updated_at":"2018-12-24 07:34:32","created_at":"2018-12-24 07:34:32","id":3}}[2018-12-24 07:34:32][5c208bf6702695.93123122] Processed:  App\Jobs\Queue
[2018-12-24 07:34:32][5c208bf6706e24.78015170] Processing: App\Jobs\Queue
{"code":200,"msg":{"title":4,"author_id":2,"content":"\u5185\u5bb94","description":"\u63cf\u8ff04","updated_at":"2018-12-24 07:34:32","created_at":"2018-12-24 07:34:32","id":4}}[2018-12-24 07:34:32][5c208bf6706e24.78015170] Processed:  App\Jobs\Queue
[2018-12-24 07:34:32][5c208bf6709be0.07998731] Processing: App\Jobs\Queue
{"code":200,"msg":{"title":5,"author_id":2,"content":"\u5185\u5bb95","description":"\u63cf\u8ff05","updated_at":"2018-12-24 07:34:32","created_at":"2018-12-24 07:34:32","id":5}}[2018-12-24 07:34:32][5c208bf6709be0.07998731] Processed:  App\Jobs\Queue

注意:使用这个laravel-queue-rabbitmq这个包需要开启sockets拓展,不然会报错

每天进步一点点,多年以后再回头,就会发现自己不知不觉走了很远很远

本帖由系统于 9个月前 自动加精
《L03 构架 API 服务器》
你将学到如 RESTFul 设计风格、PostMan 的使用、OAuth 流程,JWT 概念及使用 和 API 开发相关的进阶知识。
《L04 微信小程序从零到发布》
从小程序个人账户申请开始,带你一步步进行开发一个微信小程序,直到提交微信控制台上线发布。
讨论数量: 18

我想问下,使用这个包,还需要安装PHP的amqp扩展了么 ?

9个月前 评论
CrazyZard

如果是使用消息分发机制,项目需要对进行不同频道的发送,那么exchange的值能否实时改变?

9个月前 评论

@CrazyZard 不能,这个扩展是一个 connection 对应一个 exchange,多个exchange就要配置多个connection
或者不指定 exchange name, 这样的话 exchange 就跟 queue name一样了

8个月前 评论
CrazyZard

@klgd 好的 我明白了 谢谢

8个月前 评论

@houmuxu 一样的道理呀,在其他服务器配置这个服务器的mq账号密码和地址就好了

7个月前 评论
Complicated

问下小白问题,mq和laravel 自带队列,那个好用些啊

5个月前 评论
laosan123 3周前

@Complicated 其实我也是小白, 就我个人而言,如果是方便的话,肯定是自带的使用起来方便, 但是如果mq已经搭建好了的, 我还是选择用mq, mq可以多语言公用, 还有支持持久化。还有其他的我不知道怎么描述,你百度看看mq的好处吧

5个月前 评论

上面这个例子是,加入队列,然后插入数据库?
后面的消费是什么意思?

5个月前 评论

@lm604882597 你先理清楚什么是生产者什么是消费者。加入队列的时候叫做生产,出队列的时候叫做消费。本例中:生产就是生产数据,这时候把数据组装好,放进队列里面等待被消费。执行消费命令的时候,会执行出队操作也就是消费操作,这时候会把数据放进数据库

5个月前 评论

我把RABBITMQ_ERROR_SLEEP配置成false的时候,偶尔就出现Error writing data to the connection with RabbitMQ这个错误,不设置的话,就消息没推送成功,但又没有提示。请问知道是什么原因吗?

4个月前 评论
houmuxu

请问一下,你5.6的框架用的哪个版本的包,我5.5用最新的8.1的不好使

2个月前 评论
houmuxu

@一念沧海一念桑田 还是要麻烦一下 :joy:,当类型是direct时需要routing_key,但是我没找到routing_key怎么设置,默认是和queue名一致,然后就是,如果我要获取里面的数据是不是一定要用laravel的job获取?我自己通过mq获取到数据但是无法解析出来,外面的一层是json,里面data里的好像是序列化,但是反序列化解不出来,aaaa,xxxx,cccccc是我写入的数据

file

2个月前 评论

建议你先了解一下原生的rabbitmq怎么写吧, 先了解rabbitmq的机制先再来整合会比较清晰点, https://www.cnblogs.com/php-linux/p/799461... https://www.cnblogs.com/linkenpark/p/53936... 这里有个例子还是讲的很清楚的, 你先看看吧 。 看你这个截图数据好像没问题啊, 你执行出队了没有。执行 php artisan queue:work rabbitmq 之后, 在 handle 里面可以拿到正常的数据的

2个月前 评论
xugege

local.ERROR: AMQP error while attempting pushRaw: The connection timed out after 3 sec while awaiting incoming data . 请问我报这个错误应该如何处理这个异常,3秒这个超时可以配置吗?如果可以的话,请问如何配置

2个月前 评论
2个月前 评论
xugege 2个月前

请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!