RabbitMQ实现延迟队列

  • 实现原理:给队列的消息设置过期时间(TTL),消息到期后就会投递到一个死信队列,我们就可以在这里处理延迟的任务。

    一、介绍

    1. 死信队列

    当消息在一个队列中变成死信之后,它会被重新投递到设置的Exchange中,这个Exchange就是DLX,通过routing_key的绑定投递到对应的队列,这个队列就是死信队列。

    2. 死信消息
    • 消息被拒绝(basic.reject / basic.nack),并且requeue = false。
    • 消息TTL过期。TTL:Time To Live的简称,即过期时间。
    • 队列达到最大长度。
    3. 过期消息
    • 通过队列进行设置,设置后该队列所有的消息都存在相同的过期时间。
    • 通过对消息本身设置,队列中的每条消息的过期时间都可以不一样。如果要用来实现延迟队列不建议使用这种方式,因为队列只会判断第一个消息是否过期,过期则把消息投递到死信队列。如果第一个消息过期时间为30s,二个消息的过期时间为10s,那么队列等30s后把第一消息投递到死信队列,然后继续判断下一个消息,但是这样子第二个消息的延迟时间就变成30s了。

    二、原理图

    RabbitMQ实现延迟队列

    三、上代码

    composer require php-amqplib/php-amqplib
    把代码贴到根目录的public.php文件运行

      <?php
    
      use PhpAmqpLib\Message\AMQPMessage;
    
      require_once __DIR__ . '/vendor/autoload.php';
      //配置信息
      $conn_args       = array(
          'host'     => '47.107.237.18',
          'port'     => '5672',
          'login'    => 'guest',
          'password' => 'guest',
          'vhost'    => '/'
      );
      $ttl             = 10;//过期时间
      $exchange        = 'exchange';//正常交换机
      $delayExchange   = 'delayed_' . $exchange;//死信交换机
      $type            = 'topic';
      $msg             = 'hello world';
      $route           = 'delay';
      $deadQueue       = 'dead_queue';//死信队列
      $delayQueue      = 'delayed_queue_' . $exchange . '_' . $ttl;//延迟队列
      $delayRoutingKey = $route . '_' . $ttl;
    
      $conn = new \PhpAmqpLib\Connection\AMQPStreamConnection(
          $conn_args['host'],
          $conn_args['port'],
          $conn_args['login'],
          $conn_args['password'],
          $conn_args['vhost']
      );
      //创建连接和channel
      $channel = $conn->channel();
      //定义延迟交换器
      $channel->exchange_declare($delayExchange, 'topic', false, true, false);
      //定义延迟队列,
      $channel->queue_declare(
          $delayQueue,
          false,
          true,
          false,
          false,
          false,
          new \PhpAmqpLib\Wire\AMQPTable(
              array(
                  "x-dead-letter-exchange"    => $delayExchange,/*队列信息超时后投递到这个交换机*/
                  "x-dead-letter-routing-key" => $route,/*routingKey*/
                  "x-message-ttl"             => $ttl * 1000,/*超时时间*/
              )
          )
      );
      /*定义死信队列,延迟队列超时的消息就会投递到这里处理*/
      $channel->queue_declare(
          $deadQueue,
          false,
          true,
          false,
          false
      );
      /*绑定死信队列到延迟交换机*/
      $channel->queue_bind($deadQueue, $delayExchange, $route);
      //绑定延迟队列到交换器上
      $channel->queue_bind($delayQueue, $delayExchange, $delayRoutingKey);
      //生产者发送消息
      $msg = new AMQPMessage($msg);
      $channel->basic_publish($msg, $delayExchange, $delayRoutingKey);
      $channel->close();

谢谢观看,日常记录!
streetlamp 敬上!

本作品采用《CC 协议》,转载必须注明作者和本文链接
《L01 基础入门》
我们将带你从零开发一个项目并部署到线上,本课程教授 Web 开发中专业、实用的技能,如 Git 工作流、Laravel Mix 前端工作流等。
《L05 电商实战》
从零开发一个电商项目,功能包括电商后台、商品 & SKU 管理、购物车、订单管理、支付宝支付、微信支付、订单退款流程、优惠券等
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!