问下windows系统下laravel8如何连接mqtt服务器然后能一直连接,可以关闭命令行和能重连?

情况是这样的,我这边项目上服务器用的是windows,然后一个物联设备只支持mqtt协议,然后我就在windows服务器上安装了emqx作为mqtt服务器
然后我需要在laravel框架里面订阅几个话题,将物联设备发送到mqtt服务器的数据就收过来
然后我用的库是workerman/mqtt,然后这是我的代码

<?php

namespace App\Console\Commands;

use App\Services\MqttService;
use Illuminate\Console\Command;
use Workerman\Worker;

class MqttWorker1 extends Command
{
    /**
     * 命令名称和签名
     *
     * @var string
     */
    protected $signature = 'mqtt:worker1
                            {action=start : start|stop|restart|reload|status}
                            {--daemon : 以守护进程方式运行}';

    /**
     * 命令描述
     *
     * @var string
     */
    protected $description = '启动MQTT工作进程';

    /**
     * MQTT服务
     *
     * @var MqttService
     */
    private $mqttService;

    /**
     * 创建新命令实例
     *
     * @param MqttService $mqttService
     */
    public function __construct(MqttService $mqttService)
    {
        parent::__construct();
        $this->mqttService = $mqttService;
    }

    /**
     * 执行命令
     *
     * @return mixed
     */
    public function handle()
    {
        global $argv;

        // 获取命令参数
        $action = $this->argument('action');
        $isDaemon = $this->option('daemon');

        // 设置Workerman参数
        $argv[0] = 'mqtt:worker';
        $argv[1] = $action;

        // 如果是守护进程模式,添加-d参数
        if ($isDaemon && in_array($action, ['start', 'restart', 'reload'])) {
            $argv[2] = '-d';
        }

        $this->registerSubscriptions();
        Worker::runAll();
    }

    /**
     * 注册MQTT订阅
     */
    private function registerSubscriptions()
    {
        $this->mqttService->subscribe('testtopic/9', function($topic, $content) {
            // 处理消息,接收topic和content参数
            $this->info("收到MQTT消息: 主题={$topic}, 内容={$content}");
        });
    }
}

然后我的service是这样的

<?php

namespace App\Services;

use Workerman\Mqtt\Client;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Event;

class MqttService
{
    private $client;
    private $connected = false;
    private $topics = [];

    public function __construct()
    {
        // 从配置文件中获取MQTT连接参数
        $this->proto = config('mqtt.proto', 'mqtt');
        $this->host = config('mqtt.host', 'localhost');
        $this->port = config('mqtt.port', 1883);
        $this->clientId = config('mqtt.client_id', 'shian_client_' . uniqid());
        $this->username = config('mqtt.username', null);
        $this->password = config('mqtt.password', null);
        $this->cleanSession = config('mqtt.clean_session', true);
        $this->connectTimeout = config('mqtt.connect_timeout', 60);
        $this->reconnectPeriod = config('mqtt.reconnect_period', 5);
    }

    /**
     * 连接到MQTT服务器
     */
    public function connect()
    {
        // 防止重复创建客户端实例
        if ($this->client) {
            return;
        }

        Log::info("尝试连接MQTT服务器: {$this->proto}://{$this->host}:{$this->port}");

        // 创建MQTT客户端
        try {
            $this->client = new Client("{$this->proto}://{$this->host}:{$this->port}", [
                'client_id' => $this->clientId,
                'username' => $this->username,
                'password' => $this->password,
                'clean_session' => $this->cleanSession,
                'connect_timeout' => $this->connectTimeout,
                'reconnect_period' => $this->reconnectPeriod
            ]);
        } catch (\Exception $e) {
            Log::error("创建MQTT客户端失败: " . $e->getMessage());
            throw $e;
        }

        // 设置连接成功回调
        $this->client->onConnect = function() {
            $this->connected = true;
            Log::info('MQTT 连接成功');

            // 连接成功后重新订阅所有主题
            foreach ($this->topics as $topic => $data) {
                $this->client->subscribe($topic, $data['qos']);
                Log::info("MQTT 重新订阅主题: $topic");
            }
        };

        // 设置断开连接回调
        $this->client->onClose = function() {
            $this->connected = false;
            Log::warning('MQTT 连接已断开');
        };

        // 设置错误回调
        $this->client->onError = function($code, $msg) {
            $this->connected = false;
            Log::error("MQTT 错误: $code - $msg");
        };

        // 设置消息接收回调
        $this->client->onMessage = function($topic, $content) {
            Log::info("收到MQTT消息: 主题={$topic}, 内容={$content}");

            // 触发Laravel事件
            Event::dispatch('mqtt.message', [$topic, $content]);

            // 调用注册的回调函数
            if (isset($this->topics[$topic])) {
                call_user_func($this->topics[$topic]['callback'], $topic, $content);
            }
        };

        // 启动客户端连接
        try {
            $this->client->connect();
        } catch (\Exception $e) {
            Log::error("MQTT连接异常: " . $e->getMessage());
            $this->client = null;
            throw $e;
        }
    }

    /**
     * 发布消息到指定主题
     */
    public function publish(string $topic, string $message, int $qos = 0, bool $retain = false)
    {
        if (!$this->connected) {
            $this->connect();
        }

        // 确保连接成功后再发布
        if ($this->connected) {
            $this->client->publish($topic, $message, $qos, $retain);
            Log::info("MQTT 消息已发布到主题: $topic");
        } else {
            Log::error("无法发布MQTT消息: 未连接到服务器");
        }
    }

    /**
     * 订阅主题并处理消息
     */
    public function subscribe(string $topic, callable $callback = null, int $qos = 0)
    {
        // 保存回调函数和QoS
        $this->topics[$topic] = [
            'callback' => $callback,
            'qos' => $qos
        ];

        // 确保客户端已连接
        if (!$this->connected) {
            $this->connect();
        }

        // 如果已经连接,立即订阅
        if ($this->connected) {
            $this->client->subscribe($topic, $qos);
            Log::info("MQTT 已订阅主题: $topic (QoS: $qos)");
        }
    }

    /**
     * 断开MQTT连接
     */
    public function disconnect()
    {
        if ($this->connected && $this->client) {
            $this->client->close();
            $this->connected = false;
            $this->client = null;
            Log::info('MQTT 连接已断开');
        }
    }

    /**
     * 启动MQTT客户端(阻塞模式)
     */
    public function start()
    {
        if (!$this->connected) {
            $this->connect();
        }
    }
}

然后我创建了一个yml文件,我希望是能用pm2来管理进程

apps:
  - name: mqtt
    script: artisan
    exec_mode: fork
    interpreter: php
    instances: 1
    args:
      - mqtt:worker
      - single
      - --start = mqtt
      - --daemon
    restart_delay: 3000
    pid_file: ./storage/app/mqtt.pid

然后我执行pm2 start startmqtt.yml 然后一直报这个错误
[2025-06-16 14:45:28] local.INFO: 尝试连接MQTT服务器: mqtt://127.0.0.1:1883
[2025-06-16 14:45:28] local.ERROR: Call to a member function add() on null {“exception”:”[object] (Error(code: 0): Call to a member function add() on null at C:\Users\test\Desktop\food\backend\vendor\workerman\workerman\Connection\AsyncTcpConnection.php:215)
[stacktrace]
我测试过了,这个mqtt服务器是能连接的

《L05 电商实战》
从零开发一个电商项目,功能包括电商后台、商品 & SKU 管理、购物车、订单管理、支付宝支付、微信支付、订单退款流程、优惠券等
《L01 基础入门》
我们将带你从零开发一个项目并部署到线上,本课程教授 Web 开发中专业、实用的技能,如 Git 工作流、Laravel Mix 前端工作流等。
讨论数量: 3

你是試試這個包,"php-mqtt/client": "^1.8", 我的可以了。。。。

1天前 评论
donggan (楼主) 15小时前

你既然用的workman的包,就应该把应该把 MqttService::connect() 的调用放到 Workerman Worker 的 onWorkerStart 回调里,确保事件循环已启动。

    $worker = new Worker();
    $worker->onWorkerStart = function() {
        $this->registerSubscriptions();
    };

    Worker::runAll();
22小时前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!