一个简单的 Amqp 封装

先说一下,为什么会实现这么一个扩展吧。平常使用PHP写东西,都是独立项目,所以我们可能只需要一个redis-list-queue,来实现消息队列。但当你项目间需要交互时,redis list 显然已经不满足条件了。这个时候,我们可以使用swoole实现一个简单的RPC组件,但是有时候我们还是必须要使用消息队列的订阅发布功能。所以,我在swoft/php-amqplib的基础上封装了这个组件。

一个简单的Amqp封装

使用

使用上十分简单,首先我们定义一个Connection类。

use yii\base\StaticInstanceTrait;
class YiiConnection extends \Swoftx\Amqplib\Connection
{
    use StaticInstanceTrait;
}

然后便是我们的消息类

use yii\base\StaticInstanceTrait;
use Swoftx\Amqplib\Connection;
use Swoftx\Amqplib\Message\Publisher;
use YiiConnection;

class DemoMessage extends Publisher
{
    protected $exchange = 'demo';

    protected $routingKey = 'test';

    public function getConnection(): Connection
    {
        return YiiConnection::instance()->build();
    }
}

接下来就是发送消息

use DemoMessage;

$msg = new DemoMessage();
$msg->setData(['id' => $id]);
$msg->publish();

DemoMessage::make()->setData(['id'=>$id])->publish();

是不是相比下面的一坨代码,清爽很多呢??

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$exchange = 'router';
$queue = 'msgs';
$connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
$channel = $connection->channel();
/*
    The following code is the same both in the consumer and the producer.
    In this way we are sure we always have a queue to consume from and an
        exchange where to publish messages.
*/
/*
    name: $queue
    passive: false
    durable: true // the queue will survive server restarts
    exclusive: false // the queue can be accessed in other channels
    auto_delete: false //the queue won't be deleted once the channel is closed.
*/
$channel->queue_declare($queue, false, true, false, false);
/*
    name: $exchange
    type: direct
    passive: false
    durable: true // the exchange will survive server restarts
    auto_delete: false //the exchange won't be deleted once the channel is closed.
*/
$channel->exchange_declare($exchange, 'direct', false, true, false);
$channel->queue_bind($queue, $exchange);
$messageBody = implode(' ', array_slice($argv, 1));
$message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
$channel->basic_publish($message, $exchange);
$channel->close();
$connection->close();
本作品采用《CC 协议》,转载必须注明作者和本文链接
Any fool can write code that a computer can understand. Good programmers write code that humans can understand.
《L05 电商实战》
从零开发一个电商项目,功能包括电商后台、商品 & SKU 管理、购物车、订单管理、支付宝支付、微信支付、订单退款流程、优惠券等
《G01 Go 实战入门》
从零开始带你一步步开发一个 Go 博客项目,让你在最短的时间内学会使用 Go 进行编码。项目结构很大程度上参考了 Laravel。
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!