2 插件实现

未匹配的标注

插件 rabbitmq_delayed_message_exchange 将消息延迟发送到交换机。

1.配置延迟类型的交换机:

public function index()
{
    $connect = new AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin2021');
    $channel = $connect->channel();

    // 延迟类型交换机的路由方式为 direct
    $argument = new AMQPTable(['x-delayed-type' => 'direct']);
    // 参数2指定交换机类型为 x-delayed-message,而不是之前的 direct/fanout...
    $channel->exchange_declare('delay_exchange', 'x-delayed-message', false, true, false, false, false, $argument);

    // 声明队列
    $channel->queue_declare('delay_queue', false, true, false, false, false);
    $channel->queue_bind('delay_queue', 'delay_exchange', 'delay_route_key');
    $channel->basic_qos(0, 1, false);

    // 消费消息
    $callback = function ($msg) {
        print_r(">".time()."_receive the message:".$msg->body."\n");
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    };
    $channel->basic_consume('delay_queue', '', false, false, false, false, $callback);

    print_r("consuming...\n");
    while ($channel->is_consuming()) {
        $channel->wait();
    }
    $channel->close();
    $connect->close();
}

2.发送消息到延迟交换机:

public function index()
{
    $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin2021');
    $channel    = $connection->channel();

    // 发送消息
    for ($i = 1; $i <= 10; $i++) {
        $properties = [
            'application_headers' => new AMQPTable(['x-delay' => 10 * 1000]) // 配置消息延迟时间
        ];

        $message = new AMQPMessage("这是第{$i}条消息_".time(), $properties);
        $channel->basic_publish($message, 'delay_exchange', 'delay_route_key');
    }

    $channel->close();
    $connection->close();
}

如果文章有帮到你的话,别忘了点赞收藏噢 :smile:

本文章首发在 LearnKu.com 网站上。

上一篇 下一篇
讨论数量: 0
发起讨论 只看当前版本


暂无话题~