1 消息死信的情况
消息变成死信有三种情况:消息被拒绝、消息过期、队列达到最大长度。
1.1 消息被拒绝
消息被拒绝(basic.reject / basic.nack),并且 requeue = false
$connect = new AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin2021');
$channel = $connect->channel();
// 消费消息
$callback = function ($msg) {
// basic_reject() 第二个参数 $requeue = false
$msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], false);
// 或者:
// basic_nack() 第二个参数 $requeue = false
// $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'], false);
};
$channel->basic_consume('normal_queue', '', false, false, false, false, $callback);
basic.reject 和 basic.nack 的区别在于:basic.nack 多了一个参数 multiple,可以一次拒绝或重新排队多条消息
basic_nack($delivery_tag, $multiple = false, $requeue = false)
basic_reject($delivery_tag, $requeue)
1.2 消息过期
设置了消息存活时间(TTL,Time To Live),并且过期了。
针对队列来说,可以使用
x-message-ttl
参数设置当前队列中所有消息的过期时间(单位毫秒)。一旦消息过期,就会从队列中抹去。$connect = new AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin2021'); $channel = $connect->channel(); $argument = new AMQPTable(['x-message-ttl'=>10000]); // 设置消息 TTL 为 10s $channel->queue_declare('normal_queue', false, false, false, false, false, $argument);
针对单个消息来说,在发布消息时,可以使用
expiration
参数来设置单个消息的过期时间(单位毫秒)。要注意的是,即使消息过期,也不会马上从队列中抹去,因为每条消息是否过期是在即将投递到消费者之前判定的。$connect = new AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin2021'); $channel = $connect->channel(); $properties = ['expiration'=> 10000]; // 设置消息 TTL 为 10s $message = new AMQPMessage("这是第{$i}条消息",$properties); $channel->basic_publish($message);
如果以上两个都设置,则以当前消息最短的那个过期时间为准。
Q:为什么这两种方法处理过期消息的方式不一样?
A:因为第一种方法里,队列中己过期的消息肯定在队列头部,RabbitMQ 只要定期从队头开始扫描是否有过期的消息即可。而第二种方法里,每条消
息的过期时间不同,如果要删除所有过期消息势必要扫描整个队列,所以不如等到此消息即将被消费时再判定是否过期,如果过期再进行删除即可。
1.3 队列达到最大长度
1.3.1 设置队列长度限制
- 可以通过设置
x-max-length
(队列声明参数,非负整数)来设置最大消息数。 - 可以通过设置
x-max-length-bytes
(队列声明参数,非负整数)来设置最大长度(以字节为单位)。
如果设置了两个参数,则两者都将适用,将强制执行首先达到的限制。
$argument = new AMQPTable([
'x-max-length' => 5, // 或 'x-max-length-bytes' => 1024
]);
$channel->queue_declare('limit_queue', false, false, false, false, false, $argument);
1.3.2 队列溢出行为
Queue Overflow Behaviour。可以通过 x-overflow
(队列声明参数,字符串)来设置溢出行为 。
可选值为 drop-head(默认)、 reject-publish 或 reject-publish-dlx。
drop-head:从队列前端(即队列中最旧的消息)删除或死信消息。
reject-publish:直接丢弃最近发布的消息。假设 x-max-length = 5,发送消息1-10,最终剩消息 1-5。
$argument = new AMQPTable([ 'x-max-length' => 5, 'x-overflow' => 'reject-publish' ]); $channel->queue_declare('limit_queue', false, false, false, false, false, $argument);
如果启用了
publisher confirm
(发布者确认),那么会使用 basic.nack 方法通知发送者消息被拒绝。如果消息被同时路由到多个队列,并且被其中最少一个队列拒绝,那么通道将会使用 basic.nack 方法来通知消费者消息被拒绝,但是消息仍可以继续发送给可以接受它的队列。reject-publish-dlx:最近发布的消息会进入死信队列。假设 x-max-length = 5,发送消息1-10,最终消息1-5进入队列,消息6-10会进入死信队列。
$argument = new AMQPTable([ 'x-max-length' => 5, 'x-dead-letter-exchange' => 'dl_exchange', // 配置死信交换机 'x-dead-letter-routing-key' => 'dl_route_key', // 配置 Routing Key,路由到 dl_exchange 'x-overflow' => 'reject-publish-dlx' ]); $channel->queue_declare('limit_queue', false, false, false, false, false, $argument);
如果文章有帮到你的话,别忘了点赞收藏噢 :smile:
推荐文章: