websocket长连接接查询订单
websocket
需求: 由于前端页面在创建订单时,会频繁请求服务端查询订单接口。如果多用户多个未支付订单,会导致越来越多的查询请求。所以这里使用websocket
长链接来接收客户端的查询订单请求。
下面来实际操作下,我这里使用go-zero
框架
服务端
- 创建client文件, 主要是读取和写入消息
package ws
import (
"encoding/json"
"fmt"
"github.com/gorilla/websocket"
"net/http"
"time"
)
var upGrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true
},
}
// Client is a middleman between the websocket connection and the hub.
type Client struct {
hub *Hub
// The websocket connection.
conn *websocket.Conn
// Buffered channel of outbound messages.
send chan []byte
uid int64
}
var clients map[int64]*Client
func (c *Client) WriteMsg() {
defer func() {
_ = c.conn.Close()
}()
for {
select {
case message, ok := <-c.send:
if !ok {
c.conn.WriteMessage(websocket.CloseMessage, []byte{}) // 错误 关闭 channel
return
}
w, err := c.conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
_, _ = w.Write(message)
n := len(c.send)
for i := 0; i < n; i++ {
_, _ = w.Write(<-c.send)
}
if err := w.Close(); err != nil {
return
}
}
}
}
func (c *Client) ReadMsg() {
defer func() {
c.hub.unregister <- c
_ = c.conn.Close()
}()
for {
_, strByte, err := c.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
fmt.Println("系统错误, err:", err)
}
break
}
// 检测用户是否连接
fmt.Printf("clients :%+v\n", clients[c.uid])
if clients[c.uid] != c {
fmt.Println("用户未登录,无法发送消息, uid:", c.uid)
continue
}
// 读取客户端发送的消息
fmt.Printf("read msg: %s\n", string(strByte))
var cliMsg CliMsg
//err = copier.Copy(&cliMsg, strByte)
err = json.Unmarshal(strByte, &cliMsg)
if err != nil {
fmt.Printf("客户端发送的数据错误,请使用json格式. err:%+v\n", err)
/*msg := CliMsg{
MsgType: ServerResp,
MsgData: map[string]interface{}{
"msg": "您发送的消息无效,请使用json格式",
},
}
msgSend.SendData(msg)*/
continue
}
if string(strByte) != "" {
//fmt.Println("记录信息发送, msg: ", cliMsg)
if msgSend != nil {
msgSend.SendData(cliMsg, c.uid)
}
//c.hub.broadcast <- cliMsg // 转发读取到的channel消息
}
}
}
- 创建
hub
, 维护客户端并广播消息
package ws
import (
"encoding/json"
"fmt"
)
// Hub maintains the set of active clients and broadcasts messages to the
// clients.
type Hub struct {
// Registered clients.
clients map[*Client]bool
// Inbound messages from the clients.
//broadcast chan []byte
broadcast chan CliMsg
// Register requests from the clients.
register chan *Client
// Unregister requests from clients.
unregister chan *Client
}
var globHub *Hub
func NewHub() *Hub {
return &Hub{
//broadcast: make(chan []byte),
broadcast: make(chan CliMsg),
register: make(chan *Client),
unregister: make(chan *Client),
clients: make(map[*Client]bool),
}
}
func (h *Hub) Run() {
for {
select {
case client := <-h.register: // 创建连接
if _, ok := clients[client.uid]; !ok {
clients[client.uid] = client
h.clients[client] = true
fmt.Printf("用户uid:%v已连接\n", client.uid)
}
case client := <-h.unregister: // 断开连接
if _, ok := h.clients[client]; ok {
fmt.Printf("用户uid:%v,断开连接\n", client.uid)
delete(h.clients, client)
//close(client.send)
}
// 退出
delete(clients, client.uid)
case message := <-h.broadcast: // 接收消息
// 解析消息
send, err := json.Marshal(message)
if err != nil {
fmt.Printf("解析消息失败, err:%+v\n", err)
return
}
//var sent []int64z
uid := message.MsgData["uid"].(int64)
if uid > 0 {
// 发送到那个uid
if cli, ok := clients[uid]; ok {
fmt.Printf("已发送到uid:%v\n", uid)
cli.send <- send
} else {
fmt.Printf("关闭发送消息, 并删除cli\n")
close(cli.send)
delete(h.clients, cli)
}
} else {
fmt.Printf("用户uid:%v不存在\n", uid)
}
}
}
}
- 创建
server
文件, 初始化客户端,发送消息
package ws
import (
"fmt"
"net/http"
"time"
)
func NewServer(w http.ResponseWriter, r *http.Request, uid int64, send messageInterface) {
if globHub == nil {
globHub = NewHub()
go globHub.Run()
}
if clients == nil {
clients = make(map[int64]*Client)
}
if msgSend == nil {
msgSend = send
}
runWs(globHub, w, r, uid)
}
func runWs(hub *Hub, w http.ResponseWriter, r *http.Request, uid int64) {
conn, err := upGrader.Upgrade(w, r, nil)
if err != nil {
fmt.Println("升级get请求错误\n", err)
return
}
client := &Client{hub: hub, conn: conn, send: make(chan []byte), uid: uid}
//连接时休眠1秒 防止刷新页面 先连接后退出
time.Sleep(time.Duration(1) * time.Second)
client.hub.register <- client
go client.ReadMsg()
go client.WriteMsg()
}
func SendMsgToUid(uid int64, msg CliMsg) {
if cli, ok := clients[uid]; ok {
fmt.Printf("server SendMsgToUid 正在发送到客户端数据,data:%+v\n", msg)
cli.hub.broadcast <- msg
}
}
- 创建
message
文件,定义消息体
package ws
const (
QueryOrder int64 = 1 // 查询订单
ServerResp int64 = 2 // 服务端响应
ClientConfirm int64 = 3 // 客户端确认
)
type CliMsg struct {
MsgType int64 `json:"msg_type"`
MsgData map[string]interface{} `json:"msg_data"`
}
type messageInterface interface {
SendData(msg CliMsg, uid int64)
}
var msgSend messageInterface
- 创建
wsServer
文件,启动服务
package svc
import (
cuser "app/common/user"
"app/common/ws"
"app/service/pay/rpc/pay"
"context"
"fmt"
"github.com/zeromicro/go-zero/core/logx"
"net/http"
"time"
)
type Server struct {
Ctx *ServiceContext
}
func (i Server) Start() {
Run(i.Ctx)
}
func (i Server) Stop() {
fmt.Println("ws service was stop...")
}
func Run(ctx *ServiceContext) {
mux := http.NewServeMux()
mux.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
// 由于浏览器websocket连接,无法加header,所以这里把token放入到form表单中
// 解析token
uid, err := checkToken(ctx, r.FormValue("token"))
if err != nil {
fmt.Printf("解析token失败, ws 连接错误:%+v", err)
return
}
ws.NewServer(w, r, uid, &data{ctx: ctx})
})
fmt.Printf("Starting Ws Server at %s:%v...\n", ctx.Config.WsServer.Host, ctx.Config.WsServer.Port)
mPort := fmt.Sprintf(":%v", ctx.Config.WsServer.Port)
err := http.ListenAndServe(mPort, mux)
if err != nil {
fmt.Println("ws err:", err)
return
}
}
// 检测客户端传递过来的token
func checkToken(ctx *ServiceContext, token string) (int64, error) {
userTokenHandler := cuser.NewUserToken(ctx.RedisClient, ctx.Config.JwtAuth.AccessSecret, ctx.Config.JwtAuth.AccessExpire)
uid, err := userTokenHandler.ParseTokenUnWithHttp(token, ctx.Config.JwtAuth.AccessSecret)
if err != nil || uid == 0 {
return uid, err
}
return uid, nil
}
type data struct {
ctx *ServiceContext
}
// 查询订单信息,并发送消息
func (d *data) SendData(msg ws.CliMsg, uid int64) {
switch msg.MsgType {
case ws.QueryOrder: // 查询订单信息
msg.MsgType = ws.ServerResp
orderDetail, err := d.ctx.PayRpc.OrderDetail(context.TODO(), &pay.OrderDetailReq{
OrderNo: msg.MsgData["order_no"].(string),
})
if err != nil {
logx.Errorf("查询订单失败,err:%+v\n", err)
msg.MsgData = map[string]interface{}{
"uid": uid,
"msg": "查询订单失败",
}
ws.SendMsgToUid(uid, msg)
return
}
if orderDetail == nil {
logx.Errorf("订单不存在\n")
msg.MsgData = map[string]interface{}{
"uid": uid,
"msg": "订单不存在",
}
ws.SendMsgToUid(uid, msg)
return
}
if orderDetail.Order.Uid != uid {
logx.Errorf("该订单:%s不属于用户uid:%v\n", msg.MsgData["order_no"].(string), uid)
msg.MsgData = map[string]interface{}{
"uid": uid,
"msg": "用户不存在改订单",
}
ws.SendMsgToUid(uid, msg)
return
}
// 服务端回复给客户端消息
msg.MsgData = map[string]interface{}{
"uid": orderDetail.Order.Uid,
"status": orderDetail.Order.OrderStatus,
"order_no": orderDetail.Order.OrderNo,
"send_time": time.Now().Unix(),
"msg": "ok",
}
ws.SendMsgToUid(uid, msg)
case ws.ClientConfirm: // 客户端确认信息收到
fmt.Printf("客户端已收到响应, 结果:%+v\n", msg)
}
}
- 服务加入组并启动
// api main
func main() {
flag.Parse()
var c config.Config
conf.MustLoad(*configFile, &c)
ctx := svc.NewServiceContext(c)
server := rest.MustNewServer(c.RestConf)
defer server.Stop()
handler.RegisterHandlers(server, ctx)
fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)
//server.Start()
// 这里是添加的
group := service.NewServiceGroup() // 创建服务组
defer group.Stop()
group.Add(server) // 之前的服务加入
group.Add(svc.Server{Ctx: ctx}) // websocket 服务加入
group.Start() // 启动
}
客户端
上面服务端已经写好了,下面写个页面来测试下
<!DOCTYPE html>
<html lang="en">
<head>
<title>WebSocket 客户端工具</title>
</head>
<body>
<form onsubmit="return false;">
ws连接 <input type="text" name="ws" value="ws://localhost:3333/ws" style="width: 30%"/>
<br/>
token <input type="text" name="token" style="width:50%">
<input type="button" id="btnConnection" value="连接" onclick="openConn(this.form.ws.value, this.form.token.value)"/>
<!-- 子协议 <input type="text" name="subprotocol" value="drpcjson"/> -->
<!-- <input type="button" id="btnConnection" value="连接" onclick="openConn(this.form.ipaddr.value,this.form.subprotocol.value)"/>-->
<br/>
<br/>
<textarea name="reqMsg" style="width: 1000px;height: 200px"></textarea>
<input type="button" id="btnSend" value="发送请求" onclick="sendMsg(this.form.reqMsg.value)"/><br/><br/>
<textarea name="resMsg" id='respText' style="width: 1000px;height: 1000px"></textarea>
<input type="button" onclick="javascript: document.getElementById('respText').value=''" value="清空数据"/>
</form>
<script>
var webSocket;
// function openConn(url, subprotocol) {
function openConn(url, token) {
if (!window.WebSocket) {
alert("不支持websocket");
return;
}
var seq = 1;
//地址 ws://127.0.0.1:7008/drpc
//子协议 drpcjson
// webSocket = new WebSocket(url, subprotocol);
try {
webSocket = new WebSocket(url + "?token="+token);
} catch (e) {
console.log(e)
}
webSocket.onopen = function () {
var ta = document.getElementById('respText');
//发送一个校验token的消息,token校验成功才算是业务连接成功
// webSocket.send(token)
ta.value = "连接已建立!";
};
webSocket.onmessage = function (ev) {
//解析来自服务端的数据,做相应的业务逻辑处理
var ta = document.getElementById('respText');
ta.value = "已发送请求!";
// console.log(ev)
ta.value = ta.value + "\n\n" + ev.data;
//do business
var respData = JSON.parse(ev.data);
console.log(respData);
if (respData.msg_type === 2 && respData.msg_data.msg === 'ok') {//客户端需要响应服务端的push消息;服务端有多个push类型的服务号和消息号,详情咨询对应开发
const respJson = `{
"msg_type": 3,
"msg_data": {
"uid": ` + respData.msg_data.uid + `,
"msg": "ok"
}
}`;
webSocket.send(respJson);
}
};
webSocket.onclose = function (ev) {
var ta = document.getElementById('respText');
ta.value = ta.value + "\n\n" + "连接已关闭!";
// webSocket.close();
};
webSocket.onerror = function (ev) {
alert("error");
// webSocket.close();
};
}
function sendMsg(msg) {
if (!window.WebSocket) {
alert("不支持websocket");
return;
}
if (webSocket.readyState === WebSocket.OPEN) {
webSocket.send(msg);
} else {
alert("websocket连接尚未开启");
}
}
</script>
</body>
</html>
下面来测试下
- 创建连接,输入token
- 输入请求参数
- 更换token
- 输入错误参数
- 请求头
websocat 命令
当然也可以传递header
头,使用命令来测试连接,不过代码服务端代码需要修改下,解析token
从header
里获取。
> websocat ws://localhost:8803/ws -H='Authorization:eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE2OTA0MjgwOTIsImlhdCI6MTY1ODg5MjA5Miwiand0VXNlcklkIjoxMDAwMDE1fQ.IhPimbe9h4wbQT_En-IwvhAB6hZTbbcxEbTuekJ7Ogk'
# 输入后回车
{"msg_type": 1,"msg_data": {"order_no": "UC2022071918273749558633"}}
# 响应
{"msg_type":2,"msg_data":{"msg":"ok","order_no":"UC2022071918273749558633","send_time":1659090212,"status":2,"uid":1000014}}
网上很多教程,可以参考。
阮一峰的网络日志
用 Go + WebSocket 快速实现一个 chat 服务
golang 简易聊天室
本作品采用《CC 协议》,转载必须注明作者和本文链接
感觉又回到了web商城时代一样,现在那么多APP,用web的都少多了