问下windows系统下laravel8如何连接mqtt服务器然后能一直连接,可以关闭命令行和能重连?
情况是这样的,我这边项目上服务器用的是windows,然后一个物联设备只支持mqtt协议,然后我就在windows服务器上安装了emqx作为mqtt服务器
然后我需要在laravel框架里面订阅几个话题,将物联设备发送到mqtt服务器的数据就收过来
然后我用的库是workerman/mqtt,然后这是我的代码
<?php
namespace App\Console\Commands;
use App\Services\MqttService;
use Illuminate\Console\Command;
use Workerman\Worker;
class MqttWorker1 extends Command
{
/**
* 命令名称和签名
*
* @var string
*/
protected $signature = 'mqtt:worker1
{action=start : start|stop|restart|reload|status}
{--daemon : 以守护进程方式运行}';
/**
* 命令描述
*
* @var string
*/
protected $description = '启动MQTT工作进程';
/**
* MQTT服务
*
* @var MqttService
*/
private $mqttService;
/**
* 创建新命令实例
*
* @param MqttService $mqttService
*/
public function __construct(MqttService $mqttService)
{
parent::__construct();
$this->mqttService = $mqttService;
}
/**
* 执行命令
*
* @return mixed
*/
public function handle()
{
global $argv;
// 获取命令参数
$action = $this->argument('action');
$isDaemon = $this->option('daemon');
// 设置Workerman参数
$argv[0] = 'mqtt:worker';
$argv[1] = $action;
// 如果是守护进程模式,添加-d参数
if ($isDaemon && in_array($action, ['start', 'restart', 'reload'])) {
$argv[2] = '-d';
}
$this->registerSubscriptions();
Worker::runAll();
}
/**
* 注册MQTT订阅
*/
private function registerSubscriptions()
{
$this->mqttService->subscribe('testtopic/9', function($topic, $content) {
// 处理消息,接收topic和content参数
$this->info("收到MQTT消息: 主题={$topic}, 内容={$content}");
});
}
}
然后我的service是这样的
<?php
namespace App\Services;
use Workerman\Mqtt\Client;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Event;
class MqttService
{
private $client;
private $connected = false;
private $topics = [];
public function __construct()
{
// 从配置文件中获取MQTT连接参数
$this->proto = config('mqtt.proto', 'mqtt');
$this->host = config('mqtt.host', 'localhost');
$this->port = config('mqtt.port', 1883);
$this->clientId = config('mqtt.client_id', 'shian_client_' . uniqid());
$this->username = config('mqtt.username', null);
$this->password = config('mqtt.password', null);
$this->cleanSession = config('mqtt.clean_session', true);
$this->connectTimeout = config('mqtt.connect_timeout', 60);
$this->reconnectPeriod = config('mqtt.reconnect_period', 5);
}
/**
* 连接到MQTT服务器
*/
public function connect()
{
// 防止重复创建客户端实例
if ($this->client) {
return;
}
Log::info("尝试连接MQTT服务器: {$this->proto}://{$this->host}:{$this->port}");
// 创建MQTT客户端
try {
$this->client = new Client("{$this->proto}://{$this->host}:{$this->port}", [
'client_id' => $this->clientId,
'username' => $this->username,
'password' => $this->password,
'clean_session' => $this->cleanSession,
'connect_timeout' => $this->connectTimeout,
'reconnect_period' => $this->reconnectPeriod
]);
} catch (\Exception $e) {
Log::error("创建MQTT客户端失败: " . $e->getMessage());
throw $e;
}
// 设置连接成功回调
$this->client->onConnect = function() {
$this->connected = true;
Log::info('MQTT 连接成功');
// 连接成功后重新订阅所有主题
foreach ($this->topics as $topic => $data) {
$this->client->subscribe($topic, $data['qos']);
Log::info("MQTT 重新订阅主题: $topic");
}
};
// 设置断开连接回调
$this->client->onClose = function() {
$this->connected = false;
Log::warning('MQTT 连接已断开');
};
// 设置错误回调
$this->client->onError = function($code, $msg) {
$this->connected = false;
Log::error("MQTT 错误: $code - $msg");
};
// 设置消息接收回调
$this->client->onMessage = function($topic, $content) {
Log::info("收到MQTT消息: 主题={$topic}, 内容={$content}");
// 触发Laravel事件
Event::dispatch('mqtt.message', [$topic, $content]);
// 调用注册的回调函数
if (isset($this->topics[$topic])) {
call_user_func($this->topics[$topic]['callback'], $topic, $content);
}
};
// 启动客户端连接
try {
$this->client->connect();
} catch (\Exception $e) {
Log::error("MQTT连接异常: " . $e->getMessage());
$this->client = null;
throw $e;
}
}
/**
* 发布消息到指定主题
*/
public function publish(string $topic, string $message, int $qos = 0, bool $retain = false)
{
if (!$this->connected) {
$this->connect();
}
// 确保连接成功后再发布
if ($this->connected) {
$this->client->publish($topic, $message, $qos, $retain);
Log::info("MQTT 消息已发布到主题: $topic");
} else {
Log::error("无法发布MQTT消息: 未连接到服务器");
}
}
/**
* 订阅主题并处理消息
*/
public function subscribe(string $topic, callable $callback = null, int $qos = 0)
{
// 保存回调函数和QoS
$this->topics[$topic] = [
'callback' => $callback,
'qos' => $qos
];
// 确保客户端已连接
if (!$this->connected) {
$this->connect();
}
// 如果已经连接,立即订阅
if ($this->connected) {
$this->client->subscribe($topic, $qos);
Log::info("MQTT 已订阅主题: $topic (QoS: $qos)");
}
}
/**
* 断开MQTT连接
*/
public function disconnect()
{
if ($this->connected && $this->client) {
$this->client->close();
$this->connected = false;
$this->client = null;
Log::info('MQTT 连接已断开');
}
}
/**
* 启动MQTT客户端(阻塞模式)
*/
public function start()
{
if (!$this->connected) {
$this->connect();
}
}
}
然后我创建了一个yml文件,我希望是能用pm2来管理进程
apps:
- name: mqtt
script: artisan
exec_mode: fork
interpreter: php
instances: 1
args:
- mqtt:worker
- single
- --start = mqtt
- --daemon
restart_delay: 3000
pid_file: ./storage/app/mqtt.pid
然后我执行pm2 start startmqtt.yml 然后一直报这个错误
[2025-06-16 14:45:28] local.INFO: 尝试连接MQTT服务器: mqtt://127.0.0.1:1883
[2025-06-16 14:45:28] local.ERROR: Call to a member function add() on null {“exception”:”[object] (Error(code: 0): Call to a member function add() on null at C:\Users\test\Desktop\food\backend\vendor\workerman\workerman\Connection\AsyncTcpConnection.php:215)
[stacktrace]
我测试过了,这个mqtt服务器是能连接的
推荐文章: