3 死信队列案例
死信队列:绑定了死信交换机的队列,处理变成死信的消息。死信队列也是和正常队列没有区别,只是在声明的时候,配置 x-dead-letter-exchange
参数。
3.1 当消息被拒绝
当消息被拒绝时,消息进入死信队列,配置流程如下:
1.声明业务队列时,配置死信交换机和 Routing Key。
public function index()
{
$connect = new AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin2021');
$channel = $connect->channel();
$channel->exchange_declare('normal_exchange', AMQPExchangeType::DIRECT, false, true, false);
// 声明业务队列的死信交换机
$argument = new AMQPTable([
'x-dead-letter-exchange' => 'dl_exchange', // 配置死信交换机
'x-dead-letter-routing-key' => 'dl_route_key', // 配置 Routing Key,路由到 dl_exchange
]);
$channel->queue_declare('normal_queue', false, false, false, false, false, $argument);
// 业务队列“normal_queue”绑定到业务交换机“normal_exchange”,Routing Key 为“normal_route_key”
$channel->queue_bind('normal_queue', 'normal_exchange', 'normal_route_key');
// 消费消息时,拒绝消息
$callback = function ($msg) {
$msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], false);
};
$channel->basic_consume('normal_queue', '', false, false, false, false, $callback);
print_r("consuming...\n");
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connect->close();
}
2.为死信交换机配置死信队列。
public function index()
{
$connect = new AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin2021');
$channel = $connect->channel();
// 声明死信交换机
$channel->exchange_declare('dl_exchange', AMQPExchangeType::DIRECT, false, true, false);
// 声明死信队列
$channel->queue_declare('dl_queue', false, false, false, false);
// 死信队列绑定到死信交换机,Routing Key 为“dl_route_key”
$channel->queue_bind('dl_queue', 'dl_exchange', 'dl_route_key');
// 消费消息
$callback = function ($msg) {
print_r(">receive the message:".$msg->body."\n");
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_consume('dl_queue', '', false, false, false, false, $callback);
print_r("consuming...\n");
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connect->close();
}
当消息在业务队列normal_queue
的消息被拒绝时,它会按照x-dead-letter-exchange
和x-dead-letter-routing-key
重新将消息 publish 到死信交换机dl_exchange
。此时,绑定了死信交换机的死信队列dl_queue
就能订阅到消息。
3.2 当消息过期时
当消息过期时,消息进入死信队列,利用这一特性可以实现延迟队列,代码如下。
1.声明业务队列时,配置消息过期时间、死信交换机:
public function index()
{
$connect = new AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin2021');
$channel = $connect->channel();
$channel->exchange_declare('normal_exchange', AMQPExchangeType::DIRECT, false, true, false);
// 声明队列
$argument = new AMQPTable([
'x-message-ttl' => 10000, // 10s,设置消息存活时间(TTL,Time To Live)
'x-dead-letter-exchange' => 'dl_exchange', // 配置死信交换机
'x-dead-letter-routing-key' => 'dl_route_key', // 配置 Routing Key,路由到 dl_exchange
]);
$channel->queue_declare('normal_queue', false, false, false, false, false, $argument);
$channel->queue_bind('normal_queue', 'normal_exchange', 'normal_route_key');
// 一次拉取一条消息
$channel->basic_qos(0, 1, false);
// 消费消息
$callback = function ($msg) {
sleep(5); // 延时,所以处理两条消息后,后面的消息都已经过期
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_consume('normal_queue', '', false, false, false, false, $callback);
print_r("consuming...\n");
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connect->close();
}
2.为死信交换机配置死信队列。(同上)
3.3 当队列达到最大长度
当队列达到最大长度限制时,消息进入死信队列。
1.声明业务队列时,配置队列长度限制、死信交换机:
public function index()
{
$connect = new AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin2021');
$channel = $connect->channel();
$channel->exchange_declare('normal_exchange', AMQPExchangeType::DIRECT, false, true, false);
$argument = new AMQPTable([
'x-max-length' => 5,
'x-dead-letter-exchange' => 'dl_exchange', // 配置死信交换机
'x-dead-letter-routing-key' => 'dl_route_key', // 配置 Routing Key,路由到 dl_exchange
'x-overflow' => 'reject-publish-dlx'
]);
$channel->queue_declare('normal_queue', false, false, false, false, false, $argument);
$channel->queue_bind('normal_queue', 'normal_exchange', 'normal_route_key');
// 消费消息
$callback = function ($msg) {
print_r(">receive the message:".$msg->body."\n");
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_consume('normal_queue', '', false, false, false, false, $callback);
print_r("consuming...\n");
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connect->close();
}
2.为死信交换机配置死信队列。(同上)
如果文章有帮到你的话,别忘了点赞收藏噢 :smile: