laravel定时任务实现rabbitMQ消费者常驻进程服务。
1. 运行环境
lnmp宝塔
1). 当前使用的 Laravel 版本?
Laravel Framework 7.30.6
2). 当前使用的 php/php-fpm 版本?
PHP 7.4.33 (cli) (built: Nov 2 2022 16:00:55) ( ZTS Visual C++ 2017 x64 )
Copyright (c) The PHP Group
Zend Engine v3.4.0, Copyright (c) Zend Technologies
with Xdebug v3.1.6, Copyright (c) 2002-2022, by Derick Rethans
3). 当前系统
4). 业务环境
开发环境
5). 相关软件版本
php mq 扩展包:
“php-amqplib/php-amqplib”: “^2.8”,
2. 问题描述?
队列服务莫名奇妙断连之后,重试逻辑异常。不断触发重连和断开,无法正常监听队列信息。
日志表现记录为,记录到最后一次尝试重试连接及连接成功之后的日志
重点来了!后续的日志开始了无限重新连接再断开的记录,具体表现如下图
下面是脚本的关键代码。
/**
* 构造函数,初始化连接配置信息
*/
public function __construct()
{
if (!defined('SOCKET_EAGAIN')) {
define('SOCKET_EAGAIN', 11);
}
if (!defined('SOCKET_EWOULDBLOCK')) {
define('SOCKET_EWOULDBLOCK', 11);
}
if (!defined('SOCKET_EINTR')) {
define('SOCKET_EINTR', 4);
}
// 判断服务是否开启
if (!env('RABBITMQ_ENABLE', false)) {
return;
}
// RabbitMQ 连接配置信息
$this->host = env("RABBITMQ_HOST", "localhost");
$this->port = env("RABBITMQ_PORT", 5671);
$this->user = env("RABBITMQ_USER", "guest");
$this->pass = env("RABBITMQ_PASSWORD", "guest");
$this->vhost = env("RABBITMQ_VHOST", "/");
$this->initializeConnection();
}
public bool $isDeclared = false;
/**
* 建立 RabbitMQ 连接
*
* @return void
* @throws \ErrorException
*/
public function initializeConnection()
{
// 初始化标志变量
$this->isDeclared = false;
while (!$this->isConnected && $this->retryAttempts < $this->maxRetryAttempts) {
try {
// 建立 RabbitMQ 连接
$this->connection = new AMQPSSLConnection($this->host, $this->port, $this->user, $this->pass, $this->vhost, ['verify_peer' => true, 'heartbeat' => 60, 'read_write_timeout' => 60]);
$this->channel = $this->connection->channel();
foreach ($this->queues as $queue) {
// 声明队列
$this->channel->queue_declare($queue, false, true, false, false);
// 将队列绑定到交换机
$this->channel->queue_bind($queue, 'fanout_exchange');
}
$this->isConnected = true;
$this->retryAttempts = 0;
\Log::channel('mq')->info('RabbitMQ connection established successfully.');
} catch (\Exception $e) {
\Log::channel('mq')->error('RabbitMQ connection failed: ' . $e->getMessage());
// 等待一段时间后重试
sleep($this->retryInterval);
$this->retryAttempts++;
}
}
if (!$this->isConnected) {
\Log::channel('mq')->error('Maximum retry attempts reached. Could not establish RabbitMQ connection.');
throw new \ErrorException('Could not establish RabbitMQ connection.');
}
}
/**
* 监听 RabbitMQ 队列
*
* @return void
* @throws \ErrorException
*
*/
public function listenToQueues()
{
\Log::channel('mq')->info('RabbitMQ listener started.');
while ($this->isConnected) {
try {
foreach ($this->queues as $queue) {
$this->channel->basic_consume($queue, '', false, false, false, false, function ($message) use ($queue) {
try {
$result = $this->processMessage($queue, $message);
if ($result) {
$this->channel->basic_ack($message->delivery_info['delivery_tag']);
}
} catch (\Exception $e) {
\Log::channel('mq')->error('Exception Error processing message: ' . $e->getMessage());
} catch (\Error $e) {
\Log::channel('mq')->error('Fatal Error processing message: ' . json_encode($e->getTrace()));
}
});
}
// 阻塞等待消息
\Log::channel('mq')->info('RabbitMQ listener waiting for messages.');
$this->channel->wait(null, false, $this->retryInterval);
} catch (\Exception $e) {
\Log::channel('mq')->error('Error while consuming messages: ' . $e->getMessage());
$this->isConnected = false; // 标记为断开连接,准备进行重连
$this->reconnect(); // 尝试重连
// 如果重连失败,退出监听循环
if (!$this->isConnected) {
\Log::channel('mq')->error('Failed to reconnect. Exiting listener loop.');
break;
}
}
}
}
/**
* 重连机制
*/
protected function reconnect()
{
try {
\Log::channel('mq')->info('Attempting to reconnect to RabbitMQ...');
// 关闭连接
// $this->closeConnection();
// 等待一段时间再重试
sleep(5); // 可以根据需要增加等待时间
// 重新初始化连接
$this->initializeConnection();
if ($this->isConnected) {
\Log::channel('mq')->info('Reconnected to RabbitMQ successfully.');
// 重新开始监听队列
$this->listenToQueues();
} else {
\Log::channel('mq')->error('Failed to reconnect to RabbitMQ.');
}
} catch (\Exception $e) {
\Log::channel('mq')->error('Error while reconnecting to RabbitMQ: ' . $e->getMessage());
}
}
/**
* 析构函数,用于关闭连接
*/
public function __destruct()
{
\Log::channel('mq')->info('RabbitMQService destructed');
}
3. 您期望得到的结果?
不理解这里重试逻辑到底存在什么问题,在某个节点之后,就开始出现了无限重连和断开的逻辑,也无法监听队列读取信息了。期望得到的正常表现为,消费服务进程常驻,不断监听队列信息。重连之后可以继续正常重新连接并监听队列信息。
4. 您实际得到的结果?
实际结果如上所述:
看了一下你的代码,存在一些问题:
我没记错的话,$this->channel->consume() 不就是常驻进程了吗,要定时任务干嘛?
不太懂,所以说只要合理的把异常都捕获到就ok了吗?
自己又根据官方文档提供的demo示例调整了下。虽然目前项目还未上线,但是目前测试环境中稳定运行天数30+。以下是最新的脚本关键内容。
最后,如果发现脚本有任何问题欢迎指正和交流。建议大家使用扩展或者工具时遇到问题,仔细研读官方文档。 :+1: