1 原生实现

未匹配的标注

基本原理:利用 TTL 和 死信队列。其中 TTL 分为:队列 TTL 和消息 TTL。

队列 TTL 的缺点:每出现一个新的 TTL 需求,就要新增一个队列,不灵活。

消息 TTL 的缺点:存在时序问题。比如依次往队列发布两条消息: msg_1(ttl=15s)、msg_2(ttl=3s),我们想要实现的是消息到指定 ttl 时死信,但实际上RabbitMQ 会按照先进先出的原则,先处理 msg_1,再处理 msg_2,也就是说 msg_2 可能在 18s 后才死信。

1.1 队列TTL和死信队列

1.设置队列 TTL:

public function index()
{
    $connect = new AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin2021');
    $channel = $connect->channel();
    $channel->exchange_declare('normal_exchange', AMQPExchangeType::DIRECT, false, true, false);

    // 设置队列TTL和死信交换机
    $argument = new AMQPTable([
        'x-message-ttl'             => 10000,                // 10s,设置消息存活时间(TTL,Time To Live)
        'x-dead-letter-exchange'    => 'dl_exchange',        // 配置死信交换机
        'x-dead-letter-routing-key' => 'dl_route_key',       // 配置 Routing Key,路由到 dl_exchange
    ]);
    $channel->queue_declare('normal_queue', false, false, false, false, false, $argument);
    $channel->queue_bind('normal_queue', 'normal_exchange', 'normal_route_key');
    $channel->basic_qos(0, 1, false);

    // 消费消息
    $callback = function ($msg) {
        print_r(">receive the message:".$msg->body."\n");
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    };
    $channel->basic_consume('normal_queue', '', false, false, false, false, $callback);

    print_r("consuming...\n");
    while ($channel->is_consuming()) {
        $channel->wait();
    }
    $channel->close();
    $connect->close();
}

2.配置死信队列:

public function index()
{
    $connect = new AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin2021');
    $channel = $connect->channel();

    // 声明死信交换机
    $channel->exchange_declare('dl_exchange', AMQPExchangeType::DIRECT, false, true, false);

    // 声明死信队列
    $channel->queue_declare('dl_queue', false, false, false, false);

    // 死信队列绑定到死信交换机,Routing Key 为“dl_route_key”
    $channel->queue_bind('dl_queue', 'dl_exchange', 'dl_route_key');

    // 消费死信消息
    $callback = function ($msg) {
        print_r(">receive the message:".$msg->body."\n");
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    };
    $channel->basic_consume('dl_queue', '', false, false, false, false, $callback);

    print_r("consuming...\n");
    while ($channel->is_consuming()) {
        $channel->wait();
    }
    $channel->close();
    $connect->close();
}

1.2 消息TTL和死信队列

1.设置消息 TTL:

public function index()
{
    $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin2021');
    $channel    = $connection->channel();

    // 发送消息
    for ($i = 1; $i <= 10; $i++) {
        $properties = ['expiration' => 10000]; // ttl = 10s
        $message    = new AMQPMessage("这是第{$i}条消息", $properties);
        $channel->basic_publish($message, 'normal_exchange', 'normal_route_key');
    }

    $channel->close();
    $connection->close();
}

2.配置死信队列:如上。

如果文章有帮到你的话,别忘了点赞收藏噢 :smile:

本文章首发在 LearnKu.com 网站上。

上一篇 下一篇
讨论数量: 0
发起讨论 查看所有版本


暂无话题~