Laravel RabbitMQ 工作队列



~ composer require php-amqplib/php-amqplib

创建一个初始化抽象类 InitAbstract


 * User: Loki.Q
 * Date: 2020/5/12
 * Time: 11:43

namespace App\Services\RabbitMQ;

use PhpAmqpLib\Connection\AMQPStreamConnection;

 * Class InitAbstract
 * @package App\Services\RabbitMQ
abstract class InitAbstract
     * @var AMQPStreamConnection
    protected $connection;

     * HOST地址
     * @var string
    private $host = '';

     * 端口
     * @var string
    private $port = '5672';

     * 账号
     * @var string
    private $user = 'test001';

     * 密码
     * @var string
    private $passwd = 'test001';

     * vhost
     * @var string
    private $vhost = 'hello';

     * 队列名称
     * @var string
    protected $queueName = 'test_queue';

     * 连接RabbitMQ
     * InitAbstract constructor.
    public function __construct()
        $this->connection = new AMQPStreamConnection(

     * 定义频道
     * @return \PhpAmqpLib\Channel\AMQPChannel
    protected function channel()
        $channel = $this->connection->channel();

         * 为了不让队列消失,需要把队列声明为持久化(durable)。
         * 为此我们通过queue_declare的第三参数为 true
            $this->queueName, false, true, false, false
        return  $channel;

定义一个生产者类 Producer

 * User: Loki.Q
 * Date: 2020/5/12
 * Time: 11:51

namespace App\Services\RabbitMQ;

use PhpAmqpLib\Message\AMQPMessage;

 * Class Producer
 * @package App\Services\RabbitMQ
class Producer extends InitAbstract
     * 发布消息
    public function push()
        $channel = $this->channel();

        $data = 'this is message';

        $msg = new AMQPMessage($data, [
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
            $msg, '', $this->queueName

        echo " [x] Sent ", $data, "\n";


定义消费者类 Consumer

 * User: Loki.Q
 * Date: 2020/5/12
 * Time: 13:44

namespace App\Services\RabbitMQ;

 * Class Consumer
 * @package App\Services\RabbitMQ
class Consumer extends InitAbstract
     * @throws \ErrorException
    public function wait()
        $channel = $this->channel();

        echo " [*] Waiting for messages. To exit press CTRL+C\n";

        $callback = function ($msg) {
            echo " [x] Received ", $msg->body, "\n";
            sleep(substr_count($msg->body, '.'));
            echo " [x] Done", "\n";


         * 这时因为RabbitMQ只管分发进入队列的消息,不会关心有多少消费者(consumer)没有作出响应。
         * 它盲目的把第n-th条消息发给第n-th个消费者。
         * 我们可以使用basic.qos方法,并设置prefetch_count=1。
         * 消息并且作出了响应。
         * 这样,RabbitMQ就会把消息分发给下一个空闲的工作者(worker)。
        $channel->basic_qos(null, 1, null);
            $this->queueName, '', false, false, false, false, $callback

        while (count($channel->callbacks)) {




Artisan::command('queue', function (\App\Services\RabbitMQ\Consumer $consumer) {
})->describe('rabbitmq queue');


 * User: Loki.Q
 * Date: 2020/5/12
 * Time: 13:37

namespace App\Http\Controllers;

use App\Services\RabbitMQ\Producer;

class ProducerController extends Controller
    public function handle(Producer $producer)
        return $producer->push();


~ curl http://xx.test/producer/push
 [x] Sent this is message


~ php artisan queue
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received this is message
 [x] Done
 [x] Received this is message
 [x] Done
 [x] Received this is message
 [x] Done
本作品采用《CC 协议》,转载必须注明作者和本文链接
《L02 从零构建论坛系统》
以构建论坛项目 LaraBBS 为线索,展开对 Laravel 框架的全面学习。应用程序架构思路贴近 Laravel 框架的设计哲学。
《L01 基础入门》
我们将带你从零开发一个项目并部署到线上,本课程教授 Web 开发中专业、实用的技能,如 Git 工作流、Laravel Mix 前端工作流等。
讨论数量: 1 rabbitmq的laravel扩展 了解一下

4年前 评论
