Laravel RabbitMQ 工作队列
Laravel工作队列
安装组件
~ composer require php-amqplib/php-amqplib
创建一个初始化抽象类
InitAbstract
对连接及通用的定义进行简单的封装
<?php
/**
* User: Loki.Q
* Date: 2020/5/12
* Time: 11:43
*/
namespace App\Services\RabbitMQ;
use PhpAmqpLib\Connection\AMQPStreamConnection;
/**
* Class InitAbstract
* @package App\Services\RabbitMQ
*/
abstract class InitAbstract
{
/**
* @var AMQPStreamConnection
*/
protected $connection;
/**
* HOST地址
* @var string
*/
private $host = '127.0.0.1';
/**
* 端口
* @var string
*/
private $port = '5672';
/**
* 账号
* @var string
*/
private $user = 'test001';
/**
* 密码
* @var string
*/
private $passwd = 'test001';
/**
* vhost
* @var string
*/
private $vhost = 'hello';
/**
* 队列名称
* @var string
*/
protected $queueName = 'test_queue';
/**
* 连接RabbitMQ
* InitAbstract constructor.
*/
public function __construct()
{
$this->connection = new AMQPStreamConnection(
$this->host,
$this->port,
$this->user,
$this->passwd,
$this->vhost
);
}
/**
* 定义频道
* @return \PhpAmqpLib\Channel\AMQPChannel
*/
protected function channel()
{
$channel = $this->connection->channel();
/**
* 为了不让队列消失,需要把队列声明为持久化(durable)。
* 为此我们通过queue_declare的第三参数为 true
*/
$channel->queue_declare(
$this->queueName, false, true, false, false
);
return $channel;
}
}
定义一个生产者类
Producer
<?php
/**
* User: Loki.Q
* Date: 2020/5/12
* Time: 11:51
*/
namespace App\Services\RabbitMQ;
use PhpAmqpLib\Message\AMQPMessage;
/**
* Class Producer
* @package App\Services\RabbitMQ
*/
class Producer extends InitAbstract
{
/**
* 发布消息
*/
public function push()
{
$channel = $this->channel();
$data = 'this is message';
$msg = new AMQPMessage($data, [
//消息持久化
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]);
$channel->basic_publish(
$msg, '', $this->queueName
);
echo " [x] Sent ", $data, "\n";
$channel->close();
$this->connection->close();
}
}
定义消费者类
Consumer
<?php
/**
* User: Loki.Q
* Date: 2020/5/12
* Time: 13:44
*/
namespace App\Services\RabbitMQ;
/**
* Class Consumer
* @package App\Services\RabbitMQ
*/
class Consumer extends InitAbstract
{
/**
* @throws \ErrorException
*/
public function wait()
{
$channel = $this->channel();
echo " [*] Waiting for messages. To exit press CTRL+C\n";
$callback = function ($msg) {
echo " [x] Received ", $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done", "\n";
//手动确认消息
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
/**
* 这时因为RabbitMQ只管分发进入队列的消息,不会关心有多少消费者(consumer)没有作出响应。
* 它盲目的把第n-th条消息发给第n-th个消费者。
* 我们可以使用basic.qos方法,并设置prefetch_count=1。
* 消息并且作出了响应。
* 这样,RabbitMQ就会把消息分发给下一个空闲的工作者(worker)。
*/
$channel->basic_qos(null, 1, null);
$channel->basic_consume(
$this->queueName, '', false, false, false, false, $callback
);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$this->connection->close();
}
}
在
routes/console.php
中定义命令监听消费
Artisan::command('queue', function (\App\Services\RabbitMQ\Consumer $consumer) {
$consumer->wait();
})->describe('rabbitmq queue');
定义一个
Controller
来执行生产
<?php
/**
* User: Loki.Q
* Date: 2020/5/12
* Time: 13:37
*/
namespace App\Http\Controllers;
use App\Services\RabbitMQ\Producer;
class ProducerController extends Controller
{
public function handle(Producer $producer)
{
return $producer->push();
}
}
定义好路由后,执行生产
~ curl http://xx.test/producer/push
[x] Sent this is message
结果
~ php artisan queue
[*] Waiting for messages. To exit press CTRL+C
[x] Received this is message
[x] Done
[x] Received this is message
[x] Done
[x] Received this is message
[x] Done
本作品采用《CC 协议》,转载必须注明作者和本文链接
github.com/vyuldashev/laravel-queu... rabbitmq的laravel扩展 了解一下