基于Yii2对RabbitMQ的基本用法封装及队列实现(一)

介绍

该类库从功能点上实现了RabbitMQ的所有队列形式,并且结合《RabbitMQ实战指南》这本书的指导思想和yii2-queue扩展的实战经验融合在了类库的封装。结合Yii2的组件概念,使队列的实现和使用更加的简洁轻便。除此之外还实现了RPC队列,可以实现跨项目调用。最后模拟supervisor对RabbitMQ的消费者进程实现了web管理。

关键词

1、普通队列:相比延时队列,没有启用延时功能。
exchange --rk--> queue

2、延时队列:相比普通队列多了个延时功能
exchange --rk-->queue --ttl-> delayQueue

3、备份路由:消息没有正常路由到预期的队列最后通过备份路由到备份队列
exchange ------>queue
            |
            \_ aeExchange --> aeQueue

4、队列副本:一个消息通过exchange路由到不同队列
         | --rk0 --> queue0
exchange | --rk1---> queue1
         |-- rk2 --> queue2

5、RPC队列:支持跨项目调用

功能点

  • 实现了RabbitMQ的基本队列类型:普通队列、延时队列、优先队列、Rpc队列。
  • 实现普通消费者和Rpc消费者
  • 实现了备份路由
  • 支持RabbitMQ的所有绑定类型。
  • 支持不同的消费内容序列化
  • 支持队列副本
  • 支持事件订阅
  • 支持单个消息发送和批量消息发送
  • 新增配套消费者进程管理
  • 结合队列副本,支持随机路由和轮询路由。(后续还会有加权路由以及动态路由!)
  • 等等还在实现中

先谈谈普通队列的实现

结合Yii2组件的特性,实现起来也是非常简单:
step1:在Yii2配置组件

/** 普通队列定义 */
    'easyQueue' => [
        'class' =>  \pzr\amqp\queue\EasyQueue::class,
        'host' => '127.0.0.1',
        'port' => 5672,
        'user' => 'guest',
        'password' => 'guest',
        'queueName' => 'easy_queue',
        'exchangeName' => 'easy_exchange',
        'routingKey' => 'easy',
        // 'serizer' => \pzr\amqp\serializers\PhpSerializer, //default value
        // 'dulicater' => \pzr\amqp\duplicate\DuplicateRandom, //default value
        // 'duplicate' => 0, //队列的副本数,不启用则设置为0
        // priority => 10, //定义优先级队列时配置
    ],

step2:在控制器中使用组件

Yii::$app->easyQueue->push(new CountJob([
    'count' => 1,
]);

这里的CountJob是消费的实体,对象的定义如:

class CountJob extends \pzr\amqp\AmqpJob
{
    public $count;
    public function execute()
    {
        return $this->count;
    }
}

step3:增加消费者
首先是消费者组件的配置:

/** 普通消费者 */
    'consumer' => [
        // 'class' =>  \pzr\amqp\Amqp::class,
        'class' =>  \pzr\amqp\AmqpBase::class,
        'host' => '127.0.0.1',
        'port' => 5672,
        'user' => 'guest',
        'password' => 'guest',
    ],

然后在控制器中调用消费者组件:

class AmqpController extends Controller
{
    // 普通消费者定义
    public function actionConsumer($queueName, $qos)
    {
        Yii::$app->consumer->consume($queueName, $qos);
    }
}

最后为了方便测试,可以在cli下启动消费者进程:

/usr/bin/php yii amqp/consumer queueName qos

为了方便对进程管理可以使用supervisor 。

延时队列的定义

延时队列的定义和普通队列几乎一样,只是在配置上稍微不同,如:

'delayQueue' => [
        'class' =>  \pzr\amqp\queue\DelayQueue::class,
        'host' => '127.0.0.1',
        'port' => 5672,
        'user' => 'guest',
        'password' => 'guest',
        'queueName' => '07_14_queue',
        'exchangeName' => '07_14_exchange',
        'exchangeType' => 'topic',
        'routingKey' => '07_14_delay.*.',
        'delayQueueName' => '07_14_delay_queue',
        'delayExchangeName' => '07_14_delay_exchange',
        'delayExchangeType' => 'topic',
        'ttl' => 5000, //ms
        'duplicate' => 2,
        // Other driver options
    ],

以上配置会自动生成普通队列2个:07_14_queue_007_14_queue_1,并且消息在普通队列5000ms之后自动路由到延时队列07_14_delay_queue

优先队列定义

优先级队列的定义只需在配置上增加一个属性:priority表示该队列最大的优先级。然后在发送消息的时候定义每条消息的优先级,不超过最大优先级即可:

Yii::$app->easyQueue->push(new CountJob([
    'count' => 1,
    'priority' => 1
]);

最后

其他的队列实现也是类似,但是Rpc队列会和其他的稍有不同。第一节就先讲这么多!感兴趣的话可以看看guthub的ReadMe(持续更新中)。有任何问题非常希望指正,提Issue,留言评论!
Composer的资源是(还没有稳定的版本,努力中):composer require pzr/amqp
下一节说说Rpc队列的实现!

本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!