使用 RabbitMQ 实现延时队列
方案一:使用 RabbitMQ 自带的延迟队列(不推荐)
Dead Letter Exchange 其实就是一种普通的 exchange 。只是在某一个设置Dead Letter Exchange 的队列中有消息过期了会自动触发消息的转发,发送到 Dead Letter Exchange 中去。
实现原理:
RabbitMQ针对队列中的消息过期时间有两种方法可以设置。
- 通过设置队列的 “x-message-ttl” 属性,队列中所有消息都有相同的过期时间。
- 对消息进行单独设置,每条消息TTL可以不同。
- 如果同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。
注意事项:
两种方式是有区别的: 如果设置了队列的TTL属性,那么一旦消息过期,就会被队列丢弃,而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间,如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“,因为RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,索引如果第一个消息的延时时长很长,而第二个消息的延时时长很短,则第二个消息并不会优先得到执行。
延时队列的消息流转流程
具体思路就是不同延时时间创建不同队列
代码
生产者伪代码
$channel = $connection->getChannel();
$delayExchange = 'delayed_' . $producerMessage->getExchange();
$delayQueue = 'delayed_queue_' . $producerMessage->getExchange() . $producerMessage->getTtl() . '_' . $delayTime;
$delayRoutingKey = $producerMessage->getRoutingKey() . $delayTime;
//定义延迟交换器
$channel->exchange_declare($delayExchange, 'topic', false, true, false);
//定义延迟队列
$channel->queue_declare($delayQueue, false, true, false, false, false, new AMQPTable(array(
"x-dead-letter-exchange" => $producerMessage->getExchange(),
"x-dead-letter-routing-key" => $producerMessage->getRoutingKey(),
"x-message-ttl" => $producerMessage->getTtl() * 1000,
)));
//绑定延迟队列到交换器上
$channel->queue_bind($delayQueue, $delayExchange, $delayRoutingKey);
$channel->basic_publish($message, $delayExchange, $delayRoutingKey);
消费者
略
方案二 使用 rabbitmq-delayed-message-exchange 插件
安装
- 下载文件后解压,并将其拷贝至 RabbitMQ plugins 目录。
- 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchang
编写 Demo 代码
生产者
<?php
require_once __DIR__ . './vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('*.*.*.*', 5672, 'guest', 'guest');
$channel = $connection->channel();
$args = new AMQPTable(['x-delayed-type' => 'fanout']);
$channel->exchange_declare('delayed_exchange_test', 'x-delayed-message', false, true, false, false, false, $args);
$args = new AMQPTable(['x-dead-letter-exchange' => 'delayed']);
$channel->queue_declare('delayed_queue_test', false, true, false, false, false, $args);
$channel->queue_bind('delayed_queue_test', 'delayed_exchange_test');
$data = 'Hello World at ' . date('Y-m-d H:i:s');
$delay = 10000;
$message = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$headers = new AMQPTable(['x-delay' => $delay]);
$message->set('application_headers', $headers);
$channel->basic_publish($message, 'delayed_exchange_test');
printf(' [x] Message sent: %s %s', $data, PHP_EOL);
$channel->close();
$connection->close();
消费者
require_once __DIR__ . './vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('*.*.*.*', 5672, 'guest', 'guest');
$channel = $connection->channel();
$callback = function (AMQPMessage $message) {
printf(' [x] Message received: %s %s', $message->body, \Carbon\Carbon::now()->toDateTimeString().PHP_EOL);
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
};
$channel->basic_consume('delayed_queue_test', '', false, false, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
查看效果
发消息
收消息
到此延迟队列已实现。
本作品采用《CC 协议》,转载必须注明作者和本文链接
推荐文章: