最佳实践之弹幕功能设计

[toc]

写在前面

最近逛B站刷各种鬼畜,发现弹幕真是有意思的话题,弹幕内容各种搞怪鬼畜等,那么言归正传,弹幕在在线视频平台或者社交直播的不可或缺之物,本文将如何开发一个简单的弹幕和在线观看统计功能,如果你正在考虑在你的应用中加入弹幕功能,那么你来对地方了。

在深入实现之前,我们先来了解一下弹幕是什么。弹幕,源自日本,是指在视频、直播等画面上飘动的一些实时评论或互动信息。用户可以通过输入文字,将其发送到共享的画面上,形成一种实时互动的社交体验。

主要功能

这里主要实现接口:

  • 实时获取弹幕
  • 发布弹幕

数据库表设计

这里需要设计弹幕表,用来记录对应视频的弹幕信息

// Barrage 弹幕表结构
type Barrage struct {
    Id          int    //主键
    Content     string //弹幕内容
    CurrentTime int    //弹幕在视频中的展示时间,秒
    AddTime     int64  //添加时间
    UserId      int    //添加用户
    Status      int    //弹幕状态
    EpisodesId  int    //归属具体视频
    VideoId     int    //归属影视作品
}

相信下图很好理解:

实时获取弹幕

下面来实现弹幕的核心功能,实习获取弹幕信息,为了系统性能和方便统计在线观看,这里使用到Websocket保证实时性。

主要逻辑

1、使用ws协议,从视频播放开始每隔60s获取一次弹幕内容,当前60s播放结束后再次请求下一次60s开始的时候后期60s的弹幕内容,前端这要在60s循环,对比弹幕时间和视频播放时间对应,渲染到屏幕即可。

2、为了统计在线观看数,这里需要设计全局线程安全的map:

type VideoStats struct {
    views map[int]int  //id->view_count
    mu    sync.Mutex   //互斥锁保证并发安全
}

代码实现

获取弹幕,最终需要在数据库中查询,查询条件是:EpisodesId视频ID,startTime-endTime时间范围,前端需要给到两个参数:

  • EpisodesId
  • CurrentTime

为了统计在线观看数,我们需要实现一个并发安全的map和对应操作方法:


type VideoStats struct {
    views map[int]int
    mu    sync.Mutex
}

var videoStats = NewVideoStats()

func NewVideoStats() *VideoStats {
    return &VideoStats{
        views: make(map[int]int),
    }
}

func (vs *VideoStats) AddView(videoID int) {
    vs.mu.Lock()
    defer vs.mu.Unlock()
    vs.views[videoID]++
}

func (vs *VideoStats) ReduceView(videoID int) {
    vs.mu.Lock()
    defer vs.mu.Unlock()
    vs.views[videoID]--
}

func (vs *VideoStats) GetViews(videoID int) int {
    vs.mu.Lock()
    defer vs.mu.Unlock()
    return vs.views[videoID]
}

定义一些结构体:

// WsData 数据格式
type WsData struct {
    CurrentTime int //当前时间
    EpisodesId  int //视频ID
}

//返回的数据格式
type Barrage struct {
    BarrageMsg []models.BarrageData `json:"barrage_msg"`
    UserOnline int                  `json:"user_online"`
}

这里需要解决跨域问题:

// 设置websocket跨域问题
var (
    upgrader = websocket.Upgrader{
        CheckOrigin: func(r *http.Request) bool {
            return true
        },
    }
)

接下来看Controller层的核心代码:

func (b *BarrageControllers) BarrageWs() {
    var (
        conn    *websocket.Conn
        err     error
        data    []byte
        barrage Barrage
    )

    //将http转为websocket
    if conn, err = upgrader.Upgrade(b.Ctx.ResponseWriter, b.Ctx.Request, nil); err != nil {
        conn.Close()
    }

    if _, data, err = conn.ReadMessage(); err != nil {
        conn.Close()
    }

    var wsDataInfo WsData
    json.Unmarshal(data, &wsDataInfo)

    videoStats.AddView(wsDataInfo.EpisodesId)
    ResBarrageMsg(conn, wsDataInfo, barrage)

    //用户退出播放视频,需要对对应视频在线数减一,并且关闭连接
    defer func(Eid int) {
        videoStats.ReduceView(Eid)
        conn.Close()
    }(wsDataInfo.EpisodesId)

    //监听消息
    for {
        if _, data, err = conn.ReadMessage(); err != nil {
            conn.Close()
        }
        var wsData WsData
        json.Unmarshal(data, &wsData)
        ResBarrageMsg(conn, wsData, barrage)
    }
}

func ResBarrageMsg(conn *websocket.Conn, wsData WsData, barrage Barrage) {
    var err error

    //当前时间开始后的60s
    endTime := wsData.CurrentTime + 60

    //获取弹幕数据
    _, barrage.BarrageMsg, err = models.BarrageList(wsData.EpisodesId, wsData.CurrentTime, endTime)

    //返回在线人数
    barrage.UserOnline = videoStats.GetViews(wsData.EpisodesId)
    if err == nil {
        if err := conn.WriteJSON(barrage); err != nil {
            conn.Close()
        }
    }
}

这里您可能有疑问,为什么在for之前要获取一次websocket的数据?

原因是:为了方便统计对应视频的在线观看人数,如果我们之间在for中做这就是,其实是很复杂的,可能会涉及到channel,并发,数据一致性等各种问题,您可以理解为为了简化流程。

model层代码:

// BarrageData 弹幕返回结构
type BarrageData struct {
    Id          int    `json:"id"`
    Content     string `json:"content"`
    CurrentTime int    `json:"currentTime"`
}

// BarrageList 获取指定时间范围弹幕内容
func BarrageList(episodesId int, startTime int, endTime int) (int64, []BarrageData, error) {
    o := orm.NewOrm()
    var barrages []BarrageData
    num, err := o.Raw("SELECT id,content,`current_time` FROM barrage WHERE status=1 AND episodes_id=? AND `current_time`>=? AND `current_time`<? ORDER BY `current_time` ASC", episodesId, startTime, endTime).QueryRows(&barrages)
    return num, barrages, err
}

我们在router配置接口路由:

package routers

import (
    "fyoukuApi/controllers"
    "github.com/astaxie/beego"
)

// 路由配置
func init() {
    //弹幕功能
    beego.Router("/barrage/ws", &controllers.BarrageControllers{}, "get:BarrageWs")
}

完整代码

Controller层:

package controllers

import (
    "encoding/json"
    "fyoukuApi/models"
    "github.com/astaxie/beego"
    "github.com/gorilla/websocket"
    "net/http"
    "sync"
)

type BarrageControllers struct {
    beego.Controller
}

type VideoStats struct {
    views map[int]int
    mu    sync.Mutex
}

var videoStats = NewVideoStats()

func NewVideoStats() *VideoStats {
    return &VideoStats{
        views: make(map[int]int),
    }
}

func (vs *VideoStats) AddView(videoID int) {
    vs.mu.Lock()
    defer vs.mu.Unlock()
    vs.views[videoID]++
}

func (vs *VideoStats) ReduceView(videoID int) {
    vs.mu.Lock()
    defer vs.mu.Unlock()
    vs.views[videoID]--
}

func (vs *VideoStats) GetViews(videoID int) int {
    vs.mu.Lock()
    defer vs.mu.Unlock()
    return vs.views[videoID]
}

// WsData 数据格式
type WsData struct {
    CurrentTime int //当前时间
    EpisodesId  int //视频ID
}

type Barrage struct {
    BarrageMsg []models.BarrageData `json:"barrage_msg"`
    UserOnline int                  `json:"user_online"`
}

// 设置websocket跨域问题
var (
    upgrader = websocket.Upgrader{
        CheckOrigin: func(r *http.Request) bool {
            return true
        },
    }
)

// BarrageWs 获取弹幕websocket 核心逻辑:使用ws协议,从视频播放开始获取60s的弹幕内容,60s播放结束后再次请求后60s的弹幕内容,
// 前端这要在60s循环对比弹幕时间和视频播放时间对应,渲染到屏幕即可。
func (b *BarrageControllers) BarrageWs() {
    var (
        conn    *websocket.Conn
        err     error
        data    []byte
        barrage Barrage
    )

    //将http转为websocket
    if conn, err = upgrader.Upgrade(b.Ctx.ResponseWriter, b.Ctx.Request, nil); err != nil {
        conn.Close()
    }

    if _, data, err = conn.ReadMessage(); err != nil {
        conn.Close()
    }

    var wsDataInfo WsData
    json.Unmarshal(data, &wsDataInfo)

    videoStats.AddView(wsDataInfo.EpisodesId)
    ResBarrageMsg(conn, wsDataInfo, barrage)

    //用户退出视频
    defer func(Eid int) {
        videoStats.ReduceView(Eid)
        conn.Close()
    }(wsDataInfo.EpisodesId)

    //监听消息
    for {
        if _, data, err = conn.ReadMessage(); err != nil {
            conn.Close()
        }
        var wsData WsData
        json.Unmarshal(data, &wsData)
        ResBarrageMsg(conn, wsData, barrage)
    }
}

func ResBarrageMsg(conn *websocket.Conn, wsData WsData, barrage Barrage) {
    var err error

    //当前时间开始后的60s
    endTime := wsData.CurrentTime + 60

    //获取弹幕数据
    _, barrage.BarrageMsg, err = models.BarrageList(wsData.EpisodesId, wsData.CurrentTime, endTime)

    //返回在线人数
    barrage.UserOnline = videoStats.GetViews(wsData.EpisodesId)
    if err == nil {
        if err := conn.WriteJSON(barrage); err != nil {
            conn.Close()
        }
    }
}

model层:

package models

import (
    "github.com/astaxie/beego/orm"
    "time"
)

// Barrage 弹幕表结构
type Barrage struct {
    Id          int    //主键
    Content     string //弹幕内容
    CurrentTime int    //当前时间,秒
    AddTime     int64  //添加时间
    UserId      int    //添加用户
    Status      int    //弹幕状态
    EpisodesId  int    //弹幕视频
    VideoId     int    //归属视频
}

// BarrageData 弹幕返回结构
type BarrageData struct {
    Id          int    `json:"id"`
    Content     string `json:"content"`
    CurrentTime int    `json:"currentTime"`
}

func init() {
    orm.RegisterModel(new(Barrage))
}

// BarrageList 获取指定时间范围弹幕内容
func BarrageList(episodesId int, startTime int, endTime int) (int64, []BarrageData, error) {
    o := orm.NewOrm()
    var barrages []BarrageData
    num, err := o.Raw("SELECT id,content,`current_time` FROM barrage WHERE status=1 AND episodes_id=? AND `current_time`>=? AND `current_time`<? ORDER BY `current_time` ASC", episodesId, startTime, endTime).QueryRows(&barrages)
    return num, barrages, err
}

测试

使用api工具测试

返回数据:

{
    "barrage_msg": [
        {
            "id": 53,
            "content": "冲冲冲!",
            "currentTime": 1
        },
        {
            "id": 54,
            "content": "斗破大陆一片天,谁见海老不递烟!",
            "currentTime": 1
        },
        {
            "id": 55,
            "content": "你们看到这里的时候我已经看完了",
            "currentTime": 1
        },
        {
            "id": 56,
            "content": "打倒唐三,胜利属于武魂殿",
            "currentTime": 8
        },
        {
            "id": 57,
            "content": "萧炎突破斗帝了哎",
            "currentTime": 10
        },
        {
            "id": 58,
            "content": "魂天帝受死吧",
            "currentTime": 10
        },
        {
            "id": 61,
            "content": "斗破大陆一片天,谁见海老不递烟!",
            "currentTime": 10
        },
        {
            "id": 62,
            "content": "冲冲冲!",
            "currentTime": 14
        },
        {
            "id": 63,
            "content": "反派死于话多!",
            "currentTime": 14
        },
        {
            "id": 64,
            "content": "话太多了!",
            "currentTime": 17
        },
        {
            "id": 52,
            "content": "发个弹幕试一试",
            "currentTime": 18
        }
    ],
    "user_online": 2
}

最终效果:

发布弹幕

主要逻辑

发布弹幕功能其实就很简单了,主要有两步:1、发布弹幕后前端立即渲染到屏幕上,2、写入数据库。直接上代码吧!

代码实现

Controller层:


func (b *BarrageControllers) Save() {
    uid, _ := b.GetInt("uid")
    content := b.GetString("content")
    currentTime, _ := b.GetInt("currentTime")
    episodesId, _ := b.GetInt("episodesId")
    videoId, _ := b.GetInt("videoId")

    if content == "" {
        b.Data["json"] = ReturnError(4001, "弹幕不能为空")
        b.ServeJSON()
    }
    if uid == 0 {
        b.Data["json"] = ReturnError(4002, "请先登录")
        b.ServeJSON()
    }
    if episodesId == 0 {
        b.Data["json"] = ReturnError(4003, "必须指定剧集ID")
        b.ServeJSON()
    }
    if videoId == 0 {
        b.Data["json"] = ReturnError(4005, "必须指定视频ID")
        b.ServeJSON()
    }

    if currentTime == 0 {
        b.Data["json"] = ReturnError(4006, "必须指定视频播放时间")
        b.ServeJSON()
    }
    err := models.SaveBarrage(episodesId, videoId, currentTime, uid, content)
    if err == nil {
        b.Data["json"] = ReturnSuccess(0, "success", "", 1)
        b.ServeJSON()
    } else {
        b.Data["json"] = ReturnError(5000, err)
        b.ServeJSON()
    }
}

model层:

// SaveBarrage 保存弹幕
func SaveBarrage(episodesId int, videoId int, currentTime int, uid int, content string) error {
    o := orm.NewOrm()
    var barrage Barrage
    barrage.Content = content
    barrage.CurrentTime = currentTime
    barrage.AddTime = time.Now().Unix()
    barrage.UserId = uid
    barrage.Status = 1
    barrage.EpisodesId = episodesId
    barrage.VideoId = videoId
    _, err := o.Insert(&barrage)
    return err
}

router路由配置:

package routers

import (
    "fyoukuApi/controllers"
    "github.com/astaxie/beego"
)

// 路由配置
func init() {
    //弹幕功能
    beego.Router("/barrage/ws", &controllers.BarrageControllers{}, "get:BarrageWs")
  beego.Router("/barrage/save", &controllers.BarrageControllers{}, "post:Save")
}

测试

使用api工具:

结果:

总结

文章到这里就简单的介绍结束了,实现一个弹幕功能并不是一项复杂的任务,但要确保其在用户体验、实时性和安全性方面都能达到最佳水平,需要综合考虑前后端的协同工作。通过本文的学习,相信你已经对实现弹幕功能有了更清晰的认识。

本作品采用《CC 协议》,转载必须注明作者和本文链接
刻意学习
本帖由系统于 4个月前 自动加精
讨论数量: 1

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!
未填写
文章
118
粉丝
89
喜欢
173
收藏
246
排名:365
访问:2.6 万
私信
所有博文
社区赞助商