2 插件实现
插件 rabbitmq_delayed_message_exchange 将消息延迟发送到交换机。
1.配置延迟类型的交换机:
public function index()
{
$connect = new AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin2021');
$channel = $connect->channel();
// 延迟类型交换机的路由方式为 direct
$argument = new AMQPTable(['x-delayed-type' => 'direct']);
// 参数2指定交换机类型为 x-delayed-message,而不是之前的 direct/fanout...
$channel->exchange_declare('delay_exchange', 'x-delayed-message', false, true, false, false, false, $argument);
// 声明队列
$channel->queue_declare('delay_queue', false, true, false, false, false);
$channel->queue_bind('delay_queue', 'delay_exchange', 'delay_route_key');
$channel->basic_qos(0, 1, false);
// 消费消息
$callback = function ($msg) {
print_r(">".time()."_receive the message:".$msg->body."\n");
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_consume('delay_queue', '', false, false, false, false, $callback);
print_r("consuming...\n");
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connect->close();
}
2.发送消息到延迟交换机:
public function index()
{
$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin2021');
$channel = $connection->channel();
// 发送消息
for ($i = 1; $i <= 10; $i++) {
$properties = [
'application_headers' => new AMQPTable(['x-delay' => 10 * 1000]) // 配置消息延迟时间
];
$message = new AMQPMessage("这是第{$i}条消息_".time(), $properties);
$channel->basic_publish($message, 'delay_exchange', 'delay_route_key');
}
$channel->close();
$connection->close();
}
如果文章有帮到你的话,别忘了点赞收藏噢 :smile: