《从0到1搭建一个IM项目》消息模块开发之消息分类与Redis的引入

[toc]

概况

前面我们已经完成了单聊模式的开发,并且已经完成了测试同时对信息发送流程进行了升级改造引入了udp,在本篇内容中,我们将开始完善聊天功能,将聊天类型划分为单聊和群聊分别来实现他们的信息转发;与此同时对应聊天系统一定是要有将聊天记录进行存储的,由于我们的应用是web服务,不能将其聊天记录存储到浏览器的cook中,这是不安全的,所以最后是将聊天记录存储到服务端,所以选择Redis来缓存聊天记录。

下面目录结构:

HiChat   
    ├── common    //放置公共文件
    |      |——md5.go
    |      |——resp.go
    │  
    ├── config    //做配置文件
    │  
    ├── dao//数据库crud|——user.go
    |     |——relation.go
    |     |——community.go
    |
    ├── global    //放置各种连接池,配置等|——global.go
    |
    ├── initialize  //项目初始化文件|——db.go
    |              |——logger.go
    |
    ├── middlewear  //放置web中间件
    |              |——jwt.go
    ├── models      //数据库表设计|——user_basic.go
    |           |——relation.go
    |           |——message.go
    |           |——community.go
    |
    ├── router           //路由
    |       |——router.go
    │   
    ├── service     //对外api
    |       |——user.go
    |       |——relation.go
    │   
    ├── test        //测试文件
    │  
    ├── main.go     //项目入口
    ├── go.mod            //项目依赖管理
    ├── go.sum            //项目依赖管理

单聊信息的收发及存储

在上一篇文章中,我们已经完成了消息收发以及udp连接,所以这里我们将重点讲解消息的分类,对单聊消息的处理和存储。

在udp服务端中调用了函数:dispatch(data []byte)

func dispatch(data []byte) {
    //解析消息
    msg := Message{}
    err := json.Unmarshal(data, &msg)
    if err != nil {
        zap.S().Info("消息解析失败", err)
        return
    }

    fmt.Println("解析数据:", msg, "msg.FormId", msg.FormId, "targetId:", msg.TargetId, "type:", msg.Type)

    //判断消息类型
    switch msg.Type {
    case 1: //私聊
        sendMsgAndSave(msg.TargetId, data)
    case 2: //群发
        sendGroupMsg(uint(msg.FormId), uint(msg.TargetId), data)
    }
}

消息的私发与存储


//sendMsgTest 发送消息 并存储聊天记录到redis
func sendMsgAndSave(userId int64, msg []byte) {


    rwLocker.RLock()              //保证线程安全,上锁
    node, ok := clientMap[userId] //对方是否在线
    rwLocker.RUnlock()                        //解锁

    jsonMsg := Message{}
    json.Unmarshal(msg, &jsonMsg)
    ctx := context.Background()
    targetIdStr := strconv.Itoa(int(userId))
    userIdStr := strconv.Itoa(int(jsonMsg.FormId))


    if ok {
    //如果当前用户在线,将消息转发到当前用户的websocket连接中,然后进行存储
        node.DataQueue <- msg
    }

    //userIdStr和targetIdStr进行拼接唯一key
    var key string
    if userId > jsonMsg.FormId {
        key = "msg_" + userIdStr + "_" + targetIdStr
    } else {
        key = "msg_" + targetIdStr + "_" + userIdStr
    }

    //创建记录
    res, err := global.RedisDB.ZRevRange(ctx, key, 0, -1).Result()
    if err != nil {
        fmt.Println(err)
    return
    }

    //将聊天记录写入redis缓存中
    score := float64(cap(res)) + 1
    ress, e := global.RedisDB.ZAdd(ctx, key, &redis.Z{score, msg}).Result() //jsonMsg
    //res, e := utils.Red.Do(ctx, "zadd", key, 1, jsonMsg).Result() //备用 后续拓展 记录完整msg
    if e != nil {
        fmt.Println(e)
    return
    }
    fmt.Println(ress)
}

消息的群消息收发与存储

群发消息的逻辑:当群成员在当前群聊发送一条消息后,然后服务器将该群所有成员获取到,然后向除发送消息的用户外的所有群成员一一单发消息;简单的说就是:将群消息,给群成员都进行一次单聊信息的发送。

func dispatch(data []byte) {
    //解析消息
    msg := Message{}
    err := json.Unmarshal(data, &msg)
    if err != nil {
        zap.S().Info("消息解析失败", err)
        return
    }

    fmt.Println("解析数据:", msg, "msg.FormId", msg.FormId, "targetId:", msg.TargetId, "type:", msg.Type)

    //判断消息类型
    switch msg.Type {
    case 1: //私聊
        sendMsgAndSave(msg.TargetId, data)
    case 2: //群发
        sendGroupMsg(uint(msg.FormId), uint(msg.TargetId), data)
    }
}

群信息收发及存储

//sendGroupMsg 群发
func sendGroupMsg(formId, target uint, data []byte) (int, error) {
    //群发的逻辑:1获取到群里所有用户,然后向除开自己的每一位用户发送消息
    userIDs, err := FindUsers(target)
    if err != nil {
        return -1, err
    }

    for _, userId := range *userIDs {
        if formId != userId {  //不能给当前发送消息的成员进行转发
      //调用单聊的函数,群聊变成了多次单聊
            sendMsgAndSave(int64(userId), data)
        }
    }
    return 0, nil
}

调用的FindUsers(target)在community.go中

//FindUsers 获取群成员id
func FindUsers(groupId uint) (*[]uint, error) {
    relation := make([]Relation, 0)
    if tx := global.DB.Where("target_id = ? and type = 2", groupId).Find(&relation); tx.RowsAffected == 0 {
        return nil, errors.New("未查询到成员信息")
    }

    userIDs := make([]uint, 0)
    for _, v := range relation {
        userId := v.OwnerId
        userIDs = append(userIDs, userId)
    }
    return &userIDs, nil
}

这样消息分类和聊天记录的存储就完成了

聊天记录的获取

对于聊天记录,不管是单聊还是群聊,目前为止都已经将其有标识的存储到缓存Redis中了,对于聊天记录我们只需要完成相应的api,当客户端进行请求时就可以请求到,对应的聊天记录

聊天记录的获取

//RedisMsg 获取缓存里面的聊天记录
func RedisMsg(userIdA int64, userIdB int64, start int64, end int64, isRev bool) []string {
    ctx := context.Background()
    userIdStr := strconv.Itoa(int(userIdA))
    targetIdStr := strconv.Itoa(int(userIdB))

    //userIdStr和targetIdStr进行拼接唯一key
    var key string
    if userIdA > userIdB {
        key = "msg_" + targetIdStr + "_" + userIdStr
    } else {
        key = "msg_" + userIdStr + "_" + targetIdStr
    }

    var rels []string
    var err error
    if isRev {
        rels, err = global.RedisDB.ZRange(ctx, key, start, end).Result()
    } else {
        rels, err = global.RedisDB.ZRevRange(ctx, key, start, end).Result()
    }
    if err != nil {
        fmt.Println(err) //没有找到
    }
    return rels
}

对外api的暴露

在service目录下relation.go中编写函数:

func RedisMsg(c *gin.Context) {
    userIdA, _ := strconv.Atoi(c.PostForm("userIdA"))
    userIdB, _ := strconv.Atoi(c.PostForm("userIdB"))
    start, _ := strconv.Atoi(c.PostForm("start"))
    end, _ := strconv.Atoi(c.PostForm("end"))
    isRev, _ := strconv.ParseBool(c.PostForm("isRev"))
    res := models.RedisMsg(int64(userIdA), int64(userIdB), int64(start), int64(end), isRev)
    common.RespOKList(c.Writer, "ok", res)
}

路由

在router目录下的router.go中:

//聊天记录
    v1.POST("/user/redisMsg", service.RedisMsg).Use(middlewear.JWY())

message完整代码

package models

import (
    "context"
    "encoding/json"
    "fmt"
    "net"
    "net/http"
    "strconv"
    "sync"

    "HiChat/global"

    "github.com/go-redis/redis/v8"
    "github.com/gorilla/websocket"
    "go.uber.org/zap"
    "gopkg.in/fatih/set.v0"
)

type Message struct {
    Model
    FormId   int64  `json:"userId"`   //信息发送者
    TargetId int64  `json:"targetId"` //信息接收者
    Type     int    //聊天类型:群聊 私聊 广播
    Media    int    //信息类型:文字 图片 音频
    Content  string //消息内容
    Pic      string `json:"url"` //图片相关
    Url      string //文件相关
    Desc     string //文件描述
    Amount   int    //其他数据大小
}

func (m *Message) MsgTableName() string {
    return "message"
}

//Node 构造连接
type Node struct {
    Conn      *websocket.Conn //连接
    Addr      string          //客户端地址
    DataQueue chan []byte     //消息
    GroupSets set.Interface   //好友 / 群
}

//映射关系
var clientMap map[int64]*Node = make(map[int64]*Node, 0)

//读写锁
var rwLocker sync.RWMutex

//Chat    需要 :发送者ID ,接受者ID ,消息类型,发送的内容,发送类型
func Chat(w http.ResponseWriter, r *http.Request) {
    //1.  获取参数 并 检验 token 等合法性
    query := r.URL.Query()
    fmt.Println("handle:", query)
    Id := query.Get("userId")
    //token := query.Get("token")

    userId, err := strconv.ParseInt(Id, 10, 64)
    if err != nil {
        zap.S().Info("类型转换失败", err)
        return
    }

    //升级为socket
    var isvalida = true
    conn, err := (&websocket.Upgrader{
        //token 校验
        CheckOrigin: func(r *http.Request) bool {
            return isvalida
        },
    }).Upgrade(w, r, nil)
    if err != nil {
        fmt.Println(err)
        return
    }

    //获取socket连接,构造消息节点
    node := &Node{
        Conn:      conn,
        DataQueue: make(chan []byte, 50),
        GroupSets: set.New(set.ThreadSafe),
    }

    //用户关系

    //将userId和Node绑定
    rwLocker.Lock()
    clientMap[userId] = node
    rwLocker.Unlock()

    fmt.Println("uid", userId)

    //发送接收消息
    //发送消息
    go sendProc(node)
    //接收消息
    go recProc(node)
    sendMsg(userId, []byte("欢迎进入聊天系统"))
}

func sendProc(node *Node) {
    for {
        select {
        case data := <-node.DataQueue:
            err := node.Conn.WriteMessage(websocket.TextMessage, data)
            if err != nil {
                zap.S().Info("写入消息失败", err)
                return
            }
            fmt.Println("数据发送socket成功")
        }

    }
}

func recProc(node *Node) {
    for {
        //获取信息
        _, data, err := node.Conn.ReadMessage()
        if err != nil {
            zap.S().Info("读取消息失败", err)
            return
        }

        //dispatch(data)

        brodMsg(data)

        //这里是简单实现的一种方法
        //msg := Message{}
        //err = json.Unmarshal(data, &msg)
        //if err != nil {
        //    zap.S().Info("json解析失败", err)
        //    return
        //}
        //
        //if msg.Type == 1 {
        //    zap.S().Info("这是一条私信:", msg.Content)
        //    tarNode, ok := clientMap[msg.TargetId]
        //    if !ok {
        //        zap.S().Info("不存在对应的node", msg.TargetId)
        //        return
        //    }
        //
        //    tarNode.DataQueue <- data
        //    fmt.Println("发送成功:", string(data))
        //}

    }
}

var upSendChan chan []byte = make(chan []byte, 1024)

func brodMsg(data []byte) {
    upSendChan <- data

}

func init() {
    go UdpSendProc()
    go UpdRecProc()
}

//UdpSendProc 完成upd数据发送
func UdpSendProc() {
    udpConn, err := net.DialUDP("udp", nil, &net.UDPAddr{
        //192.168.31.147
        IP:   net.IPv4(127, 0, 0, 1),
        Port: 3000,
        Zone: "",
    })
    if err != nil {
        zap.S().Info("拨号udp端口失败", err)
        return
    }

    defer udpConn.Close()

    for {
        select {
        case data := <-upSendChan:
            _, err := udpConn.Write(data)
            if err != nil {
                zap.S().Info("写入udp消息失败", err)
                return
            }
            fmt.Println("数据成功发送到udp服务端:", string(data))
        }
    }

}

//UpdRecProc 完成udp数据的接收
func UpdRecProc() {
    udpConn, err := net.ListenUDP("udp", &net.UDPAddr{
        IP:   net.IPv4(127, 0, 0, 1),
        Port: 3000,
    })
    if err != nil {
        zap.S().Info("监听udp端口失败", err)
        return
    }

    defer udpConn.Close()

    for {
        var buf [1024]byte
        n, err := udpConn.Read(buf[0:])
        if err != nil {
            zap.S().Info("读取udp数据失败", err)
            return
        }

        fmt.Println("udp服务端接收udp数据", buf[0:n])

        //处理发送逻辑
        dispatch(buf[0:n])
    }
}

func dispatch(data []byte) {
    //解析消息
    msg := Message{}
    err := json.Unmarshal(data, &msg)
    if err != nil {
        zap.S().Info("消息解析失败", err)
        return
    }

    fmt.Println("解析数据:", msg, "msg.FormId", msg.FormId, "targetId:", msg.TargetId, "type:", msg.Type)

    //判断消息类型
    switch msg.Type {
    case 1: //私聊
        sendMsgAndSave(msg.TargetId, data)
    case 2: //群发
        sendGroupMsg(uint(msg.FormId), uint(msg.TargetId), data)
    }
}

//sendGroupMsg 群发
func sendGroupMsg(formId, target uint, data []byte) (int, error) {
    //群发的逻辑:1获取到群里所有用户,然后向除开自己的每一位用户发送消息
    userIDs, err := FindUsers(target)
    if err != nil {
        return -1, err
    }

    for _, userId := range *userIDs {
        if formId != userId {
            sendMsgAndSave(int64(userId), data)
        }
    }
    return 0, nil
}

//sendMs 向用户发送消息
func sendMsg(id int64, msg []byte) {
    rwLocker.Lock()
    node, ok := clientMap[id]
    rwLocker.Unlock()

    if !ok {
        zap.S().Info("userID没有对应的node")
        return
    }

    zap.S().Info("targetID:", id, "node:", node)
    if ok {
        node.DataQueue <- msg
    }
}

//sendMsgTest 发送消息 并存储聊天记录到redis
func sendMsgAndSave(userId int64, msg []byte) {

    rwLocker.RLock()
    node, ok := clientMap[userId] //对方是否在线
    rwLocker.RUnlock()

    jsonMsg := Message{}
    json.Unmarshal(msg, &jsonMsg)
    ctx := context.Background()
    targetIdStr := strconv.Itoa(int(userId))
    userIdStr := strconv.Itoa(int(jsonMsg.FormId))

    //如果不在线
    if ok {
        zap.S().Info(userId, "不在线, 没有对应的node,")

        node.DataQueue <- msg
    }

    //拼接记录名称
    var key string
    if userId > jsonMsg.FormId {
        key = "msg_" + userIdStr + "_" + targetIdStr
    } else {
        key = "msg_" + targetIdStr + "_" + userIdStr
    }

    //创建记录
    res, err := global.RedisDB.ZRevRange(ctx, key, 0, -1).Result()
    if err != nil {
        fmt.Println(err)
    }

    //将聊天记录写入数据库
    score := float64(cap(res)) + 1
    ress, e := global.RedisDB.ZAdd(ctx, key, &redis.Z{score, msg}).Result() //jsonMsg
    //res, e := utils.Red.Do(ctx, "zadd", key, 1, jsonMsg).Result() //备用 后续拓展 记录完整msg
    if e != nil {
        fmt.Println(e)
    }
    fmt.Println(ress)
}

//MarshalBinary 需要重写此方法才能完整的msg转byte[]
func (msg Message) MarshalBinary() ([]byte, error) {
    return json.Marshal(msg)
}

//RedisMsg 获取缓存里面的消息
func RedisMsg(userIdA int64, userIdB int64, start int64, end int64, isRev bool) []string {
    ctx := context.Background()
    userIdStr := strconv.Itoa(int(userIdA))
    targetIdStr := strconv.Itoa(int(userIdB))

    //拼接key
    var key string
    if userIdA > userIdB {
        key = "msg_" + targetIdStr + "_" + userIdStr
    } else {
        key = "msg_" + userIdStr + "_" + targetIdStr
    }

    var rels []string
    var err error
    if isRev {
        rels, err = global.RedisDB.ZRange(ctx, key, start, end).Result()
    } else {
        rels, err = global.RedisDB.ZRevRange(ctx, key, start, end).Result()
    }
    if err != nil {
        fmt.Println(err) //没有找到
    }
    return rels
}

测试

消息的发送与接收

消息发送与接收不管是单聊还是群聊测试方法都和《「从0到1搭建一个IM项目」信息模块开发之消息体的设计》中的测试是一样的,这里就不做过多介绍了。

获取聊天记录

%E6%88%AA%E5%B1%8F2023-01-08%20%E4%B8%8B%E5%8D%887.48.05.png

总结

本篇中主要讲解了聊天分类,聊天记录存储和获取,IM系统的核心还是消息的收发过程,这个过程你可能需要多看代码,先把这个思路整理出来,我想这才是最重要的。到这里,我们的项目大部分功能就完成了,整个api的开发就完成,所有api:

package router

import (
    "HiChat/middlewear"
    "HiChat/service"

    "github.com/gin-gonic/gin"
)

func Router() *gin.Engine {
    router := gin.Default()
    v1 := router.Group("v1")

    //用户模块
    user := v1.Group("user")
    {
        user.GET("/list", middlewear.JWY(), service.List)
        user.POST("/login_pw", service.LoginByNameAndPassWord)
        user.POST("/new", middlewear.JWY(), service.NewUser)
        user.DELETE("/delete", middlewear.JWY(), service.DeleteUser)
        user.POST("/updata", middlewear.JWY(), service.UpdataUser)
        user.GET("/SendUserMsg", middlewear.JWY(), service.SendUserMsg)
    }

    //图片、语音模块
    upload := v1.Group("upload").Use(middlewear.JWY())
    {
        upload.POST("/image", service.Image)
    }

    //好友关系
    relation := v1.Group("relation").Use(middlewear.JWY())
    {
        relation.POST("/list", service.FriendList)
        relation.POST("/add", service.AddFriendByName)
        relation.POST("/new_group", service.NewGroup)
        relation.POST("/group_list", service.GroupList)
        relation.POST("/join_group", service.JoinGroup)
    }

    //聊天记录
    v1.POST("/user/redisMsg", service.RedisMsg).Use(middlewear.JWY())

    return router
}

在后续文章中,我们将项目参数使用viper进行配置化,然后加入前端最后进行部署。

本作品采用《CC 协议》,转载必须注明作者和本文链接
刻意学习
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!
未填写
文章
118
粉丝
88
喜欢
173
收藏
244
排名:367
访问:2.6 万
私信
所有博文
社区赞助商