websocket长连接接查询订单

websocket

需求: 由于前端页面在创建订单时,会频繁请求服务端查询订单接口。如果多用户多个未支付订单,会导致越来越多的查询请求。所以这里使用websocket长链接来接收客户端的查询订单请求。

下面来实际操作下,我这里使用go-zero框架

服务端

  1. 创建client文件, 主要是读取和写入消息
package ws

import (
    "encoding/json"
    "fmt"
    "github.com/gorilla/websocket"
    "net/http"
    "time"
)

var upGrader = websocket.Upgrader{
    ReadBufferSize:  1024,
    WriteBufferSize: 1024,
    CheckOrigin: func(r *http.Request) bool {
        return true
    },
}

// Client is a middleman between the websocket connection and the hub.
type Client struct {
    hub *Hub

    // The websocket connection.
    conn *websocket.Conn

    // Buffered channel of outbound messages.
    send chan []byte

    uid int64
}

var clients map[int64]*Client


func (c *Client) WriteMsg() {
    defer func() {
        _ = c.conn.Close()
    }()
    for {
        select {
        case message, ok := <-c.send:
            if !ok {
                c.conn.WriteMessage(websocket.CloseMessage, []byte{}) // 错误 关闭 channel
                return
            }
            w, err := c.conn.NextWriter(websocket.TextMessage)
            if err != nil {
                return
            }
            _, _ = w.Write(message)
            n := len(c.send)
            for i := 0; i < n; i++ {
                _, _ = w.Write(<-c.send)
            }

            if err := w.Close(); err != nil {
                return
            }
        }
    }
}

func (c *Client) ReadMsg() {
    defer func() {
        c.hub.unregister <- c
        _ = c.conn.Close()
    }()
    for {
        _, strByte, err := c.conn.ReadMessage()
        if err != nil {
            if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
                fmt.Println("系统错误, err:", err)
            }
            break
        }
        // 检测用户是否连接
        fmt.Printf("clients :%+v\n", clients[c.uid])
        if clients[c.uid] != c {
            fmt.Println("用户未登录,无法发送消息, uid:", c.uid)
            continue
        }

        // 读取客户端发送的消息
        fmt.Printf("read msg: %s\n", string(strByte))
        var cliMsg CliMsg
        //err = copier.Copy(&cliMsg, strByte)
        err = json.Unmarshal(strByte, &cliMsg)
        if err != nil {
            fmt.Printf("客户端发送的数据错误,请使用json格式. err:%+v\n", err)
            /*msg := CliMsg{
                MsgType: ServerResp,
                MsgData: map[string]interface{}{
                    "msg": "您发送的消息无效,请使用json格式",
                },
            }
            msgSend.SendData(msg)*/
            continue
        }

        if string(strByte) != "" {
            //fmt.Println("记录信息发送, msg: ", cliMsg)
            if msgSend != nil {
                msgSend.SendData(cliMsg, c.uid)
            }
            //c.hub.broadcast <- cliMsg // 转发读取到的channel消息
        }
    }
}

  1. 创建hub, 维护客户端并广播消息
package ws

import (
    "encoding/json"
    "fmt"
)

// Hub maintains the set of active clients and broadcasts messages to the
// clients.
type Hub struct {
    // Registered clients.
    clients map[*Client]bool

    // Inbound messages from the clients.
    //broadcast chan []byte
    broadcast chan CliMsg

    // Register requests from the clients.
    register chan *Client

    // Unregister requests from clients.
    unregister chan *Client
}

var globHub *Hub

func NewHub() *Hub {
    return &Hub{
        //broadcast:  make(chan []byte),
        broadcast:  make(chan CliMsg),
        register:   make(chan *Client),
        unregister: make(chan *Client),
        clients:    make(map[*Client]bool),
    }
}

func (h *Hub) Run() {
    for {
        select {
        case client := <-h.register: // 创建连接
            if _, ok := clients[client.uid]; !ok {
                clients[client.uid] = client
                h.clients[client] = true
                fmt.Printf("用户uid:%v已连接\n", client.uid)
            }
        case client := <-h.unregister: // 断开连接
            if _, ok := h.clients[client]; ok {
                fmt.Printf("用户uid:%v,断开连接\n", client.uid)
                delete(h.clients, client)
                //close(client.send)
            }
            // 退出
            delete(clients, client.uid)
        case message := <-h.broadcast: // 接收消息
            // 解析消息
            send, err := json.Marshal(message)
            if err != nil {
                fmt.Printf("解析消息失败, err:%+v\n", err)
                return
            }
            //var sent []int64z
            uid := message.MsgData["uid"].(int64)
            if uid > 0 {
                // 发送到那个uid
                if cli, ok := clients[uid]; ok {
                    fmt.Printf("已发送到uid:%v\n", uid)
                    cli.send <- send
                } else {
                    fmt.Printf("关闭发送消息, 并删除cli\n")
                    close(cli.send)
                    delete(h.clients, cli)
                }
            } else {
                fmt.Printf("用户uid:%v不存在\n", uid)
            }
        }
    }
}
  1. 创建server文件, 初始化客户端,发送消息
package ws

import (
    "fmt"
    "net/http"
    "time"
)

func NewServer(w http.ResponseWriter, r *http.Request, uid int64, send messageInterface) {
    if globHub == nil {
        globHub = NewHub()
        go globHub.Run()
    }
    if clients == nil {
        clients = make(map[int64]*Client)
    }
    if msgSend == nil {
        msgSend = send
    }
    runWs(globHub, w, r, uid)
}

func runWs(hub *Hub, w http.ResponseWriter, r *http.Request, uid int64) {
    conn, err := upGrader.Upgrade(w, r, nil)
    if err != nil {
        fmt.Println("升级get请求错误\n", err)
        return
    }
    client := &Client{hub: hub, conn: conn, send: make(chan []byte), uid: uid}
    //连接时休眠1秒  防止刷新页面 先连接后退出
    time.Sleep(time.Duration(1) * time.Second)
    client.hub.register <- client
    go client.ReadMsg()
    go client.WriteMsg()
}

func SendMsgToUid(uid int64, msg CliMsg) {
    if cli, ok := clients[uid]; ok {
        fmt.Printf("server SendMsgToUid 正在发送到客户端数据,data:%+v\n", msg)
        cli.hub.broadcast <- msg
    }
}
  1. 创建message文件,定义消息体

package ws

const (
    QueryOrder    int64 = 1 // 查询订单
    ServerResp    int64 = 2 // 服务端响应
    ClientConfirm int64 = 3 // 客户端确认
)

type CliMsg struct {
    MsgType int64                  `json:"msg_type"`
    MsgData map[string]interface{} `json:"msg_data"`
}

type messageInterface interface {
    SendData(msg CliMsg, uid int64)
}

var msgSend messageInterface
  1. 创建wsServer文件,启动服务
package svc

import (
    cuser "app/common/user"
    "app/common/ws"
    "app/service/pay/rpc/pay"
    "context"
    "fmt"
    "github.com/zeromicro/go-zero/core/logx"
    "net/http"
    "time"
)

type Server struct {
    Ctx *ServiceContext
}

func (i Server) Start() {
    Run(i.Ctx)
}
func (i Server) Stop() {
    fmt.Println("ws service was stop...")
}

func Run(ctx *ServiceContext) {
    mux := http.NewServeMux()
    mux.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
        // 由于浏览器websocket连接,无法加header,所以这里把token放入到form表单中
        // 解析token
        uid, err := checkToken(ctx, r.FormValue("token"))
        if err != nil {
            fmt.Printf("解析token失败, ws 连接错误:%+v", err)
            return
        }

        ws.NewServer(w, r, uid, &data{ctx: ctx})
    })

    fmt.Printf("Starting Ws Server at %s:%v...\n", ctx.Config.WsServer.Host, ctx.Config.WsServer.Port)
    mPort := fmt.Sprintf(":%v", ctx.Config.WsServer.Port)
    err := http.ListenAndServe(mPort, mux)
    if err != nil {
        fmt.Println("ws err:", err)
        return
    }
}

// 检测客户端传递过来的token
func checkToken(ctx *ServiceContext, token string) (int64, error) {
    userTokenHandler := cuser.NewUserToken(ctx.RedisClient, ctx.Config.JwtAuth.AccessSecret, ctx.Config.JwtAuth.AccessExpire)
    uid, err := userTokenHandler.ParseTokenUnWithHttp(token, ctx.Config.JwtAuth.AccessSecret)
    if err != nil || uid == 0 {
        return uid, err
    }
    return uid, nil
}

type data struct {
    ctx *ServiceContext
}

// 查询订单信息,并发送消息
func (d *data) SendData(msg ws.CliMsg, uid int64) {
    switch msg.MsgType {
    case ws.QueryOrder: // 查询订单信息
        msg.MsgType = ws.ServerResp
        orderDetail, err := d.ctx.PayRpc.OrderDetail(context.TODO(), &pay.OrderDetailReq{
            OrderNo: msg.MsgData["order_no"].(string),
        })
        if err != nil {
            logx.Errorf("查询订单失败,err:%+v\n", err)
            msg.MsgData = map[string]interface{}{
                "uid": uid,
                "msg": "查询订单失败",
            }
            ws.SendMsgToUid(uid, msg)
            return
        }
        if orderDetail == nil {
            logx.Errorf("订单不存在\n")
            msg.MsgData = map[string]interface{}{
                "uid": uid,
                "msg": "订单不存在",
            }
            ws.SendMsgToUid(uid, msg)
            return
        }

        if orderDetail.Order.Uid != uid {
            logx.Errorf("该订单:%s不属于用户uid:%v\n", msg.MsgData["order_no"].(string), uid)
            msg.MsgData = map[string]interface{}{
                "uid": uid,
                "msg": "用户不存在改订单",
            }
            ws.SendMsgToUid(uid, msg)
            return
        }

        // 服务端回复给客户端消息
        msg.MsgData = map[string]interface{}{
            "uid":       orderDetail.Order.Uid,
            "status":    orderDetail.Order.OrderStatus,
            "order_no":  orderDetail.Order.OrderNo,
            "send_time": time.Now().Unix(),
            "msg":       "ok",
        }
        ws.SendMsgToUid(uid, msg)

    case ws.ClientConfirm: // 客户端确认信息收到
        fmt.Printf("客户端已收到响应, 结果:%+v\n", msg)
    }
}
  1. 服务加入组并启动

// api main
func main() {
    flag.Parse()

    var c config.Config
    conf.MustLoad(*configFile, &c)

    ctx := svc.NewServiceContext(c)
    server := rest.MustNewServer(c.RestConf)
    defer server.Stop()

    handler.RegisterHandlers(server, ctx)

    fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)
    //server.Start()

    // 这里是添加的
    group := service.NewServiceGroup()  // 创建服务组
    defer group.Stop()
    group.Add(server)   // 之前的服务加入
    group.Add(svc.Server{Ctx: ctx}) // websocket 服务加入
    group.Start()   // 启动
}

客户端

上面服务端已经写好了,下面写个页面来测试下

<!DOCTYPE html>
<html lang="en">
<head>
    <title>WebSocket 客户端工具</title>
</head>
<body>

<form onsubmit="return false;">
    ws连接 <input type="text" name="ws" value="ws://localhost:3333/ws" style="width: 30%"/>
    <br/>
    token <input type="text" name="token" style="width:50%">
    <input type="button" id="btnConnection" value="连接" onclick="openConn(this.form.ws.value, this.form.token.value)"/>

    <!--    子协议 <input type="text" name="subprotocol" value="drpcjson"/> -->
    <!--    <input type="button" id="btnConnection" value="连接" onclick="openConn(this.form.ipaddr.value,this.form.subprotocol.value)"/>-->
    <br/>
    <br/>
    <textarea name="reqMsg" style="width: 1000px;height: 200px"></textarea>
    <input type="button" id="btnSend" value="发送请求" onclick="sendMsg(this.form.reqMsg.value)"/><br/><br/>

    <textarea name="resMsg" id='respText' style="width: 1000px;height: 1000px"></textarea>
    <input type="button" onclick="javascript: document.getElementById('respText').value=''" value="清空数据"/>
</form>
<script>

    var webSocket;

    // function openConn(url, subprotocol) {
    function openConn(url, token) {
        if (!window.WebSocket) {
            alert("不支持websocket");
            return;
        }

        var seq = 1;

        //地址 ws://127.0.0.1:7008/drpc
        //子协议 drpcjson
        // webSocket = new WebSocket(url, subprotocol);

        try {
            webSocket = new WebSocket(url + "?token="+token);
        } catch (e) {
            console.log(e)
        }

        webSocket.onopen = function () {
            var ta = document.getElementById('respText');
            //发送一个校验token的消息,token校验成功才算是业务连接成功
            // webSocket.send(token)
            ta.value = "连接已建立!";
        };

        webSocket.onmessage = function (ev) {
            //解析来自服务端的数据,做相应的业务逻辑处理
            var ta = document.getElementById('respText');
            ta.value = "已发送请求!";
            // console.log(ev)

            ta.value = ta.value + "\n\n" + ev.data;
            //do business
            var respData = JSON.parse(ev.data);
            console.log(respData);

            if (respData.msg_type === 2 && respData.msg_data.msg === 'ok') {//客户端需要响应服务端的push消息;服务端有多个push类型的服务号和消息号,详情咨询对应开发
                const respJson = `{
                    "msg_type": 3,
                    "msg_data": {
                        "uid": ` + respData.msg_data.uid + `,
                        "msg": "ok"
                    }
                }`;

                webSocket.send(respJson);
            }
        };

        webSocket.onclose = function (ev) {
            var ta = document.getElementById('respText');
            ta.value = ta.value + "\n\n" + "连接已关闭!";
            // webSocket.close();
        };

        webSocket.onerror = function (ev) {
            alert("error");
            // webSocket.close();
        };
    }

    function sendMsg(msg) {
        if (!window.WebSocket) {
            alert("不支持websocket");
            return;
        }

        if (webSocket.readyState === WebSocket.OPEN) {
            webSocket.send(msg);
        } else {
            alert("websocket连接尚未开启");
        }
    }

</script>
</body>
</html>

下面来测试下

  1. 创建连接,输入token

  1. 输入请求参数

  1. 更换token

  1. 输入错误参数

  1. 请求头

websocat 命令

当然也可以传递header头,使用命令来测试连接,不过代码服务端代码需要修改下,解析tokenheader里获取。

> websocat  ws://localhost:8803/ws -H='Authorization:eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE2OTA0MjgwOTIsImlhdCI6MTY1ODg5MjA5Miwiand0VXNlcklkIjoxMDAwMDE1fQ.IhPimbe9h4wbQT_En-IwvhAB6hZTbbcxEbTuekJ7Ogk'
# 输入后回车
{"msg_type": 1,"msg_data": {"order_no": "UC2022071918273749558633"}}
# 响应
{"msg_type":2,"msg_data":{"msg":"ok","order_no":"UC2022071918273749558633","send_time":1659090212,"status":2,"uid":1000014}}

网上很多教程,可以参考。
阮一峰的网络日志
用 Go + WebSocket 快速实现一个 chat 服务
golang 简易聊天室

本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 1

感觉又回到了web商城时代一样,现在那么多APP,用web的都少多了

1周前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!