go websocket推送优化
refer
www.cnblogs.com/chyingp/p/websocke...
github.com/eranyanay/1m-go-websock...
代码地址github.com/luxun9527/gpush
主要是两个方向的优化
1、写的时候定时批量写入。
2、读的时候使用epoll,避免每个连接都启动一个读的协程。
其他有对长连接读写的场景,也可以使用这种方法。
1、定时批量写数据
当需要高频写数据,且数据量比较小的情况下,如推送股票行情。可以定时批量写数据。
当连接数1w,高频写入数据。,使用top命令,可以看到cpu是满载的。使用pprof分析可以看到将数据写入到连接的那一步占用cpu比较高。
go tool pprof -http=:9091 192.168.2.159:8899/debug/pprof/prof...
得益于github.com/gobwas/ws 提供的非常灵活的用法,我们可以暂时将我们要写的数据转为”websocket数据包”缓存起来,直接使用tcp连接,定时批量发送。
package model
import (
"bytes"
gws "github.com/gobwas/ws"
"github.com/gobwas/ws/wsflate"
)
type Message struct {
messageType gws.OpCode
data []byte
}
func NewMessage(code gws.OpCode, data []byte) Message {
return Message{
messageType: code,
data: data,
}
}
//将要写的数据转为websocket格式的数据。
func (m Message) ToBytes() ([]byte, error) {
buf := bytes.NewBuffer(make([]byte, 0, 100))
frame := gws.NewFrame(m.messageType, true, m.data)
if err := gws.WriteFrame(buf, frame); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
对比测试1万连接
github.com/luxun9527/gpush/blob/ma...
//使用缓存,写缓存200 定时500毫秒
func (conn *Connection) WriteLoop() {
defer func() {
if err := recover(); err != nil {
global.L.Debug("recover from read", zap.Any("err", err))
}
conn.Close()
}()
for {
select {
case data := <-conn.write:
if _, err := conn.writeBuf.Write(data); err != nil {
global.L.Debug("Write from read", zap.Any("err", err))
return
}
case <-conn.writeRate.C:
//如果关闭就返回
if conn.isClosed {
return
}
//心跳超时
if conn.lastHeartbeat.Add(time.Millisecond * time.Duration(global.Config.Connection.TimeOut)).Before(time.Now()) {
return
}
//写到连接中
if conn.writeBuf.Available() > 0 {
if err := conn.writeBuf.Flush(); err != nil {
return
}
}
}
}
}
//对比
2023/12/16 04:35:12 当前收到 334224159条 平均每毫秒收到 1661条
2023/12/16 04:35:13 当前收到 339639047条 平均每毫秒收到 2081条
2023/12/16 04:35:17 当前收到 341958389条 平均每毫秒收到 2276条
2023/12/16 04:35:20 当前收到 350767911条 平均每毫秒收到 2119条
2023/12/16 04:35:28 当前收到 371979124条 平均每毫秒收到 2179条
2023/12/16 04:35:29 当前收到 374155711条 平均每毫秒收到 2174条
2023/12/16 04:35:31 当前收到 376456748条 平均每毫秒收到 2296条
2023/12/16 04:35:34 当前收到 381007068条 平均每毫秒收到 2239条
2023/12/16 04:35:35 当前收到 387411405条 平均每毫秒收到 1966条
2023/12/16 04:35:37 当前收到 389106853条 平均每毫秒收到 1593条
2023/12/16 04:35:38 当前收到 391489681条 平均每毫秒收到 2284条
2023/12/16 04:35:39 当前收到 393774240条 平均每毫秒收到 2284条
2023/12/16 04:35:40 当前收到 396169192条 平均每毫秒收到 2394条
2023/12/16 04:35:41 当前收到 398502117条 平均每毫秒收到 2330条
2023/12/16 04:35:43 当前收到 400792789条 平均每毫秒收到 2290条
2023/12/16 04:35:44 当前收到 406800009条 平均每毫秒收到 2745条
2023/12/16 04:35:45 当前收到 408767853条 平均每毫秒收到 1958条
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
147549 root 20 0 2264040 286356 16892 S 389.1 3.3 15:55.13 socket
147567 root 20 0 2283396 508252 5564 R 163.3 5.9 7:29.37 stress
//不使用缓存,直接写到连接中
func (conn *Connection) WriteLoop() {
defer func() {
if err := recover(); err != nil {
global.L.Debug("recover from read", zap.Any("err", err))
}
conn.Close()
}()
for data := range conn.write {
if _, err := conn.Write(data); err != nil {
return
}
}
}
//
2023/12/16 04:39:03 当前收到 121781703条 平均每毫秒收到 1166条
2023/12/16 04:39:05 当前收到 126380058条 平均每毫秒收到 2115条
2023/12/16 04:39:07 当前收到 130433719条 平均每毫秒收到 2155条
2023/12/16 04:39:09 当前收到 134412628条 平均每毫秒收到 1709条
2023/12/16 04:39:10 当前收到 137867897条 平均每毫秒收到 1720条
2023/12/16 04:39:12 当前收到 139214716条 平均每毫秒收到 1334条
2023/12/16 04:39:13 当前收到 143384829条 平均每毫秒收到 2115条
2023/12/16 04:39:15 当前收到 145074988条 平均每毫秒收到 1690条
2023/12/16 04:39:17 当前收到 148277250条 平均每毫秒收到 1705条
2023/12/16 04:39:18 当前收到 151778877条 平均每毫秒收到 1424条
2023/12/16 04:39:20 当前收到 153185029条 平均每毫秒收到 1404条
2023/12/16 04:39:22 当前收到 156690213条 平均每毫秒收到 1690条
2023/12/16 04:39:23 当前收到 159630744条 平均每毫秒收到 1525条
2023/12/16 04:39:24 当前收到 161531340条 平均每毫秒收到 1872条
2023/12/16 04:39:26 当前收到 162992849条 平均每毫秒收到 1452条
2023/12/16 04:39:27 当前收到 166536469条 平均每毫秒收到 1868条
2023/12/16 04:39:28 当前收到 168282233条 平均每毫秒收到 1744条
2023/12/16 04:39:29 当前收到 169958205条 平均每毫秒收到 1646条
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
147979 root 20 0 2257848 337668 16964 R 520.8 3.9 28:07.85 socket
148153 root 20 0 2379540 421912 5508 S 151.3 4.9 2:26.53 stress
使用缓存,cpu的使用率相对更低同时推送的效率也更高。
2、使用epoll,避免每一个连接都启动一个读的协程。
在写多读少的情况下,不用每个连接启动一个读的协程。可以使用go提供的epoll相关的api,只有当连接有数据要被读才去处理。需要控制好读的协程的启动数量和当读取没有读出完整消息,需要自己实现对消息的缓存。
type Epoller struct {
fd int
}
func NewEpoller() *Epoller {
var err error
fd, err := unix.EpollCreate1(0)
if err != nil {
global.L.Panic("init epoll failed", zap.Any("err", err))
}
epoller := &Epoller{fd: fd}
go epoller.run()
return epoller
}
func (e *Epoller) run() {
defer syscall.Close(e.fd)
for {
events := make([]unix.EpollEvent, 1024)
n, err := unix.EpollWait(e.fd, events, -1)
if errors.Is(err, syscall.EINTR) {
continue
}
if errors.Is(err, syscall.EAGAIN) {
break
}
for i := 0; i < n; i++ {
switch events[i].Events {
//case unix.EPOLLIN | unix.EPOLLRDHUP:
case unix.EPOLLIN:
CM.NotifyRead(int(events[i].Fd))
//其他的操作视为关闭
default:
CM.CloseConnection(int(events[i].Fd))
}
}
}
}
func (e *Epoller) Add(fd int) error {
//使用et模式 一次触发
return unix.EpollCtl(e.fd, syscall.EPOLL_CTL_ADD, fd, &unix.EpollEvent{Events: unix.EPOLLERR | unix.EPOLLHUP | unix.EPOLLET | unix.EPOLLRDHUP | unix.EPOLLPRI | unix.EPOLLIN, Fd: int32(fd)})
}
func (e *Epoller) Remove(fd int) error {
return unix.EpollCtl(e.fd, syscall.EPOLL_CTL_DEL, fd, nil)
}
func (conn *Connection) Read(data []byte) (n int, err error) {
n, err = syscall.Read(int(conn.ID), data)
if err != nil {
if errors.Is(err, syscall.EAGAIN) {
return 0, io.EOF
}
return 0, err
}
return n, nil
}
本作品采用《CC 协议》,转载必须注明作者和本文链接