Laravel RabbitMQ 工作队列

Laravel工作队列

安装组件

~ composer require php-amqplib/php-amqplib

创建一个初始化抽象类 InitAbstract

对连接及通用的定义进行简单的封装

<?php
/**
 * User: Loki.Q
 * Date: 2020/5/12
 * Time: 11:43
 */

namespace App\Services\RabbitMQ;


use PhpAmqpLib\Connection\AMQPStreamConnection;

/**
 * Class InitAbstract
 * @package App\Services\RabbitMQ
 */
abstract class InitAbstract
{
    /**
     * @var AMQPStreamConnection
     */
    protected $connection;

    /**
     * HOST地址
     * @var string
     */
    private $host = '127.0.0.1';

    /**
     * 端口
     * @var string
     */
    private $port = '5672';

    /**
     * 账号
     * @var string
     */
    private $user = 'test001';

    /**
     * 密码
     * @var string
     */
    private $passwd = 'test001';

    /**
     * vhost
     * @var string
     */
    private $vhost = 'hello';

    /**
     * 队列名称
     * @var string
     */
    protected $queueName = 'test_queue';

    /**
     * 连接RabbitMQ
     * InitAbstract constructor.
     */
    public function __construct()
    {
        $this->connection = new AMQPStreamConnection(
            $this->host,
            $this->port,
            $this->user,
            $this->passwd,
            $this->vhost
        );
    }

    /**
     * 定义频道
     * @return \PhpAmqpLib\Channel\AMQPChannel
     */
    protected function channel()
    {
        $channel = $this->connection->channel();

        /**
         * 为了不让队列消失,需要把队列声明为持久化(durable)。
         * 为此我们通过queue_declare的第三参数为 true
         */
        $channel->queue_declare(
            $this->queueName, false, true, false, false
        );
        return  $channel;
    }
}

定义一个生产者类 Producer

<?php
/**
 * User: Loki.Q
 * Date: 2020/5/12
 * Time: 11:51
 */

namespace App\Services\RabbitMQ;


use PhpAmqpLib\Message\AMQPMessage;

/**
 * Class Producer
 * @package App\Services\RabbitMQ
 */
class Producer extends InitAbstract
{
    /**
     * 发布消息
     */
    public function push()
    {
        $channel = $this->channel();

        $data = 'this is message';

        $msg = new AMQPMessage($data, [
            //消息持久化
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ]);
        $channel->basic_publish(
            $msg, '', $this->queueName
        );

        echo " [x] Sent ", $data, "\n";

        $channel->close();
        $this->connection->close();
    }
}

定义消费者类 Consumer

<?php
/**
 * User: Loki.Q
 * Date: 2020/5/12
 * Time: 13:44
 */

namespace App\Services\RabbitMQ;

/**
 * Class Consumer
 * @package App\Services\RabbitMQ
 */
class Consumer extends InitAbstract
{
    /**
     * @throws \ErrorException
     */
    public function wait()
    {
        $channel = $this->channel();

        echo " [*] Waiting for messages. To exit press CTRL+C\n";

        $callback = function ($msg) {
            echo " [x] Received ", $msg->body, "\n";
            sleep(substr_count($msg->body, '.'));
            echo " [x] Done", "\n";

            //手动确认消息
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        };

        /**
         * 这时因为RabbitMQ只管分发进入队列的消息,不会关心有多少消费者(consumer)没有作出响应。
         * 它盲目的把第n-th条消息发给第n-th个消费者。
         * 我们可以使用basic.qos方法,并设置prefetch_count=1。
         * 消息并且作出了响应。
         * 这样,RabbitMQ就会把消息分发给下一个空闲的工作者(worker)。
         */
        $channel->basic_qos(null, 1, null);
        $channel->basic_consume(
            $this->queueName, '', false, false, false, false, $callback
        );

        while (count($channel->callbacks)) {
            $channel->wait();
        }

        $channel->close();
        $this->connection->close();
    }
}

routes/console.php中定义命令

监听消费

Artisan::command('queue', function (\App\Services\RabbitMQ\Consumer $consumer) {
    $consumer->wait();
})->describe('rabbitmq queue');

定义一个Controller来执行生产

<?php
/**
 * User: Loki.Q
 * Date: 2020/5/12
 * Time: 13:37
 */

namespace App\Http\Controllers;


use App\Services\RabbitMQ\Producer;

class ProducerController extends Controller
{
    public function handle(Producer $producer)
    {
        return $producer->push();
    }
}

定义好路由后,执行生产

~ curl http://xx.test/producer/push
 [x] Sent this is message

结果

~ php artisan queue
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received this is message
 [x] Done
 [x] Received this is message
 [x] Done
 [x] Received this is message
 [x] Done
本作品采用《CC 协议》,转载必须注明作者和本文链接
《L05 电商实战》
从零开发一个电商项目,功能包括电商后台、商品 & SKU 管理、购物车、订单管理、支付宝支付、微信支付、订单退款流程、优惠券等
《G01 Go 实战入门》
从零开始带你一步步开发一个 Go 博客项目,让你在最短的时间内学会使用 Go 进行编码。项目结构很大程度上参考了 Laravel。
讨论数量: 1

github.com/vyuldashev/laravel-queu... rabbitmq的laravel扩展 了解一下

3年前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!