uniapp 小程序 websocket 前后端
自己编写 谨慎用于生产环境
6.27 修复运行中出现的异常导致的接不到消息
7.4 优化
8.22 线上运行1个月版本 支持多端登录多端推送,设计比较冗余,请多指教
2021/08/13 线上运行1年+,运行稳定
场景
uniapp
小程序
需求
列表区自动推送更新,系统通知,动态通知,聊天室等..
描述
- 系统级的消息推送
- 页面级的推送,列表的自动刷新;聊天页面。
- 断线重连
- laravel + swoole
简单的 socket.js
本文是笔者自己摸索出来的,可靠性自然比不上socket.io
,无奈目前只有微信小程序有开源的socket.io
。
import { isJson } from '@/utils/socket/helper.js' // 见下方
import jwt from '@/utils/auth/jwt.js' // 管理用户登陆信息的js,见其他文章
var Pool = {}; // Promise池
var IsConnected = false; // 是否connect成功;
var IsOpen = false; // 是否打开,必须open之后才能发送消息;
var IsLogin = false; //是否登录到socket进行fd绑定
var heartTimer = null; // 心跳句柄 如果服务器连接不可用 则重新连接
var reConnectTimer = null; // 重连句柄
var user=null; // 用户
var callback={}; // 自定义事件
var that;
export default class PromiseWebSocket
{
config = {
url: '',
debug: false,
reConnectTime: 5000, // 断线重连检测时间间隔
heartTime: 1 * 60 * 1000, //心跳间隔
};
// socket 重连
_connectionLoop () {
reConnectTimer = setInterval(
() => {
if( getApp().globalData.isLogin !== true ){
this.config.debug && console.warn('用户未登录!');
return;
}
if (!IsConnected || !IsOpen || !IsLogin) {
this.config.debug && console.log('连接中..')
this.initSocket(
function () {
user = jwt.getUser();
if(user){
that._send('login',{u: 'user:'+ user.user_id, token: user.socket_token}).then(res=>{
IsLogin = true;
uni.showToast({icon:'none',title: '上线成功',duration:2000});
}).catch(err=>{
IsLogin = false;
});
}
}
)
}else{
this.config.debug && console.log('连接正常')
}
},
this.config.reConnectTime
)
}
// ----------初始化websocket连接----------
initSocket (success) {
if (IsConnected) {
this.config.debug && console.log('已经建立连接')
if(IsOpen){
// 必须在open之后才能发送消息
typeof success === 'function' && success()
}
return
}
uni.getNetworkType({
success: (res) => {
if (res.networkType === 'none') {
this.config.debug && console.log('网络异常,无法连接');
} else {
this.config.debug && console.log('网络正常,开始建立连接...');
uni.connectSocket({
url: this.config.url,
success: () => {
IsConnected = true;
this.config.debug && console.log('建立连接成功')
typeof success === 'function' && success()
this.config.debug && console.log('开始心跳...')
this._clearHeart();
this._startHeart();
},
fail: (err) => {
this.config.debug && console.log('建立连接失败',err);
}
});
}
}
})
}
constructor(config){
that = this;
this.config = {
url: config.url,
debug: config.debug || this.config.debug,
reConnectTime: config.reConnectTime || this.config.reConnectTime,
heartTime: config.heartTime || this.config.heartTime
};
uni.onSocketOpen((header) => {
IsConnected = true;
IsOpen = true;
this.config.debug && console.log('socket打开成功')
})
uni.onSocketMessage((e) => {
try{
const msg = isJson(e.data);
if(!msg){
this.config.debug && console.log('不是json对象'); return;
}else{
this.config.debug && console.log('收到消息:', msg)
}
const type = msg['type'];
if( type == 'app' ){
let event = msg['event'];
if( callback.hasOwnProperty(event) ){
callback[event](msg);
}
}
else if( type == 'respon' ){
let uuid = msg['event'];
if(!uuid){
this.config.debug && console.log('响应缺少event参数');
return;
}
if( !Pool[uuid] ){
this.config.debug && console.log('pool池中没有该event');
return;
}
let data = msg['data'] || null
if (data.error === 0) {
Pool[uuid].resolve(data);
} else {
Pool[uuid].reject(data);
}
delete Pool[uuid]
}
else if( type == 'ping' ){
this._response( msg['uuid'] )
}else{
this.config.debug && console.log('缺少type参数或无此事件!');
}
}catch (e) {
//console.log('socket on message',e)
}
})
uni.onSocketError((res) => {
this.config.debug && console.error('发生错误', res)
this._close();
})
uni.onSocketClose((res) => {
this.config.debug && console.error('连接被关闭', res)
this._close();
})
// 监听网络状态
uni.onNetworkStatusChange((res) => {
if (res.isConnected) {
this.config.debug && console.log('监听到有网络服务')
} else {
this.config.debug && console.log('监听到没有网络服务')
}
})
}
// 清理心跳
_clearHeart () {
clearInterval(heartTimer)
heartTimer = null
}
// 开始心跳
_startHeart () {
heartTimer = setInterval(() => {
if( user ){
this._send('ping').then(res=>{
IsLogin = true;
this.config.debug && console.log('socket身份验证正常');
}).catch(err=>{
IsLogin = false;
this.config.debug && console.log('socket身份验证失败重新登入');
});
}
}, this.config.heartTime)
}
/**
* 发送socket消息
* @param string event 事件名称 ask 响应式问答 | ping
* @param object data 请求数据 必须json对象或者空对象{}或者不传值
* @param object extraData 同级附加参数 注意event,data会被覆盖
*/
_send (event, data) {
let message = { event, data };
const uuid = (new Date()).getTime();
return new Promise((resolve, reject) => {
if ( IsConnected && IsOpen ) {
if (!Pool[uuid]) {
Pool[uuid] = { message, resolve, reject }
}
this.config.debug && console.log('发送消息:', message);
message.uuid = uuid;
uni.sendSocketMessage({
data: JSON.stringify(message),
success: (res) => {
that.config.debug && console.log(res,'sendSocketMessage成功..')
},
fail: (fail) => {
that._close();
that.config.debug && console.log(res,'sendSocketMessage失败..')
}
})
} else {
this.config.debug && console.log('PING..socket 未打开:', message);
}
})
}
// 响应服务器端主动ping
_response(uuid){
if(!uuid){
this.config.debug && console.log('缺少uuid');return;
}
let event = 'response';
let message = {event};
message.uuid = uuid;
if(IsConnected && IsOpen){
uni.sendSocketMessage({
data: JSON.stringify(message),
success: (res) => {
that.config.debug && console.log(res,'发送响应成功..');
},
fail: (fail) => {
that._close();
that.config.debug && console.log(fail,'发送响应失败..');
}
})
}else{
this.config.debug && console.log('RESPONSE..socket 未打开:', message);
}
}
// 主动关闭
_close (option) {
IsConnected = false;
IsOpen = false;
IsLogin = false;
Pool = {};
this._clearHeart();
uni.closeSocket(option);
}
// 添加自定义事件
on (event,func){
if(typeof func === 'function'){
callback[event] = func;
}
}
// 移除自定义事件
uninstall (event){
delete(callback[event]);
}
}
helper.js
/**
* 是否是json字符串 如果是直接返回json对象
* @param str
*/
exports.isJson = function (str) {
if (typeof str === 'object')
return str;
try {
str = str.replace(/\ufeff/g, "");
var obj = JSON.parse(str);
return !!(typeof obj === 'object' && obj) ? obj : false;
}
catch (e) {
return false;
}
};
app.vue 中在应用启动时建立 socket 连接
<script>
import Vue from 'vue'
import UniSocketPromise from "@/utils/socket/socket.js"
export default {
onLaunch: function() {
console.log('App Launch');
// 登录检测
var user = this.checkLogin();
if( !user ){this.globalData.isLogin = false;}
// websocket
this.globalData.socket = new UniSocketPromise({
url: "ws://4.2.7.2:9502",
debug: true,
reConnectTime: 5*1000,
heartTime: 30 * 1000
});
// 连接
this.globalData.socket._connectionLoop();
// 系统消息
var that = this;
this.globalData.socket.on('user_tabbar',function(msg){
let index = Number(msg.msg);
uni.setTabBarBadge({
index: index,
text:'新消息',
success:function(r) {},
fail:function(e) {
uni.setStorageSync('msg_tobbar',1)
}
})
})
},
onShow: function() {
console.log('App Show');
},
onHide: function() {
console.log('App Hide')
},
globalData:{
isLogin : true,
socket: null,
chatting: false,
innerAudioContext:null
}
}
</script>
<style>
</style>
页面级的连接
<template>
<view>
<view>chat 聊天页面</view>
<scroll-view scroll-y="true" :scroll-top="scrollTop" @scroll="scroll">
</scroll-view>
</view>
</template>
<script>
var that;
export default {
data() {
return {
chat_id:0,
scrollTop:0,
old: {
scrollTop: 0
},
}
},
onLoad(option){
that = this;
this.chat_id = option.chat_id;
getApp().globalData.socket.on('chat'+this.chat_id,function(msg){
console.log('聊天页面收到消息',msg)
that.pageInit(that.chat_id);
that.scrollTop = that.old.scrollTop
setTimeout(function(){
that.$nextTick(function() {
that.scrollTop = 99999
});
},500)
})
},
onShow() {
this.pageInit(this.chat_id);
},
onReady() {
this.scrollTop = 99999; // 拉到底部
},
onUnload(){
// 页面卸载时 卸载页面socket事件
console.log('chat on onUnload!!!!!')
getApp().globalData.socket.uninstall('chat'+this.chat_id)
},
methods: {
scroll:function(e){
this.old.scrollTop = e.detail.scrollTop
},
pageInit(chat_id){
//拉取聊天记录
}
},
components: {}
}
</script>
<style>
</style>
后端 laravel swoole
<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use App\Library\SwConsTool;
class SwooleServer extends Command
{
protected $signature = 'swoole:server';
protected $description = 'swoole 系统消息';
public function __construct(){
parent::__construct();
}
protected $server;
public function handle()
{
$this->server = new \Swoole\WebSocket\Server("0.0.0.0", 6001);
$this->server->set([
'worker_num' => 2,
'debug_mode' => 1,
'daemonize' => true,
'log_file' => '/www/wwwroot/swoole_log.txt',
'heartbeat_idle_time' => 60, // 表示一个连接如果60秒内未向服务器发送任何数据,此连接将被强制关闭
'heartbeat_check_interval' => 10, // 表示每10秒遍历一次
]);
$this->server->on('start',function($server){
echo "swoole服务开启,重置....\n";
SwConsTool::removeAllFds();
SwConsTool::removeAllUser();
});
$this->server->on('WorkerStart', function (\Swoole\WebSocket\Server $server, $worker_id){
echo $worker_id."worker Start\n";
if($worker_id == 0){
// 定时ping
swoole_timer_tick(20000, function()use($server){
foreach ($server->connections as $fd) {
if ($server->isEstablished($fd)) {
$server->push($fd, $this->ping() );
}
}
});
}
});
$this->server->on('open', function (\Swoole\WebSocket\Server $server, $request){
echo "握手成功 fd{$request->fd}".date('Y-m-d H:i:s',time())."\n";
});
$this->server->on('message', function (\Swoole\WebSocket\Server $server, $frame{
echo "收到消息 {$frame->fd}:{$frame->data},opcode:{$frame->opcode},fin:{$frame->finish}\n";
$receive = json_decode($frame->data,true);
$uuid = @$receive['uuid']; // 消息唯一标识 毫秒时间戳
$event = @$receive['event']; // 事件名称
$message = @$receive['data']; // 携带的数据
try{
if( $event == 'login' ){
//{"event":"login","data":{"u":"user:6","token":""}}
try{
$u = $message['u'];
$token = $message['token'];
$this->clear_up($u);
$this->checkToken($u, $token);
SwConsTool::setOnline($u);
SwConsTool::addToGroup($u,$frame->fd);
echo 'U:'.$u.'登录成功'.PHP_EOL;
$server->push($frame->fd, $this->respon($uuid,0,'登录成功') );
} catch (\Exception $e) {
echo $e->getMessage().PHP_EOL;
echo $e->getTraceAsString().PHP_EOL;
$server->push($frame->fd, $this->respon($uuid,1,'出现错误,登录失败') );
}
}elseif( $event == 'ping' ){
try{
$user_key = SwConsTool::getKeyByFd($frame->fd);
if($user_key){
$check_bind = SwConsTool::isGroupMember($user_key, $frame->fd);
if($check_bind){
SwConsTool::setOnline($user_key);
echo 'U:'.$user_key.'PING检查连接可用'.PHP_EOL;
$server->push($frame->fd, $this->respon($uuid,0,'PING检查连接可用') );
}else{
echo 'U:'.$user_key.'PING检查连接不可用'.PHP_EOL;
$server->push($frame->fd, $this->respon($uuid,1,'PING检查连接不可用') );
}
}else{
echo 'U:'.$user_key.'PING检查连接不可用'.PHP_EOL;
$server->push($frame->fd, $this->respon($uuid,1,'PING检查连接不可用') );
}
} catch (\Exception $e) {
echo $e->getMessage().PHP_EOL;
echo $e->getTraceAsString().PHP_EOL;
$server->push($frame->fd, $this->respon($uuid,1,'PING检查连接服务器出现错误') );
}
}elseif( $event == 'response' ){}
} catch (\Exception $e) {
echo 'ONMESSAGE服务器出现错误'.PHP_EOL;
echo $e->getMessage().PHP_EOL;
echo $e->getTraceAsString().PHP_EOL;
$server->push($frame->fd, $this->respon($key,1,'ONMESSAGE服务器出现错误') );
}
});
$this->server->on('close', function ($server, $fd) {
echo "客户端 {$fd} 关闭\n";
SwConsTool::delFd($fd);
});
$this->server->on('request', function (\Swoole\Http\Request $request, \Swoole\Http\Response $response) {
$post = $request->post;
try{
var_dump('onRequest 收到消息:', $post);
$event = @$post['event']; //事件名称
$json2Arr = json_decode($post['msg'],true);
$post['msg'] = is_null($json2Arr) ? $post['msg'] : $json2Arr;
$to = @$post['to'];
switch (true) {
// 小程序底部tobar
case $event == 'user_tabbar':
if( $to != '' ){
$this->sendMsg($post, $to);
$response->end('ok');
}
break;
// 聊天
case substr($event,0,4) == 'chat':
if( $to != '' ){
$this->sendMsg($post, $to);
$response->end('ok');
}
break;
default:
$response->end('fail');
break;
}
} catch (\Exception $e) {
echo 'ON REQUEST服务器出现错误'.PHP_EOL;
echo $e->getMessage().PHP_EOL;
echo $e->getTraceAsString().PHP_EOL;
}
});
$this->server->start();
}
/**
* data 数组
* to 发送人 数组或,分割的字符串
*/
public function sendMsg($data, $to){
$to_list = is_array($to)? $to : explode(',',trim($to,','));
foreach( $to_list as $v){
$v = str_replace('online:','',$v);
$fds = SwConsTool::getFdsByKey($v);
foreach ($fds as $fd) {
if ($this->server->isEstablished($fd)) {
if( SwConsTool::getKeyByFd($fd) != $v ){
SwConsTool::remFdFromGroup($v, $fd);
}else{
$this->server->push($fd,json_encode($data));
}
}else{
SwConsTool::delFdFromGroup($v, $fd);
}
}
}
}
/**
* 应答 用于登陆和ping
* $key 唯一标识一条信息
* errno 是否成功
* body 响应内容
*/
public function respon($key,$errno,$body){
$respon['type'] = 'respon';
$respon['event'] = $key;
$respon['data'] = [
'error' => $errno,
'body' => $body
];
return json_encode($respon);
}
/**
* ping
* $uuid 唯一标识一条信息
* errno 是否成功
* body 响应内容
*/
public function ping(){
$ping['type'] = 'ping';
$ping['uuid'] = microtime(true);
return json_encode($ping);
}
/**
* 整理
*/
public function clear_up($u){
$group = SwConsTool::getFdsByKey($u);
foreach ($group as $fd) {
if ($this->server->isEstablished($fd)) {
if( SwConsTool::getKeyByFd($fd) != $u ){
SwConsTool::remFdFromGroup($u, $fd);
}
}else{
SwConsTool::delFdFromGroup($u, $fd);
}
}
}
/**
* $u user:1982
*/
public function checkToken($u, $token)
{
$u_arr = explode(':',$u);
list($role,$uid) = $u_arr;
$check_token = mt_rand(1111,9999);
if( $role == 'user'){ $check_token = md5($uid.'key_example');}
elseif($role=='xxx'){ $check_token = md5($uid.'key_example'); }
else{
echo 'U:'.$u.'不明身份'.PHP_EOL;
$this->server->push($frame->fd, $this->respon($key,1,'不明身份') );
}
if( $token != $check_token ){
echo 'U:'.$u.'连接密钥错误'.PHP_EOL;
$this->server->push($frame->fd, $this->respon($key,1,'连接密钥错误') );
}
}
}
fd 管理文件
<?php
namespace App\Library;
use Illuminate\Support\Facades\Redis;
/**
* 基本数据结构
*----------------------------------------------------------------------------------------
*
* fd1 => user:1983
* user:1983 => [1,2,3]
*
* online:user:1983 => 2020-01-01 00:00:00
*
*/
class SwConsTool {
/**
* 添加到用户的fd组 一个用户可以登录多端
* ----------------------------------------------------------------
*
* 如果该fd被占用的话 那么就先清除这个fd
*
* 加入组
*
* 双向绑定
*/
public static function addToGroup($key,$fd){
self::delFd($fd);
Redis::SADD($key,$fd);
Redis::SET('fd:'.$fd,$key);
}
/**
* 根据 fd 或者 key(用户) 获取绑定的另一方
* ----------------------------------------------------------------
*/
public static function getKeyByFd($fd){
return Redis::GET('fd:'.$fd);
}
public static function getFdsByKey($key){
return Redis::SMEMBERS($key);
}
public static function isGroupMember($key,$fd){
return Redis::SISMEMBER($key, $fd);
}
/**
* 删除用户组中fd 和 删除fd
* -----------------------------------------------------------------------------
*
* 将fd 从 用户的fd组删除 但不删除fd
*
* 将fd删除
*/
public static function delFd($fd){
$key = Redis::GET('fd:'.$fd);
Redis::SREM($key, $fd);
Redis::DEL('fd:'.$fd);
}
public static function remFdFromGroup($key, $fd){
Redis::SREM($key, $fd);
}
public static function delFdFromGroup($key, $fd){
Redis::SREM($key, $fd);
Redis::DEL('fd:'.$fd);
}
/*
* 用户活跃与在线
* -------------------------------------------------------------------------------
* active 活跃时间 记录客户端最后一次 ping 的时间
*
* online 上线与下线
*/
public static function active($key){
Redis::SET('active:'.$key, date('Y-m-d H:i:s', time()) );
}
public static function setOnline($key){
Redis::SETEX('online:'.$key, 5*60 , date('Y-m-d H:i:s', time()) );
}
public static function notOnline($key){
Redis::DEL('online:'.$key);
}
public static function isOnline($key){
return Redis::EXISTS('online:'.$key);
}
public static function getOnlineUsers(){
return Redis::KEYS('online:user:*');
}
/**
* 清除所有键值 初始化
* ---------------------------------------------------------------------------------
*/
public static function removeAllFds(){
$fds = REDIS::KEYS('fd:*');
foreach ($fds as $fd) {
REDIS::DEL($fd);
}
}
public static function removeAllUser(){
$fds = REDIS::KEYS('user:*');
foreach ($fds as $fd) {
REDIS::DEL($fd);
}
}
}
消息转发 Job 任务
<?php
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use App\Library\Y;
use Illuminate\Support\Facades\Log;
class Notice implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public $tries = 2; // 最大尝试次数
public $timeout = 30; // 任务执行最大秒数
protected $data;
public function __construct($data=[])
{
$this->data = $data;
}
/*
* type='频道' event='事件名称' msg=[自定义载体] to='发送人'
*/
public function handle()
{
$data = $this->data;
if( !isset($data['event']) || !isset($data['msg']) ){
return;
}
if( !isset($data['to']) || empty($data['to']) ){
$data['to'] = '';
}
$data['msg'] = is_array($data['msg'])?json_encode($data['msg']):$data['msg'];
$data['type'] = 'app'; // 为了和应答区别
Y::curl('http://127.0.0.1:6001',$data,1); // 发送到swoole服务器
}
//要处理的失败任务
public function failed(\Exception $exception)
{
Log::error('notice:' . $exception->getMessage());
Log::error('notice:' . $exception->getTraceAsString());
}
}
消息投递
<?php
namespace App\Http\Controllers\Api\V1;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;
use App\Jobs\Notice;
class TestController extends Controller
{
public function index(){
$user_id = 999; //用户ID
$chat_id = 1; //聊天室ID
Notice::dispatch(['event'=>'chat'.$chat_id,'msg'=>'reload','to'=>'user:'.$user_id]);
Notice::dispatch(['event'=>'user_tabbar','msg'=>0,'to'=>'user:'.$user_id]);
}
}
本作品采用《CC 协议》,转载必须注明作者和本文链接
推荐文章: