使用 RabbitMQ 实现延时队列

方案一:使用 RabbitMQ 自带的延迟队列(不推荐)

Dead Letter Exchange 其实就是一种普通的 exchange 。只是在某一个设置Dead Letter Exchange 的队列中有消息过期了会自动触发消息的转发,发送到 Dead Letter Exchange 中去。

实现原理:

RabbitMQ针对队列中的消息过期时间有两种方法可以设置。

  • 通过设置队列的 “x-message-ttl” 属性,队列中所有消息都有相同的过期时间。
  • 对消息进行单独设置,每条消息TTL可以不同。
  • 如果同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。

注意事项:

两种方式是有区别的: 如果设置了队列的TTL属性,那么一旦消息过期,就会被队列丢弃,而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间,如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“,因为RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,索引如果第一个消息的延时时长很长,而第二个消息的延时时长很短,则第二个消息并不会优先得到执行。

延时队列的消息流转流程
使用 RabbitMQ 实现延时队列

具体思路就是不同延时时间创建不同队列

使用 RabbitMQ 实现延时队列

代码

生产者伪代码

$channel = $connection->getChannel();
$delayExchange   = 'delayed_' . $producerMessage->getExchange();
$delayQueue      = 'delayed_queue_' . $producerMessage->getExchange() . $producerMessage->getTtl() . '_' . $delayTime;
$delayRoutingKey = $producerMessage->getRoutingKey() . $delayTime;
//定义延迟交换器
$channel->exchange_declare($delayExchange, 'topic', false, true, false);
//定义延迟队列
$channel->queue_declare($delayQueue, false, true, false, false, false, new AMQPTable(array(
       "x-dead-letter-exchange"    => $producerMessage->getExchange(),
       "x-dead-letter-routing-key" => $producerMessage->getRoutingKey(),
       "x-message-ttl"             => $producerMessage->getTtl() * 1000,
       )));
//绑定延迟队列到交换器上
$channel->queue_bind($delayQueue, $delayExchange, $delayRoutingKey);
$channel->basic_publish($message, $delayExchange, $delayRoutingKey);

消费者

方案二 使用 rabbitmq-delayed-message-exchange 插件

安装

插件下载地址

  • 下载文件后解压,并将其拷贝至 RabbitMQ plugins 目录。
  • 启用插件 rabbitmq-plugins enable rabbitmq_delayed_message_exchang

编写 Demo 代码

生产者

<?php
require_once __DIR__ . './vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

$connection = new AMQPStreamConnection('*.*.*.*', 5672, 'guest', 'guest');
$channel    = $connection->channel();
$args       = new AMQPTable(['x-delayed-type' => 'fanout']);
$channel->exchange_declare('delayed_exchange_test', 'x-delayed-message', false, true, false, false, false, $args);
$args = new AMQPTable(['x-dead-letter-exchange' => 'delayed']);
$channel->queue_declare('delayed_queue_test', false, true, false, false, false, $args);
$channel->queue_bind('delayed_queue_test', 'delayed_exchange_test');
$data = 'Hello World at ' . date('Y-m-d H:i:s');
$delay = 10000;
$message = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$headers = new AMQPTable(['x-delay' => $delay]);
$message->set('application_headers', $headers);
$channel->basic_publish($message, 'delayed_exchange_test');
printf(' [x] Message sent: %s %s', $data, PHP_EOL);
$channel->close();
$connection->close();

消费者

require_once __DIR__ . './vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

$connection = new AMQPStreamConnection('*.*.*.*', 5672, 'guest', 'guest');
$channel    = $connection->channel();
$callback = function (AMQPMessage $message) {
    printf(' [x] Message received: %s %s', $message->body, \Carbon\Carbon::now()->toDateTimeString().PHP_EOL);
    $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
};
$channel->basic_consume('delayed_queue_test', '', false, false, false, false, $callback);
while (count($channel->callbacks)) {
    $channel->wait();
}
$channel->close();
$connection->close();

查看效果

发消息
使用 RabbitMQ 实现延时队列

收消息

使用 RabbitMQ 实现延时队列
到此延迟队列已实现。

本作品采用《CC 协议》,转载必须注明作者和本文链接
当它本可进取时,却故作谦卑; 在困难和容易之间,它选择了容易; 自由软弱,却把它认为是生命的坚韧; 侧身于生活的污泥中,虽不甘心,却又畏首畏尾。
《L05 电商实战》
从零开发一个电商项目,功能包括电商后台、商品 & SKU 管理、购物车、订单管理、支付宝支付、微信支付、订单退款流程、优惠券等
《G01 Go 实战入门》
从零开始带你一步步开发一个 Go 博客项目,让你在最短的时间内学会使用 Go 进行编码。项目结构很大程度上参考了 Laravel。
讨论数量: 4

如果是个人项目,使用哪个方案都行。但如果用于生成环境,你的不推介的方案一反而是我推荐的。

方案二使用的插件不是一个正式插件,出现了问题rabbitmq团队不一定会去修复,开发这个插件的团队也声明过,这个插件的优先级不高。这个时候希望你懂erlang,能源码排查,不然就糟糕了。生成环境,稳定第一。

这些信息我技术调研过,生成环境一定是稳定优先。

4年前 评论
rufo (楼主) 4年前
rufo (楼主) 4年前
L学习不停 (作者) 4年前

延迟队列,laravel 自带的不行吗?

4年前 评论

@pan_zoe @Enzo_Lwb 你们说的自带,应该是指 sync同步队列。这是框架中用于测试的一个选项,并不是一个队列,是用于测试的。

消息队列本身就要具有异步属性, 从同步队列中就能看出来,这不属于队列。

说了这么多,你们在项目中使用一下就明白了。

4年前 评论
L学习不停 (作者) 4年前
pan_zoe 4年前
cheer 4年前

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!