1 消息确认

未匹配的标注

Message acknowledgment 消息确认。当消息被 RabbitMQ 发送给消费者之后,马上就会在内存中移除。这种情况,你只要把一个工作者停止,正在处理的消息就会丢失。同时,所有发送到这个工作者的还没有处理的消息都会丢失。

为了防止消息丢失,RabbitMQ 提供了消息响应(acknowledgments)。消费者会通过一个 ack(响应),告诉 RabbitMQ 已经收到并处理了某条消息,然后 RabbitMQ 就会释放并删除这条消息。如果消费者挂掉了,没有发送响应,RabbitMQ 就会认为消息没有被完全处理,然后重新发送给其他消费者。

1.1 消费者确认

消费者确认模式分为自动确认和手动确认。

1.1.1 自动确认

将 basic_consume() 第四个参数 $no_ack 设置为 true 表示自动确认。如果消费者接受到消息便会自动发送一个 ack。

/**
 * 自动发送ack
 */
public function index()
{
    // 连接并建立信道
   $connect = new AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin2021');
   $channel = $connect->channel();
   $channel->basic_consume('queue_name', '', false, true, false, false, $callback);

    // 消费消息回调
    $callback = function ($msg) {
        print_r(">receive the message:".$msg->body."\n");
    };
    // 消费消息时设置 no_ack=true
    $channel->basic_consume('hello', '', false, true, false, false, $callback);

    print_r("consuming...\n");
    while ($channel->is_consuming()) {
        $channel->wait();
    }
    $channel->close();
    $connect->close();
}


1.1.2 手动确认

将 basic_consume() 第四个参数 $no_ack 设置为 false 表示手动确认。如果消费者接受到消息时没有手动发送一个 ack,那么 RabbitMQ 后续会重新发送。

  • basic.ack 用于肯定确认。
  • basic.nack 用于否定确认,可以一次拒绝或重新排队多条消息。(如果需要重新排队,$requeue = true)
  • basic.reject 用于否定确认,可以拒绝或重新排队一条消息。(如果需要重新排队,$requeue = true)
/**
 * 手动发送ack
 */
public function index()
{
    // 连接并建立信道
    $connect = new AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin2021');
    $channel = $connect->channel();
    $channel->queue_declare('queue_name', false, false, false, false);

    // 消费消息
    $callback = function ($msg) {
        print_r(">receive the message:".$msg->body."\n");
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    };
    // 消费消息时设置 no_ack=false
    $channel->basic_consume('hello', '', false, false, false, false, $callback);

    print_r("consuming...\n");
    while ($channel->is_consuming()) {
        $channel->wait();
    }
    $channel->close();
    $connect->close();
}

QoS (Quality Of Service 服务质量保证) 官方文档:www.rabbitmq.com/consumer-prefetch...

​ QoS 在手动确认消息的前提下,可以设置一个整数值 N,表示一个消费者最多只能一次拉取 N 条 消息。如果 N 条消息没有处理完,就不会从队列中获取新的消息,直到有消息被 ACK。

​ 设置 QoS 的作用就是防止消费者从队列中一下子拉取所有消息,从而导致服务崩溃或异常。这种机制一方面可以实现限速(将消息暂存到 RabbitMQ 内存中),一方面可以保证消息确认质量(比如确认了但是处理有异常的情况)。

1.2 发布者确认

通过 confirm_select() 启用 RabbitMQ 的发送确认功能,要求 RabbitMQ 显式告知发布者消息是否已成功发送。

当消息正常投递时,RabbitMQ 客户端将异步调用 set_ack_handler() 表示消息已经成功投递。

当消息投递出现异常时,set_nack_handler()将被调用。

public function index()
{
    $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin2021');
    $channel    = $connection->channel();
    $channel->exchange_declare('normal_exchange', AMQPExchangeType::DIRECT, false, true, false);

    // 开启确认模式
    $channel->confirm_select();

    // nack 回调
    $channel->set_nack_handler(function (AMQPMessage $msg) {
        print_r("{$msg->getBody()},nack\n");
    });
    // ack 回调
    $channel->set_ack_handler(function (AMQPMessage $msg) {
        print_r("{$msg->getBody()},ack\n");
    });

    // 发送消息
    for ($i = 1; $i <= 10; $i++) {
        $message = new AMQPMessage("这是第{$i}条消息");
        $channel->basic_publish($message, 'normal_exchange', 'route_key');
    }

    // 等待接收服务器的 ack 和 nacks
    $channel->wait_for_pending_acks();

    $channel->close();
    $connection->close();
}

如果文章有帮到你的话,别忘了点赞收藏噢 :smile:

本文章首发在 LearnKu.com 网站上。

上一篇 下一篇
贡献者:1
讨论数量: 0
发起讨论 只看当前版本


暂无话题~