Laravel + 微信小程序 websocket 搭建广播消息系统

在现代的 Web 应用中很多场景都需要运用到即时通讯,比如常见的扫码登录,聊天室,广播消息等。

在过去,为了实现这种即时通讯(推送)通常都是使用Ajax轮询。轮询就是在指定的时间间隔内,进行HTTP 请求来获取数据,而这种方式会产生一些弊端,一方面产生过多的HTTP请求,占用带宽,增大服务器的相应,浪费资源,另一方面,因为不是每一次请求都会有数据变化(就像聊天室),所以就会造成请求的利用率低。

而 websocket 正好能够解决上面的弊端,它是一种双向协议,允许服务端主动推送信息到客户端。

Redis

在开始之前,我们需要开启一个 redis 服务,并在 Laravel 应用中进行配置启用,因为在整个流程中,我们需要借助 redis 的订阅和发布机制来实现即时通讯。

安装 Predis 库

composer require predis/predis

配置

Redis 在应用中的配置文件存储在 config/database.php,在这个文件中,你可以看到一个包含了 Redis 服务信息的 redis 数组:

'redis' => [
    'client' => 'predis',
    'default' => [
        'host' => env('REDIS_HOST', '127.0.0.1'),
        'password' => env('REDIS_PASSWORD', null),
        'port' => env('REDIS_PORT', 6379),
        'database' => 0,
    ],
],

广播

配置

所有关于事件广播的配置都保存在 config/broadcasting.php 配置文件中。 Laravel 自带了几个广播驱动: Pusher 、 Redis , 和一个用于本地开发与调试的 log 驱动,我们将使用 Redis 作为广播驱动,这里需要依赖 predis/predis 类库。

通知

首先我们需要先创建一个数据库表来存放通知,使用命令 notifications:table 生成包含特定表结构的迁移文件。

php artisan notifications:table
php artisan migrate

然后我们创建一个 CommentArticle 类用来广播用户的文章评论通知:

php artisan make:notification CommentArticle
<?php

namespace App\Notifications;

use App\Models\Comment;
use Illuminate\Bus\Queueable;
use Illuminate\Notifications\Messages\BroadcastMessage;
use Illuminate\Notifications\Notification;
use Illuminate\Contracts\Queue\ShouldQueue;

class CommentArticle extends Notification implements ShouldQueue
{
    use Queueable;

    protected $comment;

    public function __construct(Comment $comment)
    {
        $this->comment = $comment;
    }

    public function via()
    {
        return ['database', 'broadcast']; // 指定 broadcast 频道系统会把通知消息广播出去
    }

    public function toArray()
    {
        return [
            'form_id' => $this->comment->id, // 评论id
            'form_user_id' => $this->comment->user_id, // 评论用户id
            'form_user_name' => $this->comment->user->name, // 评论用户名
            'form_user_avatar' => $this->comment->user->user_info->avatarUrl,
            'content' => $this->comment->content, // 评论内容
            'target_id' => $this->comment->target_id, // 文章id
            'target_name' => $this->comment->target->title, // 文章标题
        ];
    }
}

toArray 方法可以同时被 databasebroadcast 渠道调用,如果你希望 databasebroacast 两个渠道有不同的数组展现方式,你需要定义 toDatabase 或者 toBroadcast 以取代定义 toArray 方法。

自定义通知消息广播渠道

自定义通知实体接受其广播台通知的频道,我们需要在通知实体上定义一个 receivesBroadcastNotificationsOn 方法

编辑 App\Models\User

<?php

namespace App\Models;

use Eloquent;
use Illuminate\Contracts\Auth\Authenticatable as AuthenticatableContract;
use Illuminate\Contracts\Auth\Access\Authorizable as AuthorizableContract;
use Illuminate\Contracts\Auth\CanResetPassword as CanResetPasswordContract;
use Illuminate\Notifications\Notifiable;
use Illuminate\Auth\Authenticatable;
use Illuminate\Foundation\Auth\Access\Authorizable;
use Illuminate\Auth\Passwords\CanResetPassword;
use Tymon\JWTAuth\Contracts\JWTSubject;

class User extends Eloquent implements
    AuthenticatableContract,
    AuthorizableContract,
    CanResetPasswordContract,
    JWTSubject
{
    use Authenticatable, Authorizable, CanResetPassword, Notifiable;

    protected $table = 'users';

    protected $fillable = ['name', 'email', 'password'];

    protected $hidden = ['password', 'remember_token'];

    public function receivesBroadcastNotificationsOn()
    {
        return 'notification.' . $this->id;
    }

    public function getJWTIdentifier()
    {
        return $this->getKey();
    }

    public function getJWTCustomClaims()
    {
        return [];
    }
}

发送通知

首先我们给 App\Models\Comment Model 注册一个观察者事件监听,当用户新建一条评论的时候我们给文章作者发送一条通知。

<?php

namespace App\Providers;

use App\Models\Comment;
use App\Observers\CommentObserver;
use Illuminate\Support\ServiceProvider;

class AppServiceProvider extends ServiceProvider
{
    public function boot()
    {
        Comment::observe(CommentObserver::class);
    }
}
<?php

namespace App\Observers;

use App\Models\Article;
use App\Models\Comment;
use App\Notifications\CommentArticle;

class CommentObserver
{
    public function created(Comment $comment)
    {
        if ($comment->target_type == Article::class) {
            if ($comment->target->author_id != $comment->user_id) {
                $comment->target->author->notify(new CommentArticle($comment));
            }
        }
    }
}

websocket 服务

socket.io 支持的协议版本(4)和 微信小程序 websocket 协议版本(13)不一致,微信小程序 websocket 是无法连上服务的,所以我们选用 websockets/ws 来做 websocket 服务

安装相关依赖

npm i -S ws ioredis ini jsonwebtoken

我们在 Laravel 项目根目录下新建一个server.js

const WebSocket = require('ws'); // socket.io 支持的协议版本(4)和 微信小程序 websocket 协议版本(13)不一致,所以选用ws
const Redis = require('ioredis');
const fs = require('fs');
const ini = require('ini');
const jwt = require('jsonwebtoken');
const url = require('url');

const config = ini.parse(fs.readFileSync('./.env', 'utf8')); // 读取.env配置

const redis = new Redis({
    port: env('REDIS_PORT', 6379),          // Redis port
    host: env('REDIS_HOST', '127.0.0.1'),   // Redis host
    // family: 4,           // 4 (IPv4) or 6 (IPv6)
    password: env('REDIS_PASSWORD', null),
    db: 0,
});

const wss = new WebSocket.Server({
    port: 6001,
    clientTracking: false,
    verifyClient({req}, cb) {
        try {
            const urlParams = url.parse(req.url, true);
            const token = urlParams.query.token || req.headers.authorization.split(' ')[1];
            const jwtSecret = env('JWT_SECRET');
            const algorithm = env('JWT_ALGO', 'HS256');

            const {sub, nbf, exp} = jwt.verify(token, jwtSecret, {algorithm});

            if (Date.now() / 1000 > exp) {
                cb(false, 401, 'token已过期.')
            }

            if (Date.now() / 1000 < nbf) {
                cb(false, 401, 'token未到生效时间.')
            }

            if (!sub) {
                cb(false, 401, '无法验证令牌签名.')
            }

            cb(true)
        } catch (e) {
            console.info(e);
            cb(false, 401, 'Token could not be parsed from the request.');
        }

    },
});

const clients = {};

wss.on('connection', (ws, req) => {
    try {
        const urlParams = url.parse(req.url, true);
        const token = urlParams.query.token || req.headers.authorization.split(' ')[1];
        const jwtSecret = env('JWT_SECRET');
        const algorithm = env('JWT_ALGO', 'HS256');

        const {sub} = jwt.verify(token, jwtSecret, {algorithm});
        const uuid = sub;

        ws.uuid = uuid;
        if (!clients[uuid]) {
            clients[uuid] = [];
        }

        clients[uuid].push(ws);
    } catch (e) {
        ws.close();
    }

    ws.on('message', message => { // 接收消息事件
        if (ws.uuid) {
            console.info('[%s] message:%s %s', getNowDateTimeString(), ws.uuid, message);
        }
    });

    ws.on('close', () => { // 关闭链接事件
        if (ws.uuid) {
            console.info('[%s] closed:%s', getNowDateTimeString(), ws.uuid);

            const wss = clients[ws.uuid];

            if (wss instanceof Array) {
                const index = wss.indexOf(ws);

                if (index > -1) {
                    wss.splice(index, 1);
                    if (wss.length === 0) {
                        delete clients[ws.uuid];
                    }
                }
            }
        }
    })
});

// redis 订阅
redis.psubscribe('*', function (err, count) {
});

redis.on('pmessage', (subscrbed, channel, message) => { // 接收 laravel 推送的消息
    console.info('[%s] %s %s', getNowDateTimeString(), channel, message);

    const {event} = JSON.parse(message);
    const uuid = channel.split('.')[1];
    const wss = clients[uuid];

    switch (event) {
        case 'Illuminate\\Notifications\\Events\\BroadcastNotificationCreated':
        case 'App\\Events\\WechatScanLogin':
            if (wss instanceof Array) {
                wss.forEach(ws => {
                    if (ws.readyState === 1) {
                        ws.send(message);
                    }
                });
            }
            break;
    }
});

function env(key, def = '') {
    return config[key] || def
}

function getNowDateTimeString() {
    const date = new Date();
    return `${date.getFullYear()}-${date.getMonth() + 1}-${date.getDate()} ${date.getHours()}:${date.getMinutes()}:${date.getSeconds()}`;
}

启动服务

node server.js

Nginx配置实现ssl反向代理

upstream websocket {
    server 127.0.0.1:6001; # websocket 服务
}

server {
    listen 80;
    server_name xxx.com www.xxx.com;
    rewrite ^(.*) https://www.xxx.com$1 permanent;
}

server {
    listen 443 ssl;
    server_name xxx.com  www.xxx.com;

    root "/www/code/blog";
    index index.html index.htm;

    #add_header Content-Security-Policy "default-src 'self';script-src 'self' 'unsafe-inline' 'unsafe-eval' hm.baidu.com;style-src 'self' 'unsafe-inline';img-src 'self' data: *.einsition.com http://*.einsition.com hm.baidu.com;font-src 'self' data:;";
    add_header X-Content-Type-Options "nosniff";
    add_header X-XSS-Protection "1";

    charset utf-8;

    set $realip $remote_addr;
    if ($http_x_forwarded_for ~ "^(\d+\.\d+\.\d+\.\d+)") {
        set $realip $1;
    }
    fastcgi_param REMOTE_ADDR $realip;

    location ~ ^/api/ {
        rewrite /api/(.+)$ /$1 break;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_pass https://api.xxx.com;
    }

    location /wss {
        access_log /www/log/nginx/www.xxx.com-websocket-access.log;
        error_log  /www/log/nginx/www.einsition.com-websocket-error.log;
        proxy_pass http://websocket; # 代理到 websocket 服务
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header Host $host;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "Upgrade";
        rewrite /wss/(.*) /$1 break;
        proxy_redirect off;
        proxy_read_timeout 300s;
    }

    location / {
        #if ($http_user_agent ~* "Baiduspider|360Spider|Sogou web spider") {
        #    proxy_pass http://spider.xxx.com;
        #}
        try_files $uri $uri/ /index.html?$query_string;
    }

    location = /favicon.ico { access_log off; log_not_found off; }
    location = /robots.txt  { access_log off; log_not_found off; }

    access_log /www/log/nginx/www.xxx.com-access.log main;
    error_log  /www/log/nginx/www.xxx.com-error.log error;

    sendfile off;

    gzip on;
    gzip_static on;
    gzip_min_length 5k;
    gzip_buffers 4 16k;
    gzip_http_version 1.1;
    gzip_comp_level 4;
    gzip_vary on;
    gzip_types text/plain application/javascript text/css application/json image/jpeg image/gif image/png;

    client_max_body_size 10m;

    location ~ /\.ht {
        deny all;
    }

    ssl_certificate /www/cert/blog/ssl.pem;
    ssl_certificate_key /www/cert/blog/ssl.key;
    ssl_session_timeout 5m;
    ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:ECDHE:ECDH:AES:HIGH:!NULL:!aNULL:!MD5:!ADH:!RC4;
    ssl_protocols TLSv1 TLSv1.1 TLSv1.2;
    ssl_prefer_server_ciphers on;
}

websocket url:wss://www.xxx.com/wss

重启 nginx 服务

service nginx reload

微信小程序连接 websocket

我们新建一个 utils/websocket.js 文件:

import regeneratorRuntime from './runtime'
const { getAuthorization } = require('./authority')
const config = require('../config')

let socketTask = null

function createWebSocket () {
  (async () => {
    /*
    * readyState:
    * 0: 正在建立连接连接。
    * 1: 连接成功建立,可以进行通信。
    * 2: 连接正在进行关闭握手,即将关闭。
    * 3: 连接已经关闭或者根本没有建立。
    */
    if (socketTask && socketTask.readyState !== 3) {
      return false
    }

    socketTask = await wxConnectSocket(config.socketUrl, {
      header: {
        Authorization: getAuthorization(),
      },
    })

    heartbeat.reset().start() // 心跳检测重置

    socketTask.onMessage(({ data: msg }) => { // 通知消息
      heartbeat.reset().start()
      const { event, data } = JSON.parse(msg)
      switch (event) {
        case 'Illuminate\\Notifications\\Events\\BroadcastNotificationCreated':
          // 接收到通知消息
          break
      }
    })

    socketTask.onClose((data) => {
      heartbeat.reset()
      // createWebSocket()
    })
  })()
}

function getSocketTask() {
  return socketTask
}

function wxConnectSocket(url, options) {
  const socketTask = wx.connectSocket({
    url,
    ...options,
  })

  wx.onSocketOpen(() => {
    console.info('websocket opened!')
  })

  wx.onSocketClose(() => {
    console.info('websocket fail!')
  })

  return new Promise((resolve, reject) => {
    socketTask.onOpen(() => {
      resolve(socketTask)
    })

    socketTask.onError(reject)
  })
}

const heartbeat = {
  timeout: 240000, // 4分钟触发一次心跳,避免 websocket 断线
  pingTimeout: null,
  reset () {
    clearTimeout(this.pingTimeout)
    return this;
  },
  start () {
    this.pingTimeout = setTimeout(async () => {
      await wxApiPromiseWrap(socketTask.send.bind(socketTask), { data: 'ping' }) // 这里发送一个心跳
      this.reset().start()
    }, this.timeout)
  },
}

function wxApiPromiseWrap(fn, options = {}) {
  return new Promise((resolve, reject) => {
    fn({
      ...options,
      success: resolve,
      fail: reject,
    })
  })
}

module.exports = {
  createWebSocket,
  getSocketTask,
}

最后我们在登录完成后连接websocket就可以了

本作品采用《CC 协议》,转载必须注明作者和本文链接
本帖由系统于 5年前 自动加精
《L01 基础入门》
我们将带你从零开发一个项目并部署到线上,本课程教授 Web 开发中专业、实用的技能,如 Git 工作流、Laravel Mix 前端工作流等。
《L03 构架 API 服务器》
你将学到如 RESTFul 设计风格、PostMan 的使用、OAuth 流程,JWT 概念及使用 和 API 开发相关的进阶知识。
讨论数量: 11

Workerman 我用这个,似乎更加简单

5年前 评论
jdzor 4年前
hemaker 3年前
TigerLin

:+1:赞!

5年前 评论
一亩三分地儿

WebSocket Version13 是 2011 年发布的 Socket.IO 2.0 是2017年发布的理应支持才对。

4年前 评论
yanthink (楼主) 4年前

个人建议独立使用 workerman 做 websocket 服务器,使用 GatewayWorker 能快速开发相关功能,前端通过调用 lLaravel 或者 tp 等 php 常规框架的接口,接口在调用 websocket 服务器,就能完美解决 websocket 通讯问题,实现了聊天室、客服,包括小程序中的各种实时交互与应用相关的功能。可以看 demo(https://element.wmhello.cn)

8个月前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!