一个简单的 Amqp 封装
先说一下,为什么会实现这么一个扩展吧。平常使用PHP写东西,都是独立项目,所以我们可能只需要一个redis-list-queue,来实现消息队列。但当你项目间需要交互时,redis list 显然已经不满足条件了。这个时候,我们可以使用swoole实现一个简单的RPC组件,但是有时候我们还是必须要使用消息队列的订阅发布功能。所以,我在swoft/php-amqplib的基础上封装了这个组件。
使用
使用上十分简单,首先我们定义一个Connection类。
use yii\base\StaticInstanceTrait;
class YiiConnection extends \Swoftx\Amqplib\Connection
{
use StaticInstanceTrait;
}
然后便是我们的消息类
use yii\base\StaticInstanceTrait;
use Swoftx\Amqplib\Connection;
use Swoftx\Amqplib\Message\Publisher;
use YiiConnection;
class DemoMessage extends Publisher
{
protected $exchange = 'demo';
protected $routingKey = 'test';
public function getConnection(): Connection
{
return YiiConnection::instance()->build();
}
}
接下来就是发送消息
use DemoMessage;
$msg = new DemoMessage();
$msg->setData(['id' => $id]);
$msg->publish();
DemoMessage::make()->setData(['id'=>$id])->publish();
是不是相比下面的一坨代码,清爽很多呢??
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$exchange = 'router';
$queue = 'msgs';
$connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
$channel = $connection->channel();
/*
The following code is the same both in the consumer and the producer.
In this way we are sure we always have a queue to consume from and an
exchange where to publish messages.
*/
/*
name: $queue
passive: false
durable: true // the queue will survive server restarts
exclusive: false // the queue can be accessed in other channels
auto_delete: false //the queue won't be deleted once the channel is closed.
*/
$channel->queue_declare($queue, false, true, false, false);
/*
name: $exchange
type: direct
passive: false
durable: true // the exchange will survive server restarts
auto_delete: false //the exchange won't be deleted once the channel is closed.
*/
$channel->exchange_declare($exchange, 'direct', false, true, false);
$channel->queue_bind($queue, $exchange);
$messageBody = implode(' ', array_slice($argv, 1));
$message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
$channel->basic_publish($message, $exchange);
$channel->close();
$connection->close();
本作品采用《CC 协议》,转载必须注明作者和本文链接