困扰了五天的问题,laravel Reverb为什么无法下发广播?紧急求助
这个问题困扰了我五天了。
laravel11。php8.3。前端是微信小程序。但这个不重要。
我安装了laravel Reverb。
下面是我的后端代码:
routes\channels.php
<?php
use Illuminate\Support\Facades\Broadcast;
use Illuminate\Support\Facades\Log;
Log::info('channels.php 文件已加载');
Broadcast::channel('private-user.{userId}', function ($user, $userId) {
Log::info('这里根本无法执行到,无法打印到日志: ' . $userId);
return $user;
});
env是这样的
APP_NAME=测试站点
APP_ENV=local
APP_KEY=xxxxx
APP_DEBUG=true
APP_MAINTENANCE_DRIVER=file
# APP_MAINTENANCE_STORE=database
PHP_CLI_SERVER_WORKERS=4
BCRYPT_ROUNDS=12
LOG_CHANNEL=stack
LOG_STACK=single
LOG_DEPRECATIONS_CHANNEL=null
LOG_LEVEL=debug
BROADCAST_CONNECTION=reverb
FILESYSTEM_DISK=local
QUEUE_CONNECTION=redis
CACHE_STORE=database
# CACHE_PREFIX=
MEMCACHED_HOST=127.0.0.1
REDIS_CLIENT=phpredis
REDIS_HOST=127.0.0.1
REDIS_PASSWORD=null
REDIS_PORT=6379
REVERB_APP_ID=830117
REVERB_APP_KEY=1kzbb6yzbw99ivh28hui
REVERB_APP_SECRET=nbencwirhqvisbcktbgq
REVERB_HOST="localhost"
REVERB_PORT=8080
REVERB_SCHEME=http
VITE_REVERB_APP_KEY="${REVERB_APP_KEY}"
VITE_REVERB_HOST="${REVERB_HOST}"
VITE_REVERB_PORT="${REVERB_PORT}"
VITE_REVERB_SCHEME="${REVERB_SCHEME}"
事件是这样的
<?php
namespace Plugins\Charge\Events;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Broadcasting\PrivateChannel;
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Log;
class ChargeStatusUpdated implements ShouldBroadcast
{
use Dispatchable, InteractsWithSockets, SerializesModels;
public string $status;
public int $userId;
public function __construct(string $status, int $userId)
{
$this->status = $status;
$this->userId = $userId;
}
public function broadcastOn()
{
return new PrivateChannel('private-user.' . $this->userId);
}
public function broadcastWith()
{
return ['status' => $this->status];
}
}
config\broadcasting.php 是这样的
<?php
return [
'default' => env('BROADCAST_CONNECTION', 'reverb'),
'connections' => [
'reverb' => [
'driver' => 'reverb',
'key' => env('REVERB_APP_KEY'),
'secret' => env('REVERB_APP_SECRET'),
'app_id' => env('REVERB_APP_ID'),
'options' => [
'host' => env('REVERB_HOST'),
'port' => env('REVERB_PORT', 8080),
'scheme' => env('REVERB_SCHEME', 'http'),
'useTLS' => env('REVERB_SCHEME', 'http') === 'https',
],
'client_options' => [
// Guzzle client options: https://docs.guzzlephp.org/en/stable/request-options.html
],
],
'pusher' => [
'driver' => 'pusher',
'key' => env('PUSHER_APP_KEY'),
'secret' => env('PUSHER_APP_SECRET'),
'app_id' => env('PUSHER_APP_ID'),
'options' => [
'cluster' => env('PUSHER_APP_CLUSTER'),
'host' => env('PUSHER_HOST') ?: 'api-' . env('PUSHER_APP_CLUSTER', 'mt1') . '.pusher.com',
'port' => env('PUSHER_PORT', 443),
'scheme' => env('PUSHER_SCHEME', 'https'),
'encrypted' => true,
'useTLS' => env('PUSHER_SCHEME', 'https') === 'https',
],
'client_options' => [
// Guzzle client options: https://docs.guzzlephp.org/en/stable/request-options.html
],
],
'ably' => [
'driver' => 'ably',
'key' => env('ABLY_KEY'),
],
'log' => [
'driver' => 'log',
],
'null' => [
'driver' => 'null',
],
],
];
我在api里触发了广播事件
$userId = 1;
$status = 'chargingComplete';
event(new ChargeStatusUpdated($status, $userId));
最后日志显示以下两个信息。
[2025-03-23 21:18:59] local.INFO: channels.php 文件已加载
dansh[2025-03-23 20:55:37] local.INFO: Broadcasting on channel: private-user.1
[2025-03-23 20:55:37] local.INFO: Broadcasting with data: {"status":"chargingComplete"}
队列执行了。
但是终端没有这个事件的reverb输出。
Message Handled ........................................ 498099879.687066781
Pruning Stale Connections .................................................
Pinging Inactive Connections ............................................
Connection Pinged ................................ 498099879.687066781
Message Received ................................. 498099879.687066781
1▕ {
2▕ "event": "pusher:pong"
3▕ }
Message Handled .............................. 498099879.687066781
补充前端代码
前端是微信小程序。主要是这三个文件:
第一个是请求封装:
request.ts
// request.ts
import config from './config'
import CryptoJS from 'crypto-js';
interface RequestOptions {
baseUrl?: string
url: string
data?: any
header?: any
method?: string
noToken?: boolean
}
// 新增 WebSocket 选项接口
interface WebSocketOptions {
url: string
header?: any
protocols?: string[]
noToken?: boolean; // 添加 noToken 选项
}
class Request {
private config = {
baseUrl: config.baseUrl,
header: {
'Content-Type': 'application/json',
'X-Requested-With': 'XMLHttpRequest'
},
method: 'GET',
noToken: false
}
public async wxLogin() {
console.log('调用微信登录接口,获取code')
try {
const { code } = await wx.login()
console.log('获取到的 code:', code)
const systemInfo = wx.getSystemInfoSync()
const device_name = systemInfo.model
const url = this.config.baseUrl + '/authorizations'
const data = { code, device_name }
const header = Object.assign({}, this.config.header, {
Authorization: ''
})
// 将 wx.request 方法封装成一个 Promise 对象
await new Promise((resolve, reject) => {
wx.request({
url,
data,
header,
method: 'POST',
success: (res) => {
console.log('获取到的 token:', res.data.token)
const expiredTime = new Date().getTime() + res.data.expire_in * 1000
wx.setStorageSync('token', res.data.token)
wx.setStorageSync('expiredTime', expiredTime)
resolve(res)
},
fail: (err) => {
console.log('wxLogin 错误:', err)
reject(err)
}
})
})
} catch (e) {
console.log('wxLogin 异常:', e)
throw e
}
}
public async request(options: RequestOptions): Promise<any> {
console.log('第一步:发起请求', options.url)
const { options: processedOptions, header } =
await this.processOptions(options)
// 请求的url
const url = processedOptions.baseUrl + processedOptions.url
console.log(url)
// 请求的参数
const data = processedOptions.data
// 请求的header
const processedHeader = header
// 请求的方法
const method = processedOptions.method
// 返回一个Promise对象
return new Promise((resolve, reject) => {
console.log('第七步:发起请求', url, data, processedHeader, method)
wx.request({
url,
data,
header: processedHeader,
method: method as
| 'GET'
| 'POST'
| 'PUT'
| 'DELETE'
| 'OPTIONS'
| 'HEAD'
| 'TRACE'
| 'CONNECT',
success: async (res) => {
// 请求成功,如果响应中有错误信息,则拒绝Promise
if (typeof res.data === 'string') {
reject(new Error(res.data))
} else {
const data =
typeof res.data === 'object' ? res.data : JSON.parse(res.data)
if (data.message === 'Unauthenticated.') {
// token 无效,重新登录获取新的 token
console.log('Token无效,正在重新登录...')
await this.wxLogin()
// 重新请求
try {
const newResponse = await this.request(options)
resolve(newResponse)
} catch (error) {
reject(error)
}
} else if (data.status === 'error') {
reject(new Error(data.message))
} else {
resolve(res)
}
}
},
fail: (err) => {
// 请求失败,拒绝Promise
reject(err)
}
})
})
}
public async uploadFile(options: wx.UploadFileOptions): Promise<any> {
const { options: processedOptions, header } =
await this.processOptions(options)
// 请求的文件路径
const filePath = processedOptions.filePath
// 请求的文件名
const name = processedOptions.name
// 请求的formData
const formData = processedOptions.formData
// 返回一个Promise对象
return new Promise((resolve, reject) => {
wx.uploadFile({
url: processedOptions.baseUrl + processedOptions.url,
filePath,
name,
formData,
header,
success: (res) => {
// 请求成功,如果响应中有错误信息,则拒绝Promise
const data = JSON.parse(res.data)
resolve(data)
},
fail: (err) => {
// 请求失败,拒绝Promise
reject(err)
}
})
})
}
public async getClientAuth(socketId: string, channelName: string): Promise<{auth: string, channelData: string}> { // 修改返回类型
const response = await this.request({
url: '/reverb/client-auth',
data: {
socket_id: socketId,
channel_name: channelName,
},
method: 'POST',
noToken: false,
});
console.log("获取到 auth:", response.data);
//return response.data.auth; // 原始的返回值
return { // 修改返回值
auth: response.data.auth,
channelData: response.data.channel_data
};
}
public connectWebSocket(options: WebSocketOptions): Promise<wx.SocketTask> {
return new Promise(async (resolve, reject) => {
const { options: processedOptions, header } = await this.processWebSocketOptions(options)
const url = processedOptions.url;
const processedHeader = header; // 包含了 Authorization: Bearer <token>
const socketTask = wx.connectSocket({
url,
header: processedHeader, // 使用包含 token 的 header
protocols: options.protocols,
success: () => {
console.log('WebSocket 连接尝试成功 (wx.connectSocket success)')
},
fail: (err) => {
console.log('WebSocket 连接失败:', err)
reject(err)
},
})
socketTask.onOpen(() => {
console.log('WebSocket 连接已打开 (socketTask.onOpen)')
resolve(socketTask);
});
socketTask.onMessage(async (msg) => { // onMessage 也改为 async
console.log('WebSocket 收到消息:', msg)
const data = JSON.parse(msg.data);
if (data.event === 'pusher:connection_established') {
const innerData = JSON.parse(data.data); // 对 data.data 再次解析
let socketId = innerData.socket_id;
console.log("获取到 socketId:", socketId);
}
})
socketTask.onClose(() => {
console.log('WebSocket 连接已关闭')
})
socketTask.onError((err) => {
console.log('WebSocket 连接出错:', err)
})
})
}
// 处理 WebSocket 选项,复用 token 逻辑
private async processWebSocketOptions(options: WebSocketOptions): Promise<{ options: WebSocketOptions; header: any }> {
const newOptions = { ...options };
newOptions.header = newOptions.header || {};
if (!newOptions.noToken) { // 检查 noToken
let token = wx.getStorageSync('token');
console.log('判断缓存是否过期', !token || this.isTokenExpired());
if (!token || this.isTokenExpired()) {
console.log('调用 wxLogin 方法重新获取 token');
await this.wxLogin();
token = wx.getStorageSync('token');
console.log('拿到新的 token', token);
}
if (token) {
newOptions.header.Authorization = `Bearer ${token}`;
}
}
return { options: newOptions, header: newOptions.header };
}
private async processOptions(
options: RequestOptions | wx.UploadFileOptions
): Promise<{ options: RequestOptions | wx.UploadFileOptions; header: any }> {
const newOptions = Object.assign({}, options)
newOptions.header = newOptions.header || {}
if (!newOptions.noToken) {
let token = wx.getStorageSync('token')
console.log('判断缓存是否过期', !token || this.isTokenExpired())
if (!token || this.isTokenExpired()) {
console.log('调用 wxLogin 方法重新获取 token')
await this.wxLogin()
token = wx.getStorageSync('token')
console.log('拿到新的 token', token)
}
if (token) {
newOptions.header.Authorization = `Bearer ${token}`
}
}
const processedOptions = {
...this.config,
...newOptions,
header: {
...this.config.header,
...newOptions.header
}
}
return { options: processedOptions, header: processedOptions.header }
}
private isTokenExpired(): boolean {
const expiredTime = wx.getStorageSync('expiredTime')
console.log('当前时间:', new Date().getTime())
console.log('过期时间:', expiredTime)
console.log('距离过期还有:', expiredTime - new Date().getTime(), '毫秒')
if (!expiredTime) {
return true
}
if (new Date().getTime() > expiredTime) {
return true
}
return false
}
public get(url: string, data?: any, noToken?: boolean): Promise<any> {
return this.request({ url, data, noToken })
}
public post(url: string, data?: any): Promise<any> {
return this.request({ url, data, method: 'POST' })
}
public put(url: string, data?: any): Promise<any> {
return this.request({ url, data, method: 'PUT' })
}
public delete(url: string, data?: any): Promise<any> {
return this.request({ url, data, method: 'DELETE' })
}
}
export default Request
第二个是页面文件,也就是接收websoket信息的页面
src\pages\charge\charge.ts
// src/pages/charge/charge.ts
import Request from '../../utils/request';
import config from '../../utils/config';
const request = new Request();
Page({
data: {
status: 'starting',
},
socketTask: null as wx.SocketTask | null,
onLoad: async function () {
await this.connectWebSocket();
this.updateStatusText();
},
onUnload: function () {
if (this.socketTask) {
this.socketTask.close();
}
},
async connectWebSocket() {
const userId = 1;
const appKey = config.reverbAppKey;
const url = `ws://jy.test:8080/app/${appKey}`;
console.log("connectWebSocket - Connecting...");
try {
this.socketTask = await request.connectWebSocket({ url, noToken: false });
let socketId: string | null = null;
this.socketTask.onMessage(async (msg) => {
console.log('WebSocket 收到消息:', msg);
const data = JSON.parse(msg.data);
if (data.event === 'pusher:connection_established') {
const innerData = JSON.parse(data.data); // 对 data.data 再次解析
socketId = innerData.socket_id;
console.log("获取到 socketId:", socketId);
if (socketId) {
const channelName = `private-user.${userId}`;
try {
//const auth = await request.getClientAuth(socketId, channelName); // 原始的获取 auth
const { auth, channelData } = await request.getClientAuth(socketId, channelName); // 使用修改后的 getClientAuth
this.socketTask.send({
data: JSON.stringify({
event: 'pusher:subscribe',
data: {
channel: channelName,
auth: auth,
channel_data: channelData, // 添加 channel_data
},
}),
});
} catch (authError) {
console.error("获取 auth 失败:", authError);
}
}
} else if (data.event === 'pusher_internal:subscription_succeeded') {
console.log('Successfully subscribed to channel:', data.channel);
} else if (data.event === 'pusher:error') {
console.error("Reverb Error:", data.data);
} else if (data.event === 'pusher:ping') {
// 处理 pusher:ping 事件
console.log('Received pusher:ping, sending pusher:pong');
this.socketTask.send({
data: JSON.stringify({ event: 'pusher:pong' })
});
}
});
this.socketTask.onClose(() => {
console.log('WebSocket 连接已关闭 (socketTask.onClose)');
});
this.socketTask.onError((err) => {
console.error('WebSocket 错误 (socketTask.onError):', err);
});
} catch (error) {
console.error('WebSocket 连接失败:', error);
wx.showToast({
title: '网络连接失败,请稍后重试',
icon: 'none',
duration: 2000
});
}
},
});
第三个config.ts
interface Config {
baseUrl: string
reverbAppKey: string
reverbAppSecret: string
}
const config: Config = {
baseUrl: 'http://jy.test/api/miniprogram',
reverbAppKey: '111111',
reverbAppSecret: '222222', //忽略这两个地方的参数,我随意改的,事实上和后端是一致的
}
export default config
小程序看到的消息
{"event":"pusher:connection_established","data":"{\"socket_id\":\"797649699.737739152\",\"activity_timeout\":30}"} 114
08:51:29.194
{"event":"pusher:subscribe","data":{"channel":"private-user.1","auth":"1kzbb6yzbw99ivh28hui:1d11ea7ecd7239c7f5532449483336e47255f0e20954fb361883251c51860863","channel_data":"{\"user_id\":1}"}} 192
08:51:31.097
{"event":"pusher_internal:subscription_succeeded","data":"{}","channel":"private-user.1"} 89
08:51:31.134
{"event":"pusher:ping"} 23
08:53:29.715
{"event":"pusher:pong"}
routes\channels.php 内的代码。
打印到日志内的只有channels.php 文件已加载
。其他信息没有打印到日志内。
我的问题是:
1,为什么channels.php内的Broadcast::channel()没被执行。
2,api触发的广播事件没有被发送出去
是什么原因呢?怎么解决呢?非常感谢。。。煎熬了5天了。
推荐文章: