《从0到1搭建一个IM项目》消息模块开发之消息核心升级改造

[toc]

概况

到目前为止,我们已经将IM项目的信息发送接收逻辑完成了,但是这里我们要进一步完善,我们需要将这个过程提高并发量,这里我们需要将消息模块加入udp连接。

到目前为止,我们的项目目录结构:

HiChat   
    ├── common    //放置公共文件
    |      |——md5.go
    |      |——resp.go
    │  
    ├── config    //做配置文件
    │  
    ├── dao//数据库crud|——user.go
    |     |——relation.go
    |     |——community.go
    |
    ├── global    //放置各种连接池,配置等|——global.go
    |
    ├── initialize  //项目初始化文件|——db.go
    |              |——logger.go
    |
    ├── middlewear  //放置web中间件
    |              |——jwt.go
    ├── models      //数据库表设计|——user_basic.go
    |           |——relation.go
    |           |——message.go
    |           |——community.go
    |
    ├── router           //路由
    |       |——router.go
    │   
    ├── service     //对外api
    |       |——user.go
    |       |——relation.go
    │   
    ├── test        //测试文件
    │  
    ├── main.go     //项目入口
    ├── go.mod            //项目依赖管理
    ├── go.sum            //项目依赖管理

消息接收引入udp连接

在message.go中,recProc(node)方法用来,接收用户发来的消息, 现在我们这样做:

//recProc 读取websocket用户发送的消息
func recProc(node *Node) {
    for {
        //获取信息
        _, data, err := node.Conn.ReadMessage()
        if err != nil {
            zap.S().Info("读取消息失败", err)
            return
        }

    //将消息体放入全局channel中
        brodMsg(data)
    }
}

//全局channel
var upSendChan chan []byte = make(chan []byte, 1024)

func brodMsg(data []byte) {
    upSendChan <- data
}


//init方法,运行message包前调用
func init() {
    go UdpSendProc()  
    go UpdRecProc()
}


//UdpSendProc 完成upd数据发送, 连接到udp服务端,将全局channel中的消息体,写入udp服务端
func UdpSendProc() {
    udpConn, err := net.DialUDP("udp", nil, &net.UDPAddr{
        //192.168.31.147
        IP:   net.IPv4(127, 0, 0, 1),
        Port: 3000,
        Zone: "",
    })
    if err != nil {
        zap.S().Info("拨号udp端口失败", err)
        return
    }

    defer udpConn.Close()

    for {
        select {
        case data := <-upSendChan:
            _, err := udpConn.Write(data)
            if err != nil {
                zap.S().Info("写入udp消息失败", err)
                return
            }
        }
    }
}


//UpdRecProc 完成udp数据的接收,启动udp服务,获取udp客户端的写入的消息
func UpdRecProc() {
    udpConn, err := net.ListenUDP("udp", &net.UDPAddr{
        IP:   net.IPv4(127, 0, 0, 1),
        Port: 3000,
    })
    if err != nil {
        zap.S().Info("监听udp端口失败", err)
        return
    }

    defer udpConn.Close()

    for {
        var buf [1024]byte
        n, err := udpConn.Read(buf[0:])
        if err != nil {
            zap.S().Info("读取udp数据失败", err)
            return
        }

        //处理发送逻辑
        dispatch(buf[0:n])
    }
}

//dispatch 解析消息,聊天类型判断
func dispatch(data []byte) {
    //解析消息
    msg := Message{}
    err := json.Unmarshal(data, &msg)
    if err != nil {
        zap.S().Info("消息解析失败", err)
        return
    }

    //判断消息类型
    switch msg.Type {
    case 1: //私聊
    sendMsg(msg.TargetId, data)
    case 2: //群发
        sendGroupMsg(uint(msg.FormId), uint(msg.TargetId), data)
    }
}

//sendMs 向用户单聊发送消息
func sendMsg(id int64, msg []byte) {
    rwLocker.Lock()
    node, ok := clientMap[id]
    rwLocker.Unlock()

    if !ok {
        zap.S().Info("userID没有对应的node")
        return
    }

    zap.S().Info("targetID:", id, "node:", node)
    if ok {
        node.DataQueue <- msg
    }
}

//sendGroupMsg 群发逻辑
func sendGroupMsg(formId, target uint, data []byte) (int, error) {……}

改造到这里,其功能就和上一篇文章一样了,由于测试方法和前面一样,这里也就给大家测试了。

总结

总体上内容简单,其核心就是将原来的recProc()读取websocket用户发送的消息中,没有将用户发来的信息直接粗暴塞进接收者用户的websocket连接中,而是将消息体仍进全局channel,然后将通过udp连接将消息体从全局channel中写入udp服务端,进行消息解析,逻辑判断,然后进行转发。

本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 1

这样写的好处是啥? 格外经过udp的处理,不是多耗费了性能吗?

2周前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!