1 原生实现
基本原理:利用 TTL 和 死信队列。其中 TTL 分为:队列 TTL 和消息 TTL。
队列 TTL 的缺点:每出现一个新的 TTL 需求,就要新增一个队列,不灵活。
消息 TTL 的缺点:存在时序问题。比如依次往队列发布两条消息: msg_1(ttl=15s)、msg_2(ttl=3s),我们想要实现的是消息到指定 ttl 时死信,但实际上RabbitMQ 会按照先进先出的原则,先处理 msg_1,再处理 msg_2,也就是说 msg_2 可能在 18s 后才死信。
1.1 队列TTL和死信队列
1.设置队列 TTL:
public function index()
{
$connect = new AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin2021');
$channel = $connect->channel();
$channel->exchange_declare('normal_exchange', AMQPExchangeType::DIRECT, false, true, false);
// 设置队列TTL和死信交换机
$argument = new AMQPTable([
'x-message-ttl' => 10000, // 10s,设置消息存活时间(TTL,Time To Live)
'x-dead-letter-exchange' => 'dl_exchange', // 配置死信交换机
'x-dead-letter-routing-key' => 'dl_route_key', // 配置 Routing Key,路由到 dl_exchange
]);
$channel->queue_declare('normal_queue', false, false, false, false, false, $argument);
$channel->queue_bind('normal_queue', 'normal_exchange', 'normal_route_key');
$channel->basic_qos(0, 1, false);
// 消费消息
$callback = function ($msg) {
print_r(">receive the message:".$msg->body."\n");
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_consume('normal_queue', '', false, false, false, false, $callback);
print_r("consuming...\n");
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connect->close();
}
2.配置死信队列:
public function index()
{
$connect = new AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin2021');
$channel = $connect->channel();
// 声明死信交换机
$channel->exchange_declare('dl_exchange', AMQPExchangeType::DIRECT, false, true, false);
// 声明死信队列
$channel->queue_declare('dl_queue', false, false, false, false);
// 死信队列绑定到死信交换机,Routing Key 为“dl_route_key”
$channel->queue_bind('dl_queue', 'dl_exchange', 'dl_route_key');
// 消费死信消息
$callback = function ($msg) {
print_r(">receive the message:".$msg->body."\n");
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_consume('dl_queue', '', false, false, false, false, $callback);
print_r("consuming...\n");
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connect->close();
}
1.2 消息TTL和死信队列
1.设置消息 TTL:
public function index()
{
$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin2021');
$channel = $connection->channel();
// 发送消息
for ($i = 1; $i <= 10; $i++) {
$properties = ['expiration' => 10000]; // ttl = 10s
$message = new AMQPMessage("这是第{$i}条消息", $properties);
$channel->basic_publish($message, 'normal_exchange', 'normal_route_key');
}
$channel->close();
$connection->close();
}
2.配置死信队列:如上。
如果文章有帮到你的话,别忘了点赞收藏噢 :smile: