Laravel + 微信小程序 websocket 搭建广播消息系统

在现代的 Web 应用中很多场景都需要运用到即时通讯,比如常见的扫码登录,聊天室,广播消息等。

在过去,为了实现这种即时通讯(推送)通常都是使用Ajax轮询。轮询就是在指定的时间间隔内,进行HTTP 请求来获取数据,而这种方式会产生一些弊端,一方面产生过多的HTTP请求,占用带宽,增大服务器的相应,浪费资源,另一方面,因为不是每一次请求都会有数据变化(就像聊天室),所以就会造成请求的利用率低。

而 websocket 正好能够解决上面的弊端,它是一种双向协议,允许服务端主动推送信息到客户端。

Redis

在开始之前,我们需要开启一个 redis 服务,并在 Laravel 应用中进行配置启用,因为在整个流程中,我们需要借助 redis 的订阅和发布机制来实现即时通讯。

安装 Predis 库:

composer require predis/predis

配置:

Redis 在应用中的配置文件存储在 config/database.php,在这个文件中,你可以看到一个包含了 Redis 服务信息的 redis 数组:

'redis' => [
    'client' => 'predis',
    'default' => [
        'host' => env('REDIS_HOST', '127.0.0.1'),
        'password' => env('REDIS_PASSWORD', null),
        'port' => env('REDIS_PORT', 6379),
        'database' => 0,
    ],
],

广播

配置:

所有关于事件广播的配置都保存在 config/broadcasting.php 配置文件中。 Laravel 自带了几个广播驱动: Pusher 、 Redis , 和一个用于本地开发与调试的 log 驱动,我们将使用 Redis 作为广播驱动,这里需要依赖 predis/predis 类库。

通知

首先我们需要先创建一个数据库表来存放通知,使用命令 notifications:table 生成包含特定表结构的迁移文件。

php artisan notifications:table
php artisan migrate

然后我们创建一个 CommentArticle 类用来广播用户的文章评论通知:

php artisan make:notification CommentArticle
<?php

namespace App\Notifications;

use App\Models\Comment;
use Illuminate\Bus\Queueable;
use Illuminate\Notifications\Messages\BroadcastMessage;
use Illuminate\Notifications\Notification;
use Illuminate\Contracts\Queue\ShouldQueue;

class CommentArticle extends Notification implements ShouldQueue
{
    use Queueable;

    protected $comment;

    public function __construct(Comment $comment)
    {
        $this->comment = $comment;
    }

    public function via()
    {
        return ['database', 'broadcast']; // 指定 broadcast 频道系统会把通知消息广播出去
    }

    public function toArray()
    {
        return [
            'form_id' => $this->comment->id, // 评论id
            'form_user_id' => $this->comment->user_id, // 评论用户id
            'form_user_name' => $this->comment->user->name, // 评论用户名
            'form_user_avatar' => $this->comment->user->user_info->avatarUrl,
            'content' => $this->comment->content, // 评论内容
            'target_id' => $this->comment->target_id, // 文章id
            'target_name' => $this->comment->target->title, // 文章标题
        ];
    }
}

toArray 方法可以同时被 databasebroadcast 渠道调用,如果你希望 databasebroacast 两个渠道有不同的数组展现方式,你需要定义 toDatabase 或者 toBroadcast 以取代定义 toArray 方法。

自定义通知消息广播渠道:

自定义通知实体接受其广播台通知的频道,我们需要在通知实体上定义一个 receivesBroadcastNotificationsOn 方法

编辑 App\Models\User

<?php

namespace App\Models;

use Eloquent;
use Illuminate\Contracts\Auth\Authenticatable as AuthenticatableContract;
use Illuminate\Contracts\Auth\Access\Authorizable as AuthorizableContract;
use Illuminate\Contracts\Auth\CanResetPassword as CanResetPasswordContract;
use Illuminate\Notifications\Notifiable;
use Illuminate\Auth\Authenticatable;
use Illuminate\Foundation\Auth\Access\Authorizable;
use Illuminate\Auth\Passwords\CanResetPassword;
use Tymon\JWTAuth\Contracts\JWTSubject;

class User extends Eloquent implements
    AuthenticatableContract,
    AuthorizableContract,
    CanResetPasswordContract,
    JWTSubject
{
    use Authenticatable, Authorizable, CanResetPassword, Notifiable;

    protected $table = 'users';

    protected $fillable = ['name', 'email', 'password'];

    protected $hidden = ['password', 'remember_token'];

    public function receivesBroadcastNotificationsOn()
    {
        return 'notification.' . $this->id;
    }

    public function getJWTIdentifier()
    {
        return $this->getKey();
    }

    public function getJWTCustomClaims()
    {
        return [];
    }
}

发送通知:

首先我们给 App\Models\Comment Model 注册一个观察者事件监听,当用户新建一条评论的时候我们给文章作者发送一条通知。

<?php

namespace App\Providers;

use App\Models\Comment;
use App\Observers\CommentObserver;
use Illuminate\Support\ServiceProvider;

class AppServiceProvider extends ServiceProvider
{
    public function boot()
    {
        Comment::observe(CommentObserver::class);
    }
}
<?php

namespace App\Observers;

use App\Models\Article;
use App\Models\Comment;
use App\Notifications\CommentArticle;

class CommentObserver
{
    public function created(Comment $comment)
    {
        if ($comment->target_type == Article::class) {
            if ($comment->target->author_id != $comment->user_id) {
                $comment->target->author->notify(new CommentArticle($comment));
            }
        }
    }
}

websocket 服务

socket.io 支持的协议版本(4)和 微信小程序 websocket 协议版本(13)不一致,微信小程序 websocket 是无法连上服务的,所以我们选用 websockets/ws 来做 websocket 服务

安装相关依赖:

npm i -S ws ioredis ini jsonwebtoken

我们在 Laravel 项目根目录下新建一个server.js:

const WebSocket = require('ws')
const Redis = require('ioredis')
const fs = require('fs')
const ini = require('ini')
const jwt = require('jsonwebtoken')

const config = ini.parse(fs.readFileSync('./.env', 'utf8')) // 读取.env配置

const wss = new WebSocket.Server({
    port: 6001,
    clientTracking: false,
    verifyClient ({ req }, cb) {
        try {
            const { authorization } = req.headers
            const token = authorization.split(' ')[1]
            const jwtSecret = env('JWT_SECRET')
            const algorithm = env('JWT_ALGO', 'HS256')

            const { sub, nbf, exp } = jwt.verify(token, jwtSecret, { algorithm })

            // if (Date.now() / 1000 + 30 * 60 > exp) {
            //     cb(false, 401, 'token已过期.')
            // }
            //
            // if (Date.now() /1000 < nbf) {
            //     cb(false, 401, 'token在(nbf)时间之前不能被接收处理.')
            // }

            if (!(sub > 0)) {
                cb(false, 401, '无法验证令牌签名.')
            }

            cb(true)
        } catch (e) {
            cb(false, 401, 'Token could not be parsed from the request.')
        }

    },
})

const clients = {}
wss.on('connection', (ws, req) => {
    try {
        const { authorization } = req.headers
        const token = authorization.split(' ')[1]
        const jwtSecret = env('JWT_SECRET')
        const algorithm = env('JWT_ALGO', 'HS256')

        const { sub } = jwt.verify(token, jwtSecret, {algorithm})

        if (clients[sub] && clients[sub].readyState === 1) {
            try {
                clients[sub].close()
            } catch (e) {
                //
            }
        }

        ws.user_id = sub
        clients[sub] = ws
    } catch (e) {
        ws.close()
    }

    ws.on('message', message => { // 接收消息事件
        console.info('message:%s', message)
    }) 

    ws.on('close', () => { // 关闭链接事件
        if (ws.user_id) {
            try {
                delete clients[ws.user_id]
            } catch (e) {
                //
            }
        }
    })
})

// redis 订阅
const redis = new Redis({
    port: env('REDIS_PORT', 6379),          // Redis port
    host: env('REDIS_HOST', '127.0.0.1'),   // Redis host
    // family: 4,           // 4 (IPv4) or 6 (IPv6)
    password: env('REDIS_PASSWORD', null),
    db: 0,
})

redis.psubscribe('*', function (err, count) {
})

redis.on('pmessage', (subscrbed, channel, message) => { // 接收 laravel 推送的消息
    console.info('[%s] %s %s', getNowDateTimeString(), channel, message)

    const { event, data } = JSON.parse(message)
    switch (event) {
        case 'Illuminate\\Notifications\\Events\\BroadcastNotificationCreated':
            const userId = channel.split('.')[1]
            if (clients[userId] && clients[userId].readyState === 1) { // 只给对应的用户发送通知
                clients[userId].send(message)
            }
        break
    }
})

function env(key, def = '') {
    return config[key] || def
}

function getNowDateTimeString () {
    const date = new Date()
    return `${date.getFullYear()}-${date.getMonth() + 1}-${date.getDate()} ${date.getHours()}:${date.getMinutes()}:${date.getSeconds()}`
}

启动服务:

node server.js

Nginx配置实现ssl反向代理:

upstream websocket {
    server 127.0.0.1:6001; // websocket 服务
}

server {
    listen 80;
    server_name xxx.com www.xxx.com;
    rewrite ^(.*) https://www.xxx.com$1 permanent;
}

server {
    listen 443 ssl;
    server_name xxx.com  www.xxx.com;

    root "/www/code/blog";
    index index.html index.htm;

    #add_header Content-Security-Policy "default-src 'self';script-src 'self' 'unsafe-inline' 'unsafe-eval' hm.baidu.com;style-src 'self' 'unsafe-inline';img-src 'self' data: *.xxx.com http://*.xxx.com hm.baidu.com;font-src 'self' data:;";
    add_header X-Content-Type-Options "nosniff";
    add_header X-XSS-Protection "1";

    charset utf-8;

    set $realip $remote_addr;
    if ($http_x_forwarded_for ~ "^(\d+\.\d+\.\d+\.\d+)") {
        set $realip $1;
    }
    fastcgi_param REMOTE_ADDR $realip;

    location ~ ^/api/ {
        rewrite /api/(.+)$ /$1 break;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_pass https://api.xxx.com;
    }

    location /wss {
        access_log /www/log/nginx/www.xxx.com-websocket-access.log;
        error_log  /www/log/nginx/www.xxx.com-websocket-error.log;
        proxy_pass http://websocket; // 代理到 websocket 服务
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header Host $host;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "Upgrade";
        rewrite /wss/(.*) /$1 break;
        proxy_redirect off;
        proxy_read_timeout 300s;
    }

    location / {
        #if ($http_user_agent ~* "Baiduspider|360Spider|Sogou web spider") {
        #    proxy_pass http://spider.einsition.com;
        #}
        try_files $uri $uri/ /index.html?$query_string;
    }

    location = /favicon.ico { access_log off; log_not_found off; }
    location = /robots.txt  { access_log off; log_not_found off; }

    access_log /www/log/nginx/www.xxx.com-access.log main;
    error_log  /www/log/nginx/www.xxx.com-error.log error;

    sendfile off;

    gzip on;
    gzip_static on;
    gzip_min_length 5k;
    gzip_buffers 4 16k;
    gzip_http_version 1.1;
    gzip_comp_level 4;
    gzip_vary on;
    gzip_types text/plain application/javascript text/css application/json image/jpeg image/gif image/png;

    client_max_body_size 10m;

    location ~ /\.ht {
        deny all;
    }

    ssl_certificate /www/cert/blog/ssl.pem;
    ssl_certificate_key /www/cert/blog/ssl.key;
    ssl_session_timeout 5m;
    ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:ECDHE:ECDH:AES:HIGH:!NULL:!aNULL:!MD5:!ADH:!RC4;
    ssl_protocols TLSv1 TLSv1.1 TLSv1.2;
    ssl_prefer_server_ciphers on;
}

websocket url:wss://www.xxx.com/wss

重启 nginx 服务:

service nginx reload

微信小程序连接 websocket

我们新建一个 utils/websocket.js 文件:

import regeneratorRuntime from './runtime'
const { getAuthorization } = require('./authority')
const config = require('../config')

let socketTask = null

function createWebSocket () {
  (async () => {
    /*
    * readyState:
    * 0: 正在建立连接连接。
    * 1: 连接成功建立,可以进行通信。
    * 2: 连接正在进行关闭握手,即将关闭。
    * 3: 连接已经关闭或者根本没有建立。
    */
    if (socketTask && socketTask.readyState !== 3) {
      return false
    }

    socketTask = await wxConnectSocket(config.socketUrl, {
      header: {
        Authorization: getAuthorization(),
      },
    })

    heartbeat.reset().start() // 心跳检测重置

    socketTask.onMessage(({ data: msg }) => { // 通知消息
      heartbeat.reset().start() // 拿到任何消息都说明当前连接是正常的
      const { event, data } = JSON.parse(msg)
      switch (event) {
        case 'Illuminate\\Notifications\\Events\\BroadcastNotificationCreated':
          addUnreadCount()
          break
      }
    })

    socketTask.onClose((data) => {
      heartbeat.reset()
      // createWebSocket()
    })
  })()
}

function getSocketTask() {
  return socketTask
}

function wxConnectSocket(url, options) {
  const socketTask = wx.connectSocket({
    url,
    ...options,
  })

  wx.onSocketOpen(() => {
    console.info('websocket opened!')
  })

  wx.onSocketClose(() => {
    console.info('websocket fail!')
  })

  return new Promise((resolve, reject) => {
    socketTask.onOpen(() => {
      resolve(socketTask)
    })

    socketTask.onError(reject)
  })
}

const heartbeat = {
  timeout: 240000, // 4分钟触发一次心跳,避免 websocket 断线
  pingTimeout: null,
  reset () {
    clearTimeout(this.pingTimeout)
    return this;
  },
  start () {
    this.pingTimeout = setTimeout(async () => {
      await wxApiPromiseWrap(socketTask.send.bind(socketTask), { data: 'ping' }) // 这里发送一个心跳
      this.reset().start()
    }, this.timeout)
  },
}

function wxApiPromiseWrap(fn, options = {}) {
  return new Promise((resolve, reject) => {
    fn({
      ...options,
      success: resolve,
      fail: reject,
    })
  })
}

module.exports = {
  createWebSocket,
  getSocketTask,
}

最后我们在登录完成后连接websocket就可以了

本帖由系统于 3个月前 自动加精
yanthink
《L04 微信小程序从零到发布》
从小程序个人账户申请开始,带你一步步进行开发一个微信小程序,直到提交微信控制台上线发布。
《L05 电商实战》
从零开发一个电商项目,功能包括电商后台、商品 & SKU 管理、购物车、订单管理、支付宝支付、微信支付、订单退款流程、优惠券等
讨论数量: 6
深入浅出

:+1:赞!

3个月前 评论
Reason_bobo

!赞

3个月前 评论
XiaohuiLam

:+1:

3个月前 评论

Workerman 我用这个,似乎更加简单

3个月前 评论
jdzor: gateway 更好用 1个月前

请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!