Laravel + 微信小程序 websocket 搭建广播消息系统
在现代的 Web 应用中很多场景都需要运用到即时通讯,比如常见的扫码登录,聊天室,广播消息等。
在过去,为了实现这种即时通讯(推送)通常都是使用Ajax轮询。轮询就是在指定的时间间隔内,进行HTTP 请求来获取数据,而这种方式会产生一些弊端,一方面产生过多的HTTP请求,占用带宽,增大服务器的相应,浪费资源,另一方面,因为不是每一次请求都会有数据变化(就像聊天室),所以就会造成请求的利用率低。
而 websocket 正好能够解决上面的弊端,它是一种双向协议,允许服务端主动推送信息到客户端。
Redis
在开始之前,我们需要开启一个 redis 服务,并在 Laravel 应用中进行配置启用,因为在整个流程中,我们需要借助 redis 的订阅和发布机制来实现即时通讯。
安装 Predis 库
composer require predis/predis
配置
Redis 在应用中的配置文件存储在 config/database.php
,在这个文件中,你可以看到一个包含了 Redis 服务信息的 redis 数组:
'redis' => [
'client' => 'predis',
'default' => [
'host' => env('REDIS_HOST', '127.0.0.1'),
'password' => env('REDIS_PASSWORD', null),
'port' => env('REDIS_PORT', 6379),
'database' => 0,
],
],
广播
配置
所有关于事件广播的配置都保存在 config/broadcasting.php
配置文件中。 Laravel 自带了几个广播驱动: Pusher 、 Redis , 和一个用于本地开发与调试的 log 驱动,我们将使用 Redis 作为广播驱动,这里需要依赖 predis/predis
类库。
通知
首先我们需要先创建一个数据库表来存放通知,使用命令 notifications:table
生成包含特定表结构的迁移文件。
php artisan notifications:table
php artisan migrate
然后我们创建一个 CommentArticle 类用来广播用户的文章评论通知:
php artisan make:notification CommentArticle
<?php
namespace App\Notifications;
use App\Models\Comment;
use Illuminate\Bus\Queueable;
use Illuminate\Notifications\Messages\BroadcastMessage;
use Illuminate\Notifications\Notification;
use Illuminate\Contracts\Queue\ShouldQueue;
class CommentArticle extends Notification implements ShouldQueue
{
use Queueable;
protected $comment;
public function __construct(Comment $comment)
{
$this->comment = $comment;
}
public function via()
{
return ['database', 'broadcast']; // 指定 broadcast 频道系统会把通知消息广播出去
}
public function toArray()
{
return [
'form_id' => $this->comment->id, // 评论id
'form_user_id' => $this->comment->user_id, // 评论用户id
'form_user_name' => $this->comment->user->name, // 评论用户名
'form_user_avatar' => $this->comment->user->user_info->avatarUrl,
'content' => $this->comment->content, // 评论内容
'target_id' => $this->comment->target_id, // 文章id
'target_name' => $this->comment->target->title, // 文章标题
];
}
}
toArray
方法可以同时被 database
和 broadcast
渠道调用,如果你希望 database
和 broacast
两个渠道有不同的数组展现方式,你需要定义 toDatabase
或者 toBroadcast
以取代定义 toArray 方法。
自定义通知消息广播渠道
自定义通知实体接受其广播台通知的频道,我们需要在通知实体上定义一个 receivesBroadcastNotificationsOn 方法
编辑 App\Models\User
<?php
namespace App\Models;
use Eloquent;
use Illuminate\Contracts\Auth\Authenticatable as AuthenticatableContract;
use Illuminate\Contracts\Auth\Access\Authorizable as AuthorizableContract;
use Illuminate\Contracts\Auth\CanResetPassword as CanResetPasswordContract;
use Illuminate\Notifications\Notifiable;
use Illuminate\Auth\Authenticatable;
use Illuminate\Foundation\Auth\Access\Authorizable;
use Illuminate\Auth\Passwords\CanResetPassword;
use Tymon\JWTAuth\Contracts\JWTSubject;
class User extends Eloquent implements
AuthenticatableContract,
AuthorizableContract,
CanResetPasswordContract,
JWTSubject
{
use Authenticatable, Authorizable, CanResetPassword, Notifiable;
protected $table = 'users';
protected $fillable = ['name', 'email', 'password'];
protected $hidden = ['password', 'remember_token'];
public function receivesBroadcastNotificationsOn()
{
return 'notification.' . $this->id;
}
public function getJWTIdentifier()
{
return $this->getKey();
}
public function getJWTCustomClaims()
{
return [];
}
}
发送通知
首先我们给 App\Models\Comment
Model 注册一个观察者事件监听,当用户新建一条评论的时候我们给文章作者发送一条通知。
<?php
namespace App\Providers;
use App\Models\Comment;
use App\Observers\CommentObserver;
use Illuminate\Support\ServiceProvider;
class AppServiceProvider extends ServiceProvider
{
public function boot()
{
Comment::observe(CommentObserver::class);
}
}
<?php
namespace App\Observers;
use App\Models\Article;
use App\Models\Comment;
use App\Notifications\CommentArticle;
class CommentObserver
{
public function created(Comment $comment)
{
if ($comment->target_type == Article::class) {
if ($comment->target->author_id != $comment->user_id) {
$comment->target->author->notify(new CommentArticle($comment));
}
}
}
}
websocket 服务
socket.io 支持的协议版本(4)和 微信小程序 websocket 协议版本(13)不一致,微信小程序 websocket 是无法连上服务的,所以我们选用 websockets/ws 来做 websocket 服务
安装相关依赖
npm i -S ws ioredis ini jsonwebtoken
我们在 Laravel 项目根目录下新建一个server.js
const WebSocket = require('ws'); // socket.io 支持的协议版本(4)和 微信小程序 websocket 协议版本(13)不一致,所以选用ws
const Redis = require('ioredis');
const fs = require('fs');
const ini = require('ini');
const jwt = require('jsonwebtoken');
const url = require('url');
const config = ini.parse(fs.readFileSync('./.env', 'utf8')); // 读取.env配置
const redis = new Redis({
port: env('REDIS_PORT', 6379), // Redis port
host: env('REDIS_HOST', '127.0.0.1'), // Redis host
// family: 4, // 4 (IPv4) or 6 (IPv6)
password: env('REDIS_PASSWORD', null),
db: 0,
});
const wss = new WebSocket.Server({
port: 6001,
clientTracking: false,
verifyClient({req}, cb) {
try {
const urlParams = url.parse(req.url, true);
const token = urlParams.query.token || req.headers.authorization.split(' ')[1];
const jwtSecret = env('JWT_SECRET');
const algorithm = env('JWT_ALGO', 'HS256');
const {sub, nbf, exp} = jwt.verify(token, jwtSecret, {algorithm});
if (Date.now() / 1000 > exp) {
cb(false, 401, 'token已过期.')
}
if (Date.now() / 1000 < nbf) {
cb(false, 401, 'token未到生效时间.')
}
if (!sub) {
cb(false, 401, '无法验证令牌签名.')
}
cb(true)
} catch (e) {
console.info(e);
cb(false, 401, 'Token could not be parsed from the request.');
}
},
});
const clients = {};
wss.on('connection', (ws, req) => {
try {
const urlParams = url.parse(req.url, true);
const token = urlParams.query.token || req.headers.authorization.split(' ')[1];
const jwtSecret = env('JWT_SECRET');
const algorithm = env('JWT_ALGO', 'HS256');
const {sub} = jwt.verify(token, jwtSecret, {algorithm});
const uuid = sub;
ws.uuid = uuid;
if (!clients[uuid]) {
clients[uuid] = [];
}
clients[uuid].push(ws);
} catch (e) {
ws.close();
}
ws.on('message', message => { // 接收消息事件
if (ws.uuid) {
console.info('[%s] message:%s %s', getNowDateTimeString(), ws.uuid, message);
}
});
ws.on('close', () => { // 关闭链接事件
if (ws.uuid) {
console.info('[%s] closed:%s', getNowDateTimeString(), ws.uuid);
const wss = clients[ws.uuid];
if (wss instanceof Array) {
const index = wss.indexOf(ws);
if (index > -1) {
wss.splice(index, 1);
if (wss.length === 0) {
delete clients[ws.uuid];
}
}
}
}
})
});
// redis 订阅
redis.psubscribe('*', function (err, count) {
});
redis.on('pmessage', (subscrbed, channel, message) => { // 接收 laravel 推送的消息
console.info('[%s] %s %s', getNowDateTimeString(), channel, message);
const {event} = JSON.parse(message);
const uuid = channel.split('.')[1];
const wss = clients[uuid];
switch (event) {
case 'Illuminate\\Notifications\\Events\\BroadcastNotificationCreated':
case 'App\\Events\\WechatScanLogin':
if (wss instanceof Array) {
wss.forEach(ws => {
if (ws.readyState === 1) {
ws.send(message);
}
});
}
break;
}
});
function env(key, def = '') {
return config[key] || def
}
function getNowDateTimeString() {
const date = new Date();
return `${date.getFullYear()}-${date.getMonth() + 1}-${date.getDate()} ${date.getHours()}:${date.getMinutes()}:${date.getSeconds()}`;
}
启动服务
node server.js
Nginx配置实现ssl反向代理
upstream websocket {
server 127.0.0.1:6001; # websocket 服务
}
server {
listen 80;
server_name xxx.com www.xxx.com;
rewrite ^(.*) https://www.xxx.com$1 permanent;
}
server {
listen 443 ssl;
server_name xxx.com www.xxx.com;
root "/www/code/blog";
index index.html index.htm;
#add_header Content-Security-Policy "default-src 'self';script-src 'self' 'unsafe-inline' 'unsafe-eval' hm.baidu.com;style-src 'self' 'unsafe-inline';img-src 'self' data: *.einsition.com http://*.einsition.com hm.baidu.com;font-src 'self' data:;";
add_header X-Content-Type-Options "nosniff";
add_header X-XSS-Protection "1";
charset utf-8;
set $realip $remote_addr;
if ($http_x_forwarded_for ~ "^(\d+\.\d+\.\d+\.\d+)") {
set $realip $1;
}
fastcgi_param REMOTE_ADDR $realip;
location ~ ^/api/ {
rewrite /api/(.+)$ /$1 break;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_pass https://api.xxx.com;
}
location /wss {
access_log /www/log/nginx/www.xxx.com-websocket-access.log;
error_log /www/log/nginx/www.einsition.com-websocket-error.log;
proxy_pass http://websocket; # 代理到 websocket 服务
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header Host $host;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "Upgrade";
rewrite /wss/(.*) /$1 break;
proxy_redirect off;
proxy_read_timeout 300s;
}
location / {
#if ($http_user_agent ~* "Baiduspider|360Spider|Sogou web spider") {
# proxy_pass http://spider.xxx.com;
#}
try_files $uri $uri/ /index.html?$query_string;
}
location = /favicon.ico { access_log off; log_not_found off; }
location = /robots.txt { access_log off; log_not_found off; }
access_log /www/log/nginx/www.xxx.com-access.log main;
error_log /www/log/nginx/www.xxx.com-error.log error;
sendfile off;
gzip on;
gzip_static on;
gzip_min_length 5k;
gzip_buffers 4 16k;
gzip_http_version 1.1;
gzip_comp_level 4;
gzip_vary on;
gzip_types text/plain application/javascript text/css application/json image/jpeg image/gif image/png;
client_max_body_size 10m;
location ~ /\.ht {
deny all;
}
ssl_certificate /www/cert/blog/ssl.pem;
ssl_certificate_key /www/cert/blog/ssl.key;
ssl_session_timeout 5m;
ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:ECDHE:ECDH:AES:HIGH:!NULL:!aNULL:!MD5:!ADH:!RC4;
ssl_protocols TLSv1 TLSv1.1 TLSv1.2;
ssl_prefer_server_ciphers on;
}
websocket url:wss://www.xxx.com/wss
重启 nginx 服务
service nginx reload
微信小程序连接 websocket
我们新建一个 utils/websocket.js
文件:
import regeneratorRuntime from './runtime'
const { getAuthorization } = require('./authority')
const config = require('../config')
let socketTask = null
function createWebSocket () {
(async () => {
/*
* readyState:
* 0: 正在建立连接连接。
* 1: 连接成功建立,可以进行通信。
* 2: 连接正在进行关闭握手,即将关闭。
* 3: 连接已经关闭或者根本没有建立。
*/
if (socketTask && socketTask.readyState !== 3) {
return false
}
socketTask = await wxConnectSocket(config.socketUrl, {
header: {
Authorization: getAuthorization(),
},
})
heartbeat.reset().start() // 心跳检测重置
socketTask.onMessage(({ data: msg }) => { // 通知消息
heartbeat.reset().start()
const { event, data } = JSON.parse(msg)
switch (event) {
case 'Illuminate\\Notifications\\Events\\BroadcastNotificationCreated':
// 接收到通知消息
break
}
})
socketTask.onClose((data) => {
heartbeat.reset()
// createWebSocket()
})
})()
}
function getSocketTask() {
return socketTask
}
function wxConnectSocket(url, options) {
const socketTask = wx.connectSocket({
url,
...options,
})
wx.onSocketOpen(() => {
console.info('websocket opened!')
})
wx.onSocketClose(() => {
console.info('websocket fail!')
})
return new Promise((resolve, reject) => {
socketTask.onOpen(() => {
resolve(socketTask)
})
socketTask.onError(reject)
})
}
const heartbeat = {
timeout: 240000, // 4分钟触发一次心跳,避免 websocket 断线
pingTimeout: null,
reset () {
clearTimeout(this.pingTimeout)
return this;
},
start () {
this.pingTimeout = setTimeout(async () => {
await wxApiPromiseWrap(socketTask.send.bind(socketTask), { data: 'ping' }) // 这里发送一个心跳
this.reset().start()
}, this.timeout)
},
}
function wxApiPromiseWrap(fn, options = {}) {
return new Promise((resolve, reject) => {
fn({
...options,
success: resolve,
fail: reject,
})
})
}
module.exports = {
createWebSocket,
getSocketTask,
}
最后我们在登录完成后连接websocket就可以了
本作品采用《CC 协议》,转载必须注明作者和本文链接
推荐文章: