3 死信队列案例

未匹配的标注

死信队列:绑定了死信交换机的队列,处理变成死信的消息。死信队列也是和正常队列没有区别,只是在声明的时候,配置 x-dead-letter-exchange参数。

3 死信队列案例

3.1 当消息被拒绝

当消息被拒绝时,消息进入死信队列,配置流程如下:

1.声明业务队列时,配置死信交换机和 Routing Key。

public function index()
{
    $connect = new AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin2021');
    $channel = $connect->channel();
    $channel->exchange_declare('normal_exchange', AMQPExchangeType::DIRECT, false, true, false);

    // 声明业务队列的死信交换机
    $argument = new AMQPTable([
        'x-dead-letter-exchange'    => 'dl_exchange',        // 配置死信交换机
        'x-dead-letter-routing-key' => 'dl_route_key',       // 配置 Routing Key,路由到 dl_exchange
    ]);
    $channel->queue_declare('normal_queue', false, false, false, false, false, $argument);

    // 业务队列“normal_queue”绑定到业务交换机“normal_exchange”,Routing Key 为“normal_route_key”
    $channel->queue_bind('normal_queue', 'normal_exchange', 'normal_route_key');

    // 消费消息时,拒绝消息
    $callback = function ($msg) {
        $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], false);
    };
    $channel->basic_consume('normal_queue', '', false, false, false, false, $callback);

    print_r("consuming...\n");
    while ($channel->is_consuming()) {
        $channel->wait();
    }
    $channel->close();
    $connect->close();
}

2.为死信交换机配置死信队列。

public function index()
{
    $connect = new AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin2021');
    $channel = $connect->channel();

    // 声明死信交换机
    $channel->exchange_declare('dl_exchange', AMQPExchangeType::DIRECT, false, true, false);

    // 声明死信队列
    $channel->queue_declare('dl_queue', false, false, false, false);

    // 死信队列绑定到死信交换机,Routing Key 为“dl_route_key”
    $channel->queue_bind('dl_queue', 'dl_exchange', 'dl_route_key');

    // 消费消息
    $callback = function ($msg) {
        print_r(">receive the message:".$msg->body."\n");
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    };
    $channel->basic_consume('dl_queue', '', false, false, false, false, $callback);

    print_r("consuming...\n");
    while ($channel->is_consuming()) {
        $channel->wait();
    }
    $channel->close();
    $connect->close();
}

当消息在业务队列normal_queue 的消息被拒绝时,它会按照x-dead-letter-exchangex-dead-letter-routing-key重新将消息 publish 到死信交换机dl_exchange。此时,绑定了死信交换机的死信队列dl_queue就能订阅到消息。

3.2 当消息过期时

当消息过期时,消息进入死信队列,利用这一特性可以实现延迟队列,代码如下。

1.声明业务队列时,配置消息过期时间、死信交换机:

public function index()
{
    $connect = new AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin2021');
    $channel = $connect->channel();
    $channel->exchange_declare('normal_exchange', AMQPExchangeType::DIRECT, false, true, false);

    // 声明队列
    $argument = new AMQPTable([
        'x-message-ttl'             => 10000,                // 10s,设置消息存活时间(TTL,Time To Live)
        'x-dead-letter-exchange'    => 'dl_exchange',        // 配置死信交换机
        'x-dead-letter-routing-key' => 'dl_route_key',       // 配置 Routing Key,路由到 dl_exchange
    ]);
    $channel->queue_declare('normal_queue', false, false, false, false, false, $argument);
    $channel->queue_bind('normal_queue', 'normal_exchange', 'normal_route_key');

    // 一次拉取一条消息
    $channel->basic_qos(0, 1, false);

    // 消费消息
    $callback = function ($msg) {
        sleep(5);  // 延时,所以处理两条消息后,后面的消息都已经过期
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    };
    $channel->basic_consume('normal_queue', '', false, false, false, false, $callback);

    print_r("consuming...\n");
    while ($channel->is_consuming()) {
        $channel->wait();
    }
    $channel->close();
    $connect->close();
}

2.为死信交换机配置死信队列。(同上)

3.3 当队列达到最大长度

当队列达到最大长度限制时,消息进入死信队列。

1.声明业务队列时,配置队列长度限制、死信交换机:

public function index()
{
    $connect = new AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin2021');
    $channel = $connect->channel();
    $channel->exchange_declare('normal_exchange', AMQPExchangeType::DIRECT, false, true, false);

    $argument = new AMQPTable([
        'x-max-length'              => 5,
        'x-dead-letter-exchange'    => 'dl_exchange',        // 配置死信交换机
        'x-dead-letter-routing-key' => 'dl_route_key',       // 配置 Routing Key,路由到 dl_exchange
        'x-overflow'                => 'reject-publish-dlx'
    ]);
    $channel->queue_declare('normal_queue', false, false, false, false, false, $argument);
    $channel->queue_bind('normal_queue', 'normal_exchange', 'normal_route_key');

    // 消费消息
    $callback = function ($msg) {
        print_r(">receive the message:".$msg->body."\n");
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    };
    $channel->basic_consume('normal_queue', '', false, false, false, false, $callback);

    print_r("consuming...\n");
    while ($channel->is_consuming()) {
        $channel->wait();
    }
    $channel->close();
    $connect->close();
}

2.为死信交换机配置死信队列。(同上)

如果文章有帮到你的话,别忘了点赞收藏噢 :smile:

本文章首发在 LearnKu.com 网站上。

上一篇 下一篇
讨论数量: 0
发起讨论 查看所有版本


暂无话题~