困扰了五天的问题,laravel Reverb为什么无法下发广播?紧急求助

这个问题困扰了我五天了。
laravel11。php8.3。前端是微信小程序。但这个不重要。
我安装了laravel Reverb。
下面是我的后端代码:
routes\channels.php

<?php
use Illuminate\Support\Facades\Broadcast;
use Illuminate\Support\Facades\Log;
Log::info('channels.php 文件已加载');
Broadcast::channel('private-user.{userId}', function ($user, $userId) {
    Log::info('这里根本无法执行到,无法打印到日志: ' . $userId);
    return $user;
});

env是这样的

APP_NAME=测试站点
APP_ENV=local
APP_KEY=xxxxx
APP_DEBUG=true

APP_MAINTENANCE_DRIVER=file
# APP_MAINTENANCE_STORE=database
PHP_CLI_SERVER_WORKERS=4
BCRYPT_ROUNDS=12

LOG_CHANNEL=stack
LOG_STACK=single
LOG_DEPRECATIONS_CHANNEL=null
LOG_LEVEL=debug

BROADCAST_CONNECTION=reverb

FILESYSTEM_DISK=local
QUEUE_CONNECTION=redis
CACHE_STORE=database
# CACHE_PREFIX=

MEMCACHED_HOST=127.0.0.1
REDIS_CLIENT=phpredis
REDIS_HOST=127.0.0.1
REDIS_PASSWORD=null
REDIS_PORT=6379

REVERB_APP_ID=830117
REVERB_APP_KEY=1kzbb6yzbw99ivh28hui
REVERB_APP_SECRET=nbencwirhqvisbcktbgq
REVERB_HOST="localhost"
REVERB_PORT=8080
REVERB_SCHEME=http

VITE_REVERB_APP_KEY="${REVERB_APP_KEY}"
VITE_REVERB_HOST="${REVERB_HOST}"
VITE_REVERB_PORT="${REVERB_PORT}"
VITE_REVERB_SCHEME="${REVERB_SCHEME}"

事件是这样的


<?php
namespace Plugins\Charge\Events;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Broadcasting\PrivateChannel;
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Log;
class ChargeStatusUpdated implements ShouldBroadcast
{
    use Dispatchable, InteractsWithSockets, SerializesModels;
    public string $status;
    public int $userId;
    public function __construct(string $status, int $userId)
    {
        $this->status  = $status;
        $this->userId  = $userId;
    }

    public function broadcastOn()
    {
        return new PrivateChannel('private-user.' . $this->userId);
    }

    public function broadcastWith()
    {
        return ['status' => $this->status];
    }
}

config\broadcasting.php 是这样的


<?php
return [
 'default' => env('BROADCAST_CONNECTION', 'reverb'),
 'connections' => [
 'reverb' => [
 'driver' => 'reverb',
 'key' => env('REVERB_APP_KEY'),
 'secret' => env('REVERB_APP_SECRET'),
 'app_id' => env('REVERB_APP_ID'),
 'options' => [
 'host' => env('REVERB_HOST'),
 'port' => env('REVERB_PORT', 8080),
 'scheme' => env('REVERB_SCHEME', 'http'),
 'useTLS' => env('REVERB_SCHEME', 'http') === 'https',
            ],
 'client_options' => [
                // Guzzle client options: https://docs.guzzlephp.org/en/stable/request-options.html
            ],
        ],
 'pusher' => [
 'driver' => 'pusher',
 'key' => env('PUSHER_APP_KEY'),
 'secret' => env('PUSHER_APP_SECRET'),
 'app_id' => env('PUSHER_APP_ID'),
 'options' => [
 'cluster' => env('PUSHER_APP_CLUSTER'),
 'host' => env('PUSHER_HOST') ?: 'api-' . env('PUSHER_APP_CLUSTER', 'mt1') . '.pusher.com',
 'port' => env('PUSHER_PORT', 443),
 'scheme' => env('PUSHER_SCHEME', 'https'),
 'encrypted' => true,
 'useTLS' => env('PUSHER_SCHEME', 'https') === 'https',
            ],
 'client_options' => [
                // Guzzle client options: https://docs.guzzlephp.org/en/stable/request-options.html
            ],
        ],
 'ably' => [
 'driver' => 'ably',
 'key' => env('ABLY_KEY'),
        ],
 'log' => [
 'driver' => 'log',
        ],
 'null' => [
 'driver' => 'null',
        ],
    ],
];

我在api里触发了广播事件

$userId  = 1;
$status  =  'chargingComplete';
event(new ChargeStatusUpdated($status, $userId));

最后日志显示以下两个信息。

[2025-03-23  21:18:59] local.INFO: channels.php 文件已加载
dansh[2025-03-23 20:55:37] local.INFO: Broadcasting on channel: private-user.1
[2025-03-23 20:55:37] local.INFO: Broadcasting with data: {"status":"chargingComplete"}

队列执行了。
但是终端没有这个事件的reverb输出。

Message Handled ........................................ 498099879.687066781
Pruning Stale Connections .................................................
Pinging Inactive Connections ............................................
Connection Pinged ................................ 498099879.687066781
Message Received ................................. 498099879.687066781
1{
2"event": "pusher:pong"
3}
Message Handled .............................. 498099879.687066781

补充前端代码

前端是微信小程序。主要是这三个文件:
第一个是请求封装:
request.ts

// request.ts
import config from './config'
import CryptoJS from 'crypto-js';
interface RequestOptions {
  baseUrl?: string
  url: string
  data?: any
  header?: any
  method?: string
  noToken?: boolean
}

// 新增 WebSocket 选项接口
interface WebSocketOptions {
  url: string
  header?: any
  protocols?: string[]
  noToken?: boolean; // 添加 noToken 选项
}

class Request {
  private config = {
    baseUrl: config.baseUrl,
    header: {
      'Content-Type': 'application/json',
      'X-Requested-With': 'XMLHttpRequest'
    },
    method: 'GET',
    noToken: false
  }

  public async wxLogin() {
    console.log('调用微信登录接口,获取code')
    try {
      const { code } = await wx.login()
      console.log('获取到的 code:', code)
      const systemInfo = wx.getSystemInfoSync()
      const device_name = systemInfo.model

      const url = this.config.baseUrl + '/authorizations'
      const data = { code, device_name }
      const header = Object.assign({}, this.config.header, {
        Authorization: ''
      })

      // 将 wx.request 方法封装成一个 Promise 对象
      await new Promise((resolve, reject) => {
        wx.request({
          url,
          data,
          header,
          method: 'POST',
          success: (res) => {
            console.log('获取到的 token:', res.data.token)
            const expiredTime = new Date().getTime() + res.data.expire_in * 1000
            wx.setStorageSync('token', res.data.token)
            wx.setStorageSync('expiredTime', expiredTime)
            resolve(res)
          },
          fail: (err) => {
            console.log('wxLogin 错误:', err)
            reject(err)
          }
        })
      })
    } catch (e) {
      console.log('wxLogin 异常:', e)
      throw e
    }
  }

  public async request(options: RequestOptions): Promise<any> {
    console.log('第一步:发起请求', options.url)
    const { options: processedOptions, header } =
      await this.processOptions(options)

    // 请求的url
    const url = processedOptions.baseUrl + processedOptions.url
    console.log(url)
    // 请求的参数
    const data = processedOptions.data
    // 请求的header
    const processedHeader = header
    // 请求的方法
    const method = processedOptions.method

    // 返回一个Promise对象
    return new Promise((resolve, reject) => {
      console.log('第七步:发起请求', url, data, processedHeader, method)
      wx.request({
        url,
        data,
        header: processedHeader,
        method: method as
          | 'GET'
          | 'POST'
          | 'PUT'
          | 'DELETE'
          | 'OPTIONS'
          | 'HEAD'
          | 'TRACE'
          | 'CONNECT',
        success: async (res) => {
          // 请求成功,如果响应中有错误信息,则拒绝Promise
          if (typeof res.data === 'string') {
            reject(new Error(res.data))
          } else {
            const data =
              typeof res.data === 'object' ? res.data : JSON.parse(res.data)
            if (data.message === 'Unauthenticated.') {
              // token 无效,重新登录获取新的 token
              console.log('Token无效,正在重新登录...')
              await this.wxLogin()
              // 重新请求
              try {
                const newResponse = await this.request(options)
                resolve(newResponse)
              } catch (error) {
                reject(error)
              }
            } else if (data.status === 'error') {
              reject(new Error(data.message))
            } else {
              resolve(res)
            }
          }
        },
        fail: (err) => {
          // 请求失败,拒绝Promise
          reject(err)
        }
      })
    })
  }
  public async uploadFile(options: wx.UploadFileOptions): Promise<any> {
    const { options: processedOptions, header } =
      await this.processOptions(options)

    // 请求的文件路径
    const filePath = processedOptions.filePath
    // 请求的文件名
    const name = processedOptions.name
    // 请求的formData
    const formData = processedOptions.formData

    // 返回一个Promise对象
    return new Promise((resolve, reject) => {
      wx.uploadFile({
        url: processedOptions.baseUrl + processedOptions.url,
        filePath,
        name,
        formData,
        header,
        success: (res) => {
          // 请求成功,如果响应中有错误信息,则拒绝Promise
          const data = JSON.parse(res.data)
          resolve(data)
        },
        fail: (err) => {
          // 请求失败,拒绝Promise
          reject(err)
        }
      })
    })
  }

public async getClientAuth(socketId: string, channelName: string): Promise<{auth: string, channelData: string}> { // 修改返回类型
  const response = await this.request({
      url: '/reverb/client-auth',
      data: {
          socket_id: socketId,
          channel_name: channelName,
      },
      method: 'POST',
      noToken: false,
  });
  console.log("获取到 auth:", response.data);
  //return response.data.auth; // 原始的返回值
   return { // 修改返回值
        auth: response.data.auth,
        channelData: response.data.channel_data
    };
}
public connectWebSocket(options: WebSocketOptions): Promise<wx.SocketTask> {
    return new Promise(async (resolve, reject) => {
      const { options: processedOptions, header } = await this.processWebSocketOptions(options)
        const url = processedOptions.url;
        const processedHeader = header; // 包含了 Authorization: Bearer <token>
      const socketTask = wx.connectSocket({
         url,
          header: processedHeader, // 使用包含 token 的 header
          protocols: options.protocols,
        success: () => {
          console.log('WebSocket 连接尝试成功 (wx.connectSocket success)')
        },
        fail: (err) => {
          console.log('WebSocket 连接失败:', err)
          reject(err)
        },
      })
      socketTask.onOpen(() => {
            console.log('WebSocket 连接已打开 (socketTask.onOpen)')
            resolve(socketTask);
        });
      socketTask.onMessage(async (msg) => { // onMessage 也改为 async
        console.log('WebSocket 收到消息:', msg)
         const data = JSON.parse(msg.data);
            if (data.event === 'pusher:connection_established') {
                const innerData = JSON.parse(data.data); // 对 data.data 再次解析
    let socketId = innerData.socket_id;
                console.log("获取到 socketId:", socketId);
            }
      })

      socketTask.onClose(() => {
        console.log('WebSocket 连接已关闭')
      })
      socketTask.onError((err) => {
        console.log('WebSocket 连接出错:', err)
      })
    })
  }
  // 处理 WebSocket 选项,复用 token 逻辑
  private async processWebSocketOptions(options: WebSocketOptions): Promise<{ options: WebSocketOptions; header: any }> {
    const newOptions = { ...options };
    newOptions.header = newOptions.header || {};

    if (!newOptions.noToken) { // 检查 noToken
      let token = wx.getStorageSync('token');
      console.log('判断缓存是否过期', !token || this.isTokenExpired());

      if (!token || this.isTokenExpired()) {
        console.log('调用 wxLogin 方法重新获取 token');
        await this.wxLogin();
        token = wx.getStorageSync('token');
        console.log('拿到新的 token', token);
      }
      if (token) {
        newOptions.header.Authorization = `Bearer ${token}`;
      }
    }
    return { options: newOptions, header: newOptions.header };
  }

  private async processOptions(
    options: RequestOptions | wx.UploadFileOptions
  ): Promise<{ options: RequestOptions | wx.UploadFileOptions; header: any }> {
    const newOptions = Object.assign({}, options)
    newOptions.header = newOptions.header || {}

    if (!newOptions.noToken) {
      let token = wx.getStorageSync('token')
      console.log('判断缓存是否过期', !token || this.isTokenExpired())

      if (!token || this.isTokenExpired()) {
        console.log('调用 wxLogin 方法重新获取 token')
        await this.wxLogin()
        token = wx.getStorageSync('token')
        console.log('拿到新的 token', token)
      }

      if (token) {
        newOptions.header.Authorization = `Bearer ${token}`
      }
    }

    const processedOptions = {
      ...this.config,
      ...newOptions,
      header: {
        ...this.config.header,
        ...newOptions.header
      }
    }

    return { options: processedOptions, header: processedOptions.header }
  }

  private isTokenExpired(): boolean {
    const expiredTime = wx.getStorageSync('expiredTime')
    console.log('当前时间:', new Date().getTime())
    console.log('过期时间:', expiredTime)
    console.log('距离过期还有:', expiredTime - new Date().getTime(), '毫秒')
    if (!expiredTime) {
      return true
    }
    if (new Date().getTime() > expiredTime) {
      return true
    }
    return false
  }

  public get(url: string, data?: any, noToken?: boolean): Promise<any> {
    return this.request({ url, data, noToken })
  }

  public post(url: string, data?: any): Promise<any> {
    return this.request({ url, data, method: 'POST' })
  }

  public put(url: string, data?: any): Promise<any> {
    return this.request({ url, data, method: 'PUT' })
  }

  public delete(url: string, data?: any): Promise<any> {
    return this.request({ url, data, method: 'DELETE' })
  }
}
export default Request

第二个是页面文件,也就是接收websoket信息的页面
src\pages\charge\charge.ts

// src/pages/charge/charge.ts
import Request from '../../utils/request';
import config from '../../utils/config';
const request = new Request();

Page({
  data: {
    status: 'starting',
  },
  socketTask: null as wx.SocketTask | null,

  onLoad: async function () {
    await this.connectWebSocket();
    this.updateStatusText();
  },

  onUnload: function () {
    if (this.socketTask) {
      this.socketTask.close();
    }
  },


  async connectWebSocket() {
    const userId = 1;
    const appKey = config.reverbAppKey;
    const url = `ws://jy.test:8080/app/${appKey}`;

    console.log("connectWebSocket - Connecting...");

    try {
      this.socketTask = await request.connectWebSocket({ url, noToken: false });
      let socketId: string | null = null;

      this.socketTask.onMessage(async (msg) => {
        console.log('WebSocket 收到消息:', msg);
        const data = JSON.parse(msg.data);

        if (data.event === 'pusher:connection_established') {
          const innerData = JSON.parse(data.data); // 对 data.data 再次解析
          socketId = innerData.socket_id;
          console.log("获取到 socketId:", socketId);

          if (socketId) {
            const channelName = `private-user.${userId}`;
            try {
              //const auth = await request.getClientAuth(socketId, channelName); // 原始的获取 auth
              const { auth, channelData } = await request.getClientAuth(socketId, channelName); // 使用修改后的 getClientAuth

              this.socketTask.send({
                data: JSON.stringify({
                  event: 'pusher:subscribe',
                  data: {
                    channel: channelName,
                    auth: auth,
                    channel_data: channelData, // 添加 channel_data
                  },
                }),
              });
            } catch (authError) {
              console.error("获取 auth 失败:", authError);
            }
          }
        } else if (data.event === 'pusher_internal:subscription_succeeded') {
          console.log('Successfully subscribed to channel:', data.channel);
        } else if (data.event === 'pusher:error') {
          console.error("Reverb Error:", data.data);
        } else if (data.event === 'pusher:ping') {
          // 处理 pusher:ping 事件
          console.log('Received pusher:ping, sending pusher:pong');
          this.socketTask.send({
            data: JSON.stringify({ event: 'pusher:pong' })
          });
        }
      });

      this.socketTask.onClose(() => {
        console.log('WebSocket 连接已关闭 (socketTask.onClose)');
      });

      this.socketTask.onError((err) => {
        console.error('WebSocket 错误 (socketTask.onError):', err);
      });
    } catch (error) {
      console.error('WebSocket 连接失败:', error);
      wx.showToast({
        title: '网络连接失败,请稍后重试',
        icon: 'none',
        duration: 2000
      });
    }
  },

});

第三个config.ts

interface Config {
  baseUrl: string
  reverbAppKey: string
  reverbAppSecret: string
}

const config: Config = {
  baseUrl: 'http://jy.test/api/miniprogram',
  reverbAppKey: '111111', 
  reverbAppSecret: '222222',  //忽略这两个地方的参数,我随意改的,事实上和后端是一致的
}

export default config

小程序看到的消息

{"event":"pusher:connection_established","data":"{\"socket_id\":\"797649699.737739152\",\"activity_timeout\":30}"}    114    
08:51:29.194
{"event":"pusher:subscribe","data":{"channel":"private-user.1","auth":"1kzbb6yzbw99ivh28hui:1d11ea7ecd7239c7f5532449483336e47255f0e20954fb361883251c51860863","channel_data":"{\"user_id\":1}"}}    192    
08:51:31.097
{"event":"pusher_internal:subscription_succeeded","data":"{}","channel":"private-user.1"}    89    
08:51:31.134
{"event":"pusher:ping"}    23    
08:53:29.715
{"event":"pusher:pong"}

routes\channels.php 内的代码。
打印到日志内的只有channels.php 文件已加载。其他信息没有打印到日志内。

我的问题是:

1,为什么channels.php内的Broadcast::channel()没被执行。
2,api触发的广播事件没有被发送出去

是什么原因呢?怎么解决呢?非常感谢。。。煎熬了5天了。

乌鸦嘴社区 wyz.xyz 来玩。
《L05 电商实战》
从零开发一个电商项目,功能包括电商后台、商品 & SKU 管理、购物车、订单管理、支付宝支付、微信支付、订单退款流程、优惠券等
《L03 构架 API 服务器》
你将学到如 RESTFul 设计风格、PostMan 的使用、OAuth 流程,JWT 概念及使用 和 API 开发相关的进阶知识。
讨论数量: 23
nff93

你前端没有订阅这个频道吧。

2天前 评论
shebaoting (楼主) 2天前
JaguarJack

你先不用队列推送试试。不行把你的前端发出来看看。私有频道我记得监听事件需要加个 .

1天前 评论
shebaoting (楼主) 1天前
JaguarJack (作者) 1天前
shebaoting (楼主) 1天前

你客户端成功收到了服务端的消息了吧,都有订阅成功提示了:

{"event":"pusher_internal:subscription_succeeded","data":"{}","channel":"private-user.1"}

你客户端 pong 也收到了。

你代码有个小问题,频道授权验证的匿名函数应该返回布尔值

file

1天前 评论
shebaoting (楼主) 1天前

看起来是你的 api 服务没有将数据发布到 reverb 服务中,广播是通过 Redis Pub/Sub 实现的,可能得看看你的 Redis 正不正常。

1天前 评论
shebaoting (楼主) 1天前
pardon110

你用的是ws 明文协议 使用本地测试域名 jy.test
可能未在微信公众平台配置合法 Socket 域名 若用ws建议先用非小程序的js写一段测试websocket 连接file

1天前 评论
shebaoting (楼主) 1天前
  1. 你的 ChargeStatusUpdated 事件是 implements ShouldBroadcast 的,这个是需要开启队列的 php artisan queue:work。无需异步队列的话,改成 ShouldBroadcastNow。

file

  1. 你后端设置的 PrivateChannel,会自带 private- 前缀

file

你前端没有使用 Echo 的 private 方法设置 channel,所以也许前端应该改成

const channelName = `private-private-user.${userId}`;
1天前 评论
sy_dante (作者) 1天前
shebaoting (楼主) 16小时前
sy_dante (作者) 16小时前
梦想星辰大海

laravel Reverb,是在webs4ocket之上封装的一套东西吗?看着使用起来各种概念,各种细节,好累人。 你要是愿意放弃laravel Reverb,换个我的方案,我愿意花费一个小时协助你打通websocket通路。

19小时前 评论
shebaoting (楼主) 16小时前
梦想星辰大海 (作者) 13小时前
shebaoting (楼主) 13小时前

你居然写的这么麻烦。我都是用的 laravel-each。直接就搞定了。如果你改成那样的,咱们可以聊聊

13小时前 评论
shebaoting (楼主) 13小时前

我之前也遇到了和你一样的问题,wss连接正常,ping pong 也正常,reverb 的 event 也是正常触发,但是 wss 收不到,后来我排查是 reverb 的监听写的有问题。博客:Laravel Reverb 生产环境配置

11小时前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!