基于Yii2对RabbitMQ的基本用法封装及队列实现(一)
介绍
该类库从功能点上实现了RabbitMQ的所有队列形式,并且结合《RabbitMQ实战指南》这本书的指导思想和yii2-queue扩展
的实战经验融合在了类库的封装。结合Yii2的组件概念,使队列的实现和使用更加的简洁轻便。除此之外还实现了RPC队列,可以实现跨项目调用。最后模拟supervisor对RabbitMQ的消费者进程实现了web管理。
关键词
1、普通队列:相比延时队列,没有启用延时功能。 exchange --rk--> queue 2、延时队列:相比普通队列多了个延时功能 exchange --rk-->queue --ttl-> delayQueue 3、备份路由:消息没有正常路由到预期的队列最后通过备份路由到备份队列 exchange ------>queue | \_ aeExchange --> aeQueue 4、队列副本:一个消息通过exchange路由到不同队列 | --rk0 --> queue0 exchange | --rk1---> queue1 |-- rk2 --> queue2 5、RPC队列:支持跨项目调用
功能点
- 实现了RabbitMQ的基本队列类型:普通队列、延时队列、优先队列、Rpc队列。
- 实现普通消费者和Rpc消费者
- 实现了备份路由
- 支持RabbitMQ的所有绑定类型。
- 支持不同的消费内容序列化
- 支持队列副本
- 支持事件订阅
- 支持单个消息发送和批量消息发送
- 新增配套消费者进程管理
- 结合队列副本,支持随机路由和轮询路由。(后续还会有加权路由以及动态路由!)
- 等等还在实现中
先谈谈普通队列的实现
结合Yii2组件的特性,实现起来也是非常简单:
step1:在Yii2配置组件
/** 普通队列定义 */
'easyQueue' => [
'class' => \pzr\amqp\queue\EasyQueue::class,
'host' => '127.0.0.1',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
'queueName' => 'easy_queue',
'exchangeName' => 'easy_exchange',
'routingKey' => 'easy',
// 'serizer' => \pzr\amqp\serializers\PhpSerializer, //default value
// 'dulicater' => \pzr\amqp\duplicate\DuplicateRandom, //default value
// 'duplicate' => 0, //队列的副本数,不启用则设置为0
// priority => 10, //定义优先级队列时配置
],
step2:在控制器中使用组件
Yii::$app->easyQueue->push(new CountJob([
'count' => 1,
]);
这里的CountJob是消费的实体,对象的定义如:
class CountJob extends \pzr\amqp\AmqpJob
{
public $count;
public function execute()
{
return $this->count;
}
}
step3:增加消费者
首先是消费者组件的配置:
/** 普通消费者 */
'consumer' => [
// 'class' => \pzr\amqp\Amqp::class,
'class' => \pzr\amqp\AmqpBase::class,
'host' => '127.0.0.1',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
],
然后在控制器中调用消费者组件:
class AmqpController extends Controller
{
// 普通消费者定义
public function actionConsumer($queueName, $qos)
{
Yii::$app->consumer->consume($queueName, $qos);
}
}
最后为了方便测试,可以在cli下启动消费者进程:
/usr/bin/php yii amqp/consumer queueName qos
为了方便对进程管理可以使用supervisor 。
延时队列的定义
延时队列的定义和普通队列几乎一样,只是在配置上稍微不同,如:
'delayQueue' => [
'class' => \pzr\amqp\queue\DelayQueue::class,
'host' => '127.0.0.1',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
'queueName' => '07_14_queue',
'exchangeName' => '07_14_exchange',
'exchangeType' => 'topic',
'routingKey' => '07_14_delay.*.',
'delayQueueName' => '07_14_delay_queue',
'delayExchangeName' => '07_14_delay_exchange',
'delayExchangeType' => 'topic',
'ttl' => 5000, //ms
'duplicate' => 2,
// Other driver options
],
以上配置会自动生成普通队列2个:07_14_queue_0
和07_14_queue_1
,并且消息在普通队列5000ms之后自动路由到延时队列07_14_delay_queue
。
优先队列定义
优先级队列的定义只需在配置上增加一个属性:priority
表示该队列最大的优先级。然后在发送消息的时候定义每条消息的优先级,不超过最大优先级即可:
Yii::$app->easyQueue->push(new CountJob([
'count' => 1,
'priority' => 1
]);
最后
其他的队列实现也是类似,但是Rpc队列会和其他的稍有不同。第一节就先讲这么多!感兴趣的话可以看看guthub的ReadMe(持续更新中)。有任何问题非常希望指正,提Issue,留言评论!
Composer的资源是(还没有稳定的版本,努力中):composer require pzr/amqp
下一节说说Rpc队列的实现!
本作品采用《CC 协议》,转载必须注明作者和本文链接
推荐文章: