《从0到1搭建一个IM项目》消息模块开发之消息核心升级改造

[toc]

概况

到目前为止,我们已经将IM项目的信息发送接收逻辑完成了,但是这里我们要进一步完善,我们需要将这个过程提高并发量,这里我们需要将消息模块加入udp连接。

到目前为止,我们的项目目录结构:

HiChat   
    ├── common    //放置公共文件
    |      |——md5.go
    |      |——resp.go
    │  
    ├── config    //做配置文件
    │  
    ├── dao//数据库crud|——user.go
    |     |——relation.go
    |     |——community.go
    |
    ├── global    //放置各种连接池,配置等|——global.go
    |
    ├── initialize  //项目初始化文件|——db.go
    |              |——logger.go
    |
    ├── middlewear  //放置web中间件
    |              |——jwt.go
    ├── models      //数据库表设计|——user_basic.go
    |           |——relation.go
    |           |——message.go
    |           |——community.go
    |
    ├── router           //路由
    |       |——router.go
    │   
    ├── service     //对外api
    |       |——user.go
    |       |——relation.go
    │   
    ├── test        //测试文件
    │  
    ├── main.go     //项目入口
    ├── go.mod            //项目依赖管理
    ├── go.sum            //项目依赖管理

消息接收引入udp连接

在message.go中,recProc(node)方法用来,接收用户发来的消息, 现在我们这样做:

//recProc 读取websocket用户发送的消息
func recProc(node *Node) {
    for {
        //获取信息
        _, data, err := node.Conn.ReadMessage()
        if err != nil {
            zap.S().Info("读取消息失败", err)
            return
        }

    //将消息体放入全局channel中
        brodMsg(data)
    }
}

//全局channel
var upSendChan chan []byte = make(chan []byte, 1024)

func brodMsg(data []byte) {
    upSendChan <- data
}


//init方法,运行message包前调用
func init() {
    go UdpSendProc()  
    go UpdRecProc()
}


//UdpSendProc 完成upd数据发送, 连接到udp服务端,将全局channel中的消息体,写入udp服务端
func UdpSendProc() {
    udpConn, err := net.DialUDP("udp", nil, &net.UDPAddr{
        //192.168.31.147
        IP:   net.IPv4(127, 0, 0, 1),
        Port: 3000,
        Zone: "",
    })
    if err != nil {
        zap.S().Info("拨号udp端口失败", err)
        return
    }

    defer udpConn.Close()

    for {
        select {
        case data := <-upSendChan:
            _, err := udpConn.Write(data)
            if err != nil {
                zap.S().Info("写入udp消息失败", err)
                return
            }
        }
    }
}


//UpdRecProc 完成udp数据的接收,启动udp服务,获取udp客户端的写入的消息
func UpdRecProc() {
    udpConn, err := net.ListenUDP("udp", &net.UDPAddr{
        IP:   net.IPv4(127, 0, 0, 1),
        Port: 3000,
    })
    if err != nil {
        zap.S().Info("监听udp端口失败", err)
        return
    }

    defer udpConn.Close()

    for {
        var buf [1024]byte
        n, err := udpConn.Read(buf[0:])
        if err != nil {
            zap.S().Info("读取udp数据失败", err)
            return
        }

        //处理发送逻辑
        dispatch(buf[0:n])
    }
}

//dispatch 解析消息,聊天类型判断
func dispatch(data []byte) {
    //解析消息
    msg := Message{}
    err := json.Unmarshal(data, &msg)
    if err != nil {
        zap.S().Info("消息解析失败", err)
        return
    }

    //判断消息类型
    switch msg.Type {
    case 1: //私聊
    sendMsg(msg.TargetId, data)
    case 2: //群发
        sendGroupMsg(uint(msg.FormId), uint(msg.TargetId), data)
    }
}

//sendMs 向用户单聊发送消息
func sendMsg(id int64, msg []byte) {
    rwLocker.Lock()
    node, ok := clientMap[id]
    rwLocker.Unlock()

    if !ok {
        zap.S().Info("userID没有对应的node")
        return
    }

    zap.S().Info("targetID:", id, "node:", node)
    if ok {
        node.DataQueue <- msg
    }
}

//sendGroupMsg 群发逻辑
func sendGroupMsg(formId, target uint, data []byte) (int, error) {……}

改造到这里,其功能就和上一篇文章一样了,由于测试方法和前面一样,这里也就给大家测试了。

总结

总体上内容简单,其核心就是将原来的recProc()读取websocket用户发送的消息中,没有将用户发来的信息直接粗暴塞进接收者用户的websocket连接中,而是将消息体仍进全局channel,然后将通过udp连接将消息体从全局channel中写入udp服务端,进行消息解析,逻辑判断,然后进行转发。

本作品采用《CC 协议》,转载必须注明作者和本文链接
刻意学习
讨论数量: 2

这样写的好处是啥? 格外经过udp的处理,不是多耗费了性能吗?

1年前 评论

The approach of using a global channel to receive messages from users and then forwarding those messages via UDP to a server for processing and logic handling offers several benefits:

Decoupling: By using a global channel as an intermediate message queue, the WebSocket server (recProc()) is decoupled from the processing logic and message handling on the UDP server. This allows for a more modular and flexible design, where the WebSocket server doesn't need to know about the details of message processing.

Asynchronous Processing: Sending messages to the global channel and then processing them asynchronously via UDP allows the WebSocket server to quickly respond to incoming messages without waiting for the processing to complete. This can improve the overall responsiveness and scalability of the WebSocket server.

Load Balancing: By using UDP to forward messages to a separate server for processing, the application can distribute the message processing workload across multiple servers. This can help to balance the load and avoid bottlenecks in message handling.

Fault Isolation: If the UDP server experiences issues or becomes overloaded, it won't directly impact the WebSocket server. The two components are separated, and failures or performance issues in one don't affect the other.

Scalability: Since the global channel acts as a buffer for incoming messages, it allows the WebSocket server to handle bursts of traffic without dropping messages. This can improve the overall scalability of the system.

Protocol Flexibility: Using UDP for forwarding messages allows for flexibility in choosing the protocol for communication between the WebSocket server and the UDP server. UDP is a low-latency, connectionless protocol that can be suitable for certain real-time or high-throughput applications.

Overall, this architecture allows for better scalability, fault tolerance, and separation of concerns in handling WebSocket messages. It leverages the benefits of asynchronous communication and allows the system to adapt to varying message processing demands. However, it also introduces complexity and may require careful consideration of error handling and data consistency between the WebSocket server and the UDP server.

7个月前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!
未填写
文章
118
粉丝
88
喜欢
173
收藏
245
排名:367
访问:2.6 万
私信
所有博文
社区赞助商