laravel定时任务实现rabbitMQ消费者常驻进程服务。

1. 运行环境

lnmp宝塔

1). 当前使用的 Laravel 版本?

Laravel Framework 7.30.6

2). 当前使用的 php/php-fpm 版本?

PHP 7.4.33 (cli) (built: Nov 2 2022 16:00:55) ( ZTS Visual C++ 2017 x64 )
Copyright (c) The PHP Group
Zend Engine v3.4.0, Copyright (c) Zend Technologies
with Xdebug v3.1.6, Copyright (c) 2002-2022, by Derick Rethans

3). 当前系统

laravel定时任务实现rabbitMQ消费者服务。

4). 业务环境

开发环境

5). 相关软件版本

php mq 扩展包:
“php-amqplib/php-amqplib”: “^2.8”,

2. 问题描述?

队列服务莫名奇妙断连之后,重试逻辑异常。不断触发重连和断开,无法正常监听队列信息。
日志表现记录为,记录到最后一次尝试重试连接及连接成功之后的日志
laravel定时任务实现rabbitMQ消费者服务。

laravel定时任务实现rabbitMQ消费者服务。
重点来了!后续的日志开始了无限重新连接再断开的记录,具体表现如下图

laravel定时任务实现rabbitMQ消费者服务。

下面是脚本的关键代码。

/**
     * 构造函数,初始化连接配置信息
     */
    public function __construct()
    {
        if (!defined('SOCKET_EAGAIN')) {
            define('SOCKET_EAGAIN', 11);
        }
        if (!defined('SOCKET_EWOULDBLOCK')) {
            define('SOCKET_EWOULDBLOCK', 11);
        }
        if (!defined('SOCKET_EINTR')) {
            define('SOCKET_EINTR', 4);
        }

        // 判断服务是否开启
        if (!env('RABBITMQ_ENABLE', false)) {
            return;
        }
        // RabbitMQ 连接配置信息
        $this->host = env("RABBITMQ_HOST", "localhost");
        $this->port = env("RABBITMQ_PORT", 5671);
        $this->user = env("RABBITMQ_USER", "guest");
        $this->pass = env("RABBITMQ_PASSWORD", "guest");
        $this->vhost = env("RABBITMQ_VHOST", "/");
        $this->initializeConnection();
    }

    public bool $isDeclared = false;

    /**
     * 建立 RabbitMQ 连接
     *
     * @return void
     * @throws \ErrorException
     */
    public function initializeConnection()
    {
        // 初始化标志变量
        $this->isDeclared = false;
        while (!$this->isConnected && $this->retryAttempts < $this->maxRetryAttempts) {
            try {
                // 建立 RabbitMQ 连接
                $this->connection = new AMQPSSLConnection($this->host, $this->port, $this->user, $this->pass, $this->vhost, ['verify_peer' => true, 'heartbeat' => 60, 'read_write_timeout' => 60]);
                $this->channel = $this->connection->channel();
                foreach ($this->queues as $queue) {
                    // 声明队列
                    $this->channel->queue_declare($queue, false, true, false, false);
                    // 将队列绑定到交换机
                    $this->channel->queue_bind($queue, 'fanout_exchange');
                }
                $this->isConnected = true;
                $this->retryAttempts = 0;
                \Log::channel('mq')->info('RabbitMQ connection established successfully.');
            } catch (\Exception $e) {
                \Log::channel('mq')->error('RabbitMQ connection failed: ' . $e->getMessage());
                // 等待一段时间后重试
                sleep($this->retryInterval);
                $this->retryAttempts++;
            }
        }
        if (!$this->isConnected) {
            \Log::channel('mq')->error('Maximum retry attempts reached. Could not establish RabbitMQ connection.');
            throw new \ErrorException('Could not establish RabbitMQ connection.');
        }
    }

    /**
     * 监听 RabbitMQ 队列
     *
     * @return void
     * @throws \ErrorException
     *
     */
    public function listenToQueues()
    {
        \Log::channel('mq')->info('RabbitMQ listener started.');
        while ($this->isConnected) {
            try {
                foreach ($this->queues as $queue) {
                    $this->channel->basic_consume($queue, '', false, false, false, false, function ($message) use ($queue) {
                        try {
                            $result = $this->processMessage($queue, $message);
                            if ($result) {
                                $this->channel->basic_ack($message->delivery_info['delivery_tag']);
                            }
                        } catch (\Exception $e) {
                            \Log::channel('mq')->error('Exception Error processing message: ' . $e->getMessage());
                        } catch (\Error $e) {
                            \Log::channel('mq')->error('Fatal Error processing message: ' . json_encode($e->getTrace()));
                        }
                    });
                }
                // 阻塞等待消息
                \Log::channel('mq')->info('RabbitMQ listener waiting for messages.');
                $this->channel->wait(null, false, $this->retryInterval);
            } catch (\Exception $e) {
                \Log::channel('mq')->error('Error while consuming messages: ' . $e->getMessage());
                $this->isConnected = false; // 标记为断开连接,准备进行重连
                $this->reconnect(); // 尝试重连
                // 如果重连失败,退出监听循环
                if (!$this->isConnected) {
                    \Log::channel('mq')->error('Failed to reconnect. Exiting listener loop.');
                    break;
                }
            }
        }
    }

    /**
     * 重连机制
     */
    protected function reconnect()
    {
        try {
            \Log::channel('mq')->info('Attempting to reconnect to RabbitMQ...');
            // 关闭连接
//            $this->closeConnection();
            // 等待一段时间再重试
            sleep(5);  // 可以根据需要增加等待时间

            // 重新初始化连接
            $this->initializeConnection();
            if ($this->isConnected) {
                \Log::channel('mq')->info('Reconnected to RabbitMQ successfully.');
                // 重新开始监听队列
                $this->listenToQueues();
            } else {
                \Log::channel('mq')->error('Failed to reconnect to RabbitMQ.');
            }
        } catch (\Exception $e) {
            \Log::channel('mq')->error('Error while reconnecting to RabbitMQ: ' . $e->getMessage());
        }
    }

    /**
     * 析构函数,用于关闭连接
     */
    public function __destruct()
    {
        \Log::channel('mq')->info('RabbitMQService destructed');
    }

3. 您期望得到的结果?

不理解这里重试逻辑到底存在什么问题,在某个节点之后,就开始出现了无限重连和断开的逻辑,也无法监听队列读取信息了。期望得到的正常表现为,消费服务进程常驻,不断监听队列信息。重连之后可以继续正常重新连接并监听队列信息。

laravel定时任务实现rabbitMQ消费者服务。

4. 您实际得到的结果?

实际结果如上所述:

laravel定时任务实现rabbitMQ消费者服务。

《L02 从零构建论坛系统》
以构建论坛项目 LaraBBS 为线索,展开对 Laravel 框架的全面学习。应用程序架构思路贴近 Laravel 框架的设计哲学。
《G01 Go 实战入门》
从零开始带你一步步开发一个 Go 博客项目,让你在最短的时间内学会使用 Go 进行编码。项目结构很大程度上参考了 Laravel。
讨论数量: 6

看了一下你的代码,存在一些问题:

1.你的代码有递归调用的风险
2.catch (\Exception $e) 不能完全捕获到所有异常,使用catch (\Throwable $e)
3.常驻进程你最好是设置一下执行次数自动退出当前进程,你可以使用supervisor监管进程
3个月前 评论
beyondM_S (楼主) 3个月前
莫名私下里 1个月前

我没记错的话,$this->channel->consume() 不就是常驻进程了吗,要定时任务干嘛?

3个月前 评论

不太懂,所以说只要合理的把异常都捕获到就ok了吗?

3个月前 评论

自己又根据官方文档提供的demo示例调整了下。虽然目前项目还未上线,但是目前测试环境中稳定运行天数30+。以下是最新的脚本关键内容。

public function createConnection()
{
    while ($this->retryAttempts < $this->maxRetryAttempts) {
        try {
            $this->connection = new AMQPSSLConnection(
                $this->host,
                $this->port,
                $this->user,
                $this->pass,
                $this->vhost,
                ['verify_peer' => true, 'heartbeat' => 60, 'read_write_timeout' => 60]
            );
            $this->isConnected = true;
            $this->retryAttempts = 0;
            return; // 连接成功,退出循环
        } catch (\Exception $e) {
            $this->retryAttempts++;
            \Log::channel('mq')->error("RabbitMQ connection failed. Retry attempt {$this->retryAttempts}. Error: " . $e->getMessage());
            sleep($this->retryInterval); // 等待后重试
        }
    }
    // 如果超过最大重试次数
    \Log::channel('mq')->error("Exceeded maximum retry attempts ({$this->maxRetryAttempts}). Exiting script.");
    exit(1);
}

/**
 * 持续监听 RabbitMQ 队列
 * 队列入口
 */
public function listenToQueues()
{
    register_shutdown_function([$this, 'shutdown']);

    $lastMemoryLogTime = time(); // 记录上次记录内存的时间
    $memoryLogInterval = 60;    // 内存记录间隔(秒)
    while (true) {
        try {
            $this->createConnection();
            // 定期记录内存使用情况
            if (time() - $lastMemoryLogTime >= $memoryLogInterval) {
                $this->logMemoryUsage($lastMemoryLogTime);
                $lastMemoryLogTime = time();
            }
            $this->consumeConnections();
        } catch (AMQPRuntimeException $e) {
            \Log::channel('mq')->error('RabbitMQ amqp runtime exception: ' . $e->getMessage());
            $this->cleanUpConnection($this->connection);
            usleep(self::WAIT_BEFORE_RECONNECT_uS);
        } catch (AMQPIOException $e) {
            \Log::channel('mq')->error('AMQP IO exception: ' . $e->getMessage());
            $this->cleanUpConnection($this->connection);
            usleep(self::WAIT_BEFORE_RECONNECT_uS);
        } catch (\RuntimeException $e) {
            \Log::channel('mq')->error('RabbitMQ runtime exception: ' . $e->getMessage());
            $this->cleanUpConnection($this->connection);
            usleep(self::WAIT_BEFORE_RECONNECT_uS);
        } catch (\ErrorException $e) {
            \Log::channel('mq')->error('Error processing message: ' . $e->getMessage());
            $this->cleanUpConnection($this->connection);
            usleep(self::WAIT_BEFORE_RECONNECT_uS);
        }
    }
}

/**
 * 记录内存使用情况
 */
protected function logMemoryUsage($lastMemoryLogTime)
{
    $currentMemoryUsage = memory_get_usage(true); // 当前分配的内存
    $peakMemoryUsage = memory_get_peak_usage(true); // 内存使用峰值

    \Log::channel('mq')->log('info', sprintf("Memory Usage: %s MB | Peak Memory Usage: %s MB | Last Log Time: %s", number_format($currentMemoryUsage / 1024 / 1024, 2), number_format($peakMemoryUsage / 1024 / 1024, 2), date('Y-m-d H:i:s', $lastMemoryLogTime)));
}

/**
 * 消费队列
 */
public function consumeConnections()
{
    $this->channel = $this->connection->channel();
    $maxAttempts = self::MAX_RETRY_ATTEMPTS;
    foreach ($this->queues as $queue) {
        $consumerTag = $queue . 'consumer';
        $exchangeName = $this->exchanges[$queue];
        $this->channel->exchange_declare($exchangeName, 'fanout', false, true, false);
        $this->channel->queue_declare($queue, false, true, false, false);
        $this->channel->queue_bind($queue, $exchangeName, $this->routingKey);
        $this->channel->basic_consume($queue, $consumerTag, false, false, false, false, function ($message) use ($queue, $maxAttempts) {
            $result = $this->processMessage($queue, $message);
            if ($result) {
                // 手动ack确认
                $this->channel->basic_ack($message->delivery_info['delivery_tag']);
            } else {
                // 手动nack拒绝丢弃
                $this->channel->basic_nack($message->delivery_info['delivery_tag']);
            }
            unset($message);
        });
    }
    while (count($this->channel->callbacks)) {
        $this->channel->wait(null, false, $this->retryInterval);
    }
}

/**
 * 处理消息
 */
public function processMessage($queue, AMQPMessage $message): bool
{
    // 业务代码省略
}

/**
 * 清除 RabbitMQ 连接
 */
public function cleanUpConnection($connection)
{
    try {
        if ($this->channel !== null) {
            $this->channel->close();
        }
        if ($connection !== null) {
            $connection->close();
        }
    } catch (\ErrorException $e) {
        \Log::channel('mq')->error('Error closing RabbitMQ connection: ' . $e->getMessage());
    }
}

/**
 * 关闭 RabbitMQ 连接
 */
public function shutdown()
{
    \Log::channel('mq')->info('RabbitMQ connection closed.');
    if ($this->connection !== null) {
        $this->connection->close();
    }
}

最后,如果发现脚本有任何问题欢迎指正和交流。建议大家使用扩展或者工具时遇到问题,仔细研读官方文档。 :+1:

2个月前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!