1 消息死信的情况

未匹配的标注

消息变成死信有三种情况:消息被拒绝、消息过期、队列达到最大长度。

1.1 消息被拒绝

消息被拒绝(basic.reject / basic.nack),并且 requeue = false

$connect  = new AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin2021');
$channel  = $connect->channel();

// 消费消息
$callback = function ($msg) {
    // basic_reject() 第二个参数 $requeue = false
    $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], false); 

    // 或者:
    // basic_nack() 第二个参数 $requeue = false
    // $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'], false);
};

$channel->basic_consume('normal_queue', '', false, false, false, false, $callback);

basic.reject 和 basic.nack 的区别在于:basic.nack 多了一个参数 multiple,可以一次拒绝或重新排队多条消息

basic_nack($delivery_tag, $multiple = false, $requeue = false)
basic_reject($delivery_tag, $requeue)

1.2 消息过期

设置了消息存活时间(TTL,Time To Live),并且过期了。

  • 针对队列来说,可以使用 x-message-ttl 参数设置当前队列中所有消息的过期时间(单位毫秒)。一旦消息过期,就会从队列中抹去。

    $connect  = new AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin2021');
    $channel  = $connect->channel();
    
    $argument = new AMQPTable(['x-message-ttl'=>10000]);  // 设置消息 TTL 为 10s
    $channel->queue_declare('normal_queue', false, false, false, false, false, $argument);
  • 针对单个消息来说,在发布消息时,可以使用 expiration 参数来设置单个消息的过期时间(单位毫秒)。要注意的是,即使消息过期,也不会马上从队列中抹去,因为每条消息是否过期是在即将投递到消费者之前判定的。

    $connect  = new AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin2021');
    $channel  = $connect->channel();
    
    $properties = ['expiration'=> 10000];                // 设置消息 TTL 为 10s
    $message = new AMQPMessage("这是第{$i}条消息",$properties);
    $channel->basic_publish($message);

如果以上两个都设置,则以当前消息最短的那个过期时间为准。

Q:为什么这两种方法处理过期消息的方式不一样?

A:因为第一种方法里,队列中己过期的消息肯定在队列头部,RabbitMQ 只要定期从队头开始扫描是否有过期的消息即可。而第二种方法里,每条消
息的过期时间不同,如果要删除所有过期消息势必要扫描整个队列,所以不如等到此消息即将被消费时再判定是否过期,如果过期再进行删除即可。

1.3 队列达到最大长度

1.3.1 设置队列长度限制

  • 可以通过设置 x-max-length (队列声明参数,非负整数)来设置最大消息数。
  • 可以通过设置 x-max-length-bytes (队列声明参数,非负整数)来设置最大长度(以字节为单位)。

如果设置了两个参数,则两者都将适用,将强制执行首先达到的限制。

$argument = new AMQPTable([ 
    'x-max-length' => 5, // 或 'x-max-length-bytes' => 1024
]);
$channel->queue_declare('limit_queue', false, false, false, false, false, $argument);

1.3.2 队列溢出行为

Queue Overflow Behaviour。可以通过 x-overflow (队列声明参数,字符串)来设置溢出行为 。

可选值为 drop-head(默认)、 reject-publish 或 reject-publish-dlx。

  • drop-head:从队列前端(即队列中最旧的消息)删除或死信消息。

  • reject-publish:直接丢弃最近发布的消息。假设 x-max-length = 5,发送消息1-10,最终剩消息 1-5。

    $argument = new AMQPTable([
        'x-max-length' => 5,
        'x-overflow'   => 'reject-publish'
    ]);
    $channel->queue_declare('limit_queue', false, false, false, false, false, $argument);

    如果启用了publisher confirm(发布者确认),那么会使用 basic.nack 方法通知发送者消息被拒绝。如果消息被同时路由到多个队列,并且被其中最少一个队列拒绝,那么通道将会使用 basic.nack 方法来通知消费者消息被拒绝,但是消息仍可以继续发送给可以接受它的队列。

  • reject-publish-dlx:最近发布的消息会进入死信队列。假设 x-max-length = 5,发送消息1-10,最终消息1-5进入队列,消息6-10会进入死信队列。

    $argument = new AMQPTable([
        'x-max-length'              => 5,
        'x-dead-letter-exchange'    => 'dl_exchange',       // 配置死信交换机
        'x-dead-letter-routing-key' => 'dl_route_key',      // 配置 Routing Key,路由到 dl_exchange
        'x-overflow'                => 'reject-publish-dlx'
    ]);
    $channel->queue_declare('limit_queue', false, false, false, false, false, $argument);

如果文章有帮到你的话,别忘了点赞收藏噢 :smile:

本文章首发在 LearnKu.com 网站上。

上一篇 下一篇
讨论数量: 0
发起讨论 只看当前版本


暂无话题~