最佳实践之弹幕功能设计
[toc]
写在前面
最近逛B站刷各种鬼畜,发现弹幕真是有意思的话题,弹幕内容各种搞怪鬼畜等,那么言归正传,弹幕在在线视频平台或者社交直播的不可或缺之物,本文将如何开发一个简单的弹幕和在线观看统计功能,如果你正在考虑在你的应用中加入弹幕功能,那么你来对地方了。
在深入实现之前,我们先来了解一下弹幕是什么。弹幕,源自日本,是指在视频、直播等画面上飘动的一些实时评论或互动信息。用户可以通过输入文字,将其发送到共享的画面上,形成一种实时互动的社交体验。
主要功能
这里主要实现接口:
- 实时获取弹幕
- 发布弹幕
数据库表设计
这里需要设计弹幕表,用来记录对应视频的弹幕信息
// Barrage 弹幕表结构
type Barrage struct {
Id int //主键
Content string //弹幕内容
CurrentTime int //弹幕在视频中的展示时间,秒
AddTime int64 //添加时间
UserId int //添加用户
Status int //弹幕状态
EpisodesId int //归属具体视频
VideoId int //归属影视作品
}
相信下图很好理解:
实时获取弹幕
下面来实现弹幕的核心功能,实习获取弹幕信息,为了系统性能和方便统计在线观看,这里使用到Websocket保证实时性。
主要逻辑
1、使用ws协议,从视频播放开始每隔60s获取一次弹幕内容,当前60s播放结束后再次请求下一次60s开始的时候后期60s的弹幕内容,前端这要在60s循环,对比弹幕时间和视频播放时间对应,渲染到屏幕即可。
2、为了统计在线观看数,这里需要设计全局线程安全的map:
type VideoStats struct {
views map[int]int //id->view_count
mu sync.Mutex //互斥锁保证并发安全
}
代码实现
获取弹幕,最终需要在数据库中查询,查询条件是:EpisodesId视频ID,startTime-endTime时间范围,前端需要给到两个参数:
- EpisodesId
- CurrentTime
为了统计在线观看数,我们需要实现一个并发安全的map和对应操作方法:
type VideoStats struct {
views map[int]int
mu sync.Mutex
}
var videoStats = NewVideoStats()
func NewVideoStats() *VideoStats {
return &VideoStats{
views: make(map[int]int),
}
}
func (vs *VideoStats) AddView(videoID int) {
vs.mu.Lock()
defer vs.mu.Unlock()
vs.views[videoID]++
}
func (vs *VideoStats) ReduceView(videoID int) {
vs.mu.Lock()
defer vs.mu.Unlock()
vs.views[videoID]--
}
func (vs *VideoStats) GetViews(videoID int) int {
vs.mu.Lock()
defer vs.mu.Unlock()
return vs.views[videoID]
}
定义一些结构体:
// WsData 数据格式
type WsData struct {
CurrentTime int //当前时间
EpisodesId int //视频ID
}
//返回的数据格式
type Barrage struct {
BarrageMsg []models.BarrageData `json:"barrage_msg"`
UserOnline int `json:"user_online"`
}
这里需要解决跨域问题:
// 设置websocket跨域问题
var (
upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
)
接下来看Controller层的核心代码:
func (b *BarrageControllers) BarrageWs() {
var (
conn *websocket.Conn
err error
data []byte
barrage Barrage
)
//将http转为websocket
if conn, err = upgrader.Upgrade(b.Ctx.ResponseWriter, b.Ctx.Request, nil); err != nil {
conn.Close()
}
if _, data, err = conn.ReadMessage(); err != nil {
conn.Close()
}
var wsDataInfo WsData
json.Unmarshal(data, &wsDataInfo)
videoStats.AddView(wsDataInfo.EpisodesId)
ResBarrageMsg(conn, wsDataInfo, barrage)
//用户退出播放视频,需要对对应视频在线数减一,并且关闭连接
defer func(Eid int) {
videoStats.ReduceView(Eid)
conn.Close()
}(wsDataInfo.EpisodesId)
//监听消息
for {
if _, data, err = conn.ReadMessage(); err != nil {
conn.Close()
}
var wsData WsData
json.Unmarshal(data, &wsData)
ResBarrageMsg(conn, wsData, barrage)
}
}
func ResBarrageMsg(conn *websocket.Conn, wsData WsData, barrage Barrage) {
var err error
//当前时间开始后的60s
endTime := wsData.CurrentTime + 60
//获取弹幕数据
_, barrage.BarrageMsg, err = models.BarrageList(wsData.EpisodesId, wsData.CurrentTime, endTime)
//返回在线人数
barrage.UserOnline = videoStats.GetViews(wsData.EpisodesId)
if err == nil {
if err := conn.WriteJSON(barrage); err != nil {
conn.Close()
}
}
}
这里您可能有疑问,为什么在for之前要获取一次websocket的数据?
原因是:为了方便统计对应视频的在线观看人数,如果我们之间在for中做这就是,其实是很复杂的,可能会涉及到channel,并发,数据一致性等各种问题,您可以理解为为了简化流程。
model层代码:
// BarrageData 弹幕返回结构
type BarrageData struct {
Id int `json:"id"`
Content string `json:"content"`
CurrentTime int `json:"currentTime"`
}
// BarrageList 获取指定时间范围弹幕内容
func BarrageList(episodesId int, startTime int, endTime int) (int64, []BarrageData, error) {
o := orm.NewOrm()
var barrages []BarrageData
num, err := o.Raw("SELECT id,content,`current_time` FROM barrage WHERE status=1 AND episodes_id=? AND `current_time`>=? AND `current_time`<? ORDER BY `current_time` ASC", episodesId, startTime, endTime).QueryRows(&barrages)
return num, barrages, err
}
我们在router配置接口路由:
package routers
import (
"fyoukuApi/controllers"
"github.com/astaxie/beego"
)
// 路由配置
func init() {
//弹幕功能
beego.Router("/barrage/ws", &controllers.BarrageControllers{}, "get:BarrageWs")
}
完整代码
Controller层:
package controllers
import (
"encoding/json"
"fyoukuApi/models"
"github.com/astaxie/beego"
"github.com/gorilla/websocket"
"net/http"
"sync"
)
type BarrageControllers struct {
beego.Controller
}
type VideoStats struct {
views map[int]int
mu sync.Mutex
}
var videoStats = NewVideoStats()
func NewVideoStats() *VideoStats {
return &VideoStats{
views: make(map[int]int),
}
}
func (vs *VideoStats) AddView(videoID int) {
vs.mu.Lock()
defer vs.mu.Unlock()
vs.views[videoID]++
}
func (vs *VideoStats) ReduceView(videoID int) {
vs.mu.Lock()
defer vs.mu.Unlock()
vs.views[videoID]--
}
func (vs *VideoStats) GetViews(videoID int) int {
vs.mu.Lock()
defer vs.mu.Unlock()
return vs.views[videoID]
}
// WsData 数据格式
type WsData struct {
CurrentTime int //当前时间
EpisodesId int //视频ID
}
type Barrage struct {
BarrageMsg []models.BarrageData `json:"barrage_msg"`
UserOnline int `json:"user_online"`
}
// 设置websocket跨域问题
var (
upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
)
// BarrageWs 获取弹幕websocket 核心逻辑:使用ws协议,从视频播放开始获取60s的弹幕内容,60s播放结束后再次请求后60s的弹幕内容,
// 前端这要在60s循环对比弹幕时间和视频播放时间对应,渲染到屏幕即可。
func (b *BarrageControllers) BarrageWs() {
var (
conn *websocket.Conn
err error
data []byte
barrage Barrage
)
//将http转为websocket
if conn, err = upgrader.Upgrade(b.Ctx.ResponseWriter, b.Ctx.Request, nil); err != nil {
conn.Close()
}
if _, data, err = conn.ReadMessage(); err != nil {
conn.Close()
}
var wsDataInfo WsData
json.Unmarshal(data, &wsDataInfo)
videoStats.AddView(wsDataInfo.EpisodesId)
ResBarrageMsg(conn, wsDataInfo, barrage)
//用户退出视频
defer func(Eid int) {
videoStats.ReduceView(Eid)
conn.Close()
}(wsDataInfo.EpisodesId)
//监听消息
for {
if _, data, err = conn.ReadMessage(); err != nil {
conn.Close()
}
var wsData WsData
json.Unmarshal(data, &wsData)
ResBarrageMsg(conn, wsData, barrage)
}
}
func ResBarrageMsg(conn *websocket.Conn, wsData WsData, barrage Barrage) {
var err error
//当前时间开始后的60s
endTime := wsData.CurrentTime + 60
//获取弹幕数据
_, barrage.BarrageMsg, err = models.BarrageList(wsData.EpisodesId, wsData.CurrentTime, endTime)
//返回在线人数
barrage.UserOnline = videoStats.GetViews(wsData.EpisodesId)
if err == nil {
if err := conn.WriteJSON(barrage); err != nil {
conn.Close()
}
}
}
model层:
package models
import (
"github.com/astaxie/beego/orm"
"time"
)
// Barrage 弹幕表结构
type Barrage struct {
Id int //主键
Content string //弹幕内容
CurrentTime int //当前时间,秒
AddTime int64 //添加时间
UserId int //添加用户
Status int //弹幕状态
EpisodesId int //弹幕视频
VideoId int //归属视频
}
// BarrageData 弹幕返回结构
type BarrageData struct {
Id int `json:"id"`
Content string `json:"content"`
CurrentTime int `json:"currentTime"`
}
func init() {
orm.RegisterModel(new(Barrage))
}
// BarrageList 获取指定时间范围弹幕内容
func BarrageList(episodesId int, startTime int, endTime int) (int64, []BarrageData, error) {
o := orm.NewOrm()
var barrages []BarrageData
num, err := o.Raw("SELECT id,content,`current_time` FROM barrage WHERE status=1 AND episodes_id=? AND `current_time`>=? AND `current_time`<? ORDER BY `current_time` ASC", episodesId, startTime, endTime).QueryRows(&barrages)
return num, barrages, err
}
测试
使用api工具测试
返回数据:
{
"barrage_msg": [
{
"id": 53,
"content": "冲冲冲!",
"currentTime": 1
},
{
"id": 54,
"content": "斗破大陆一片天,谁见海老不递烟!",
"currentTime": 1
},
{
"id": 55,
"content": "你们看到这里的时候我已经看完了",
"currentTime": 1
},
{
"id": 56,
"content": "打倒唐三,胜利属于武魂殿",
"currentTime": 8
},
{
"id": 57,
"content": "萧炎突破斗帝了哎",
"currentTime": 10
},
{
"id": 58,
"content": "魂天帝受死吧",
"currentTime": 10
},
{
"id": 61,
"content": "斗破大陆一片天,谁见海老不递烟!",
"currentTime": 10
},
{
"id": 62,
"content": "冲冲冲!",
"currentTime": 14
},
{
"id": 63,
"content": "反派死于话多!",
"currentTime": 14
},
{
"id": 64,
"content": "话太多了!",
"currentTime": 17
},
{
"id": 52,
"content": "发个弹幕试一试",
"currentTime": 18
}
],
"user_online": 2
}
最终效果:
发布弹幕
主要逻辑
发布弹幕功能其实就很简单了,主要有两步:1、发布弹幕后前端立即渲染到屏幕上,2、写入数据库。直接上代码吧!
代码实现
Controller层:
func (b *BarrageControllers) Save() {
uid, _ := b.GetInt("uid")
content := b.GetString("content")
currentTime, _ := b.GetInt("currentTime")
episodesId, _ := b.GetInt("episodesId")
videoId, _ := b.GetInt("videoId")
if content == "" {
b.Data["json"] = ReturnError(4001, "弹幕不能为空")
b.ServeJSON()
}
if uid == 0 {
b.Data["json"] = ReturnError(4002, "请先登录")
b.ServeJSON()
}
if episodesId == 0 {
b.Data["json"] = ReturnError(4003, "必须指定剧集ID")
b.ServeJSON()
}
if videoId == 0 {
b.Data["json"] = ReturnError(4005, "必须指定视频ID")
b.ServeJSON()
}
if currentTime == 0 {
b.Data["json"] = ReturnError(4006, "必须指定视频播放时间")
b.ServeJSON()
}
err := models.SaveBarrage(episodesId, videoId, currentTime, uid, content)
if err == nil {
b.Data["json"] = ReturnSuccess(0, "success", "", 1)
b.ServeJSON()
} else {
b.Data["json"] = ReturnError(5000, err)
b.ServeJSON()
}
}
model层:
// SaveBarrage 保存弹幕
func SaveBarrage(episodesId int, videoId int, currentTime int, uid int, content string) error {
o := orm.NewOrm()
var barrage Barrage
barrage.Content = content
barrage.CurrentTime = currentTime
barrage.AddTime = time.Now().Unix()
barrage.UserId = uid
barrage.Status = 1
barrage.EpisodesId = episodesId
barrage.VideoId = videoId
_, err := o.Insert(&barrage)
return err
}
router路由配置:
package routers
import (
"fyoukuApi/controllers"
"github.com/astaxie/beego"
)
// 路由配置
func init() {
//弹幕功能
beego.Router("/barrage/ws", &controllers.BarrageControllers{}, "get:BarrageWs")
beego.Router("/barrage/save", &controllers.BarrageControllers{}, "post:Save")
}
测试
使用api工具:
结果:
总结
文章到这里就简单的介绍结束了,实现一个弹幕功能并不是一项复杂的任务,但要确保其在用户体验、实时性和安全性方面都能达到最佳水平,需要综合考虑前后端的协同工作。通过本文的学习,相信你已经对实现弹幕功能有了更清晰的认识。
本作品采用《CC 协议》,转载必须注明作者和本文链接
:+1: :+1: :+1: