go websocket推送优化

refer

www.cnblogs.com/chyingp/p/websocke...

github.com/eranyanay/1m-go-websock...

代码地址github.com/luxun9527/gpush

主要是两个方向的优化

1、写的时候定时批量写入。

2、读的时候使用epoll,避免每个连接都启动一个读的协程。

其他有对长连接读写的场景,也可以使用这种方法。

1、定时批量写数据

当需要高频写数据,且数据量比较小的情况下,如推送股票行情。可以定时批量写数据。

当连接数1w,高频写入数据。,使用top命令,可以看到cpu是满载的。使用pprof分析可以看到将数据写入到连接的那一步占用cpu比较高。

go tool pprof -http=:9091 192.168.2.159:8899/debug/pprof/prof...

得益于github.com/gobwas/ws 提供的非常灵活的用法,我们可以暂时将我们要写的数据转为”websocket数据包”缓存起来,直接使用tcp连接,定时批量发送。

package model

import (
    "bytes"
    gws "github.com/gobwas/ws"
    "github.com/gobwas/ws/wsflate"
)

type Message struct {
    messageType gws.OpCode
    data        []byte
}

func NewMessage(code gws.OpCode, data []byte) Message {
    return Message{
        messageType: code,
        data:        data,
    }
}
//将要写的数据转为websocket格式的数据。
func (m Message) ToBytes() ([]byte, error) {
    buf := bytes.NewBuffer(make([]byte, 0, 100))
    frame := gws.NewFrame(m.messageType, true, m.data)
    if err := gws.WriteFrame(buf, frame); err != nil {
        return nil, err
    }
    return buf.Bytes(), nil
}

对比测试1万连接

github.com/luxun9527/gpush/blob/ma...

//使用缓存,写缓存200 定时500毫秒
func (conn *Connection) WriteLoop() {
    defer func() {
        if err := recover(); err != nil {
            global.L.Debug("recover from read", zap.Any("err", err))
        }
        conn.Close()
    }()
    for {
        select {
        case data := <-conn.write:
            if _, err := conn.writeBuf.Write(data); err != nil {
                global.L.Debug("Write from read", zap.Any("err", err))
                return
            }
        case <-conn.writeRate.C:
            //如果关闭就返回
            if conn.isClosed {
                return
            }
            //心跳超时
            if conn.lastHeartbeat.Add(time.Millisecond * time.Duration(global.Config.Connection.TimeOut)).Before(time.Now()) {
                return
            }
            //写到连接中
            if conn.writeBuf.Available() > 0 {
                if err := conn.writeBuf.Flush(); err != nil {
                    return
                }
            }

        }
    }
}
//对比
2023/12/16 04:35:12 当前收到 334224159条 平均每毫秒收到 16612023/12/16 04:35:13 当前收到 339639047条 平均每毫秒收到 20812023/12/16 04:35:17 当前收到 341958389条 平均每毫秒收到 22762023/12/16 04:35:20 当前收到 350767911条 平均每毫秒收到 21192023/12/16 04:35:28 当前收到 371979124条 平均每毫秒收到 21792023/12/16 04:35:29 当前收到 374155711条 平均每毫秒收到 21742023/12/16 04:35:31 当前收到 376456748条 平均每毫秒收到 22962023/12/16 04:35:34 当前收到 381007068条 平均每毫秒收到 22392023/12/16 04:35:35 当前收到 387411405条 平均每毫秒收到 19662023/12/16 04:35:37 当前收到 389106853条 平均每毫秒收到 15932023/12/16 04:35:38 当前收到 391489681条 平均每毫秒收到 22842023/12/16 04:35:39 当前收到 393774240条 平均每毫秒收到 22842023/12/16 04:35:40 当前收到 396169192条 平均每毫秒收到 23942023/12/16 04:35:41 当前收到 398502117条 平均每毫秒收到 23302023/12/16 04:35:43 当前收到 400792789条 平均每毫秒收到 22902023/12/16 04:35:44 当前收到 406800009条 平均每毫秒收到 27452023/12/16 04:35:45 当前收到 408767853条 平均每毫秒收到 1958PID USER      PR  NI    VIRT    RES    SHR S  %CPU  %MEM     TIME+ COMMAND                                                                                                                                                                                                        
 147549 root      20   0 2264040 286356  16892 S 389.1   3.3  15:55.13 socket                                                                                                                                                                                                         
 147567 root      20   0 2283396 508252   5564 R 163.3   5.9   7:29.37 stress 

//不使用缓存,直接写到连接中
func (conn *Connection) WriteLoop() {
    defer func() {
        if err := recover(); err != nil {
            global.L.Debug("recover from read", zap.Any("err", err))
        }
        conn.Close()
    }()

    for data := range conn.write {
        if _, err := conn.Write(data); err != nil {
            return
        }
    }

}
//
2023/12/16 04:39:03 当前收到 121781703条 平均每毫秒收到 11662023/12/16 04:39:05 当前收到 126380058条 平均每毫秒收到 21152023/12/16 04:39:07 当前收到 130433719条 平均每毫秒收到 21552023/12/16 04:39:09 当前收到 134412628条 平均每毫秒收到 17092023/12/16 04:39:10 当前收到 137867897条 平均每毫秒收到 17202023/12/16 04:39:12 当前收到 139214716条 平均每毫秒收到 13342023/12/16 04:39:13 当前收到 143384829条 平均每毫秒收到 21152023/12/16 04:39:15 当前收到 145074988条 平均每毫秒收到 16902023/12/16 04:39:17 当前收到 148277250条 平均每毫秒收到 17052023/12/16 04:39:18 当前收到 151778877条 平均每毫秒收到 14242023/12/16 04:39:20 当前收到 153185029条 平均每毫秒收到 14042023/12/16 04:39:22 当前收到 156690213条 平均每毫秒收到 16902023/12/16 04:39:23 当前收到 159630744条 平均每毫秒收到 15252023/12/16 04:39:24 当前收到 161531340条 平均每毫秒收到 18722023/12/16 04:39:26 当前收到 162992849条 平均每毫秒收到 14522023/12/16 04:39:27 当前收到 166536469条 平均每毫秒收到 18682023/12/16 04:39:28 当前收到 168282233条 平均每毫秒收到 17442023/12/16 04:39:29 当前收到 169958205条 平均每毫秒收到 1646PID USER      PR  NI    VIRT    RES    SHR S  %CPU  %MEM     TIME+ COMMAND                                                                                                                                                                                                        
 147979 root      20   0 2257848 337668  16964 R 520.8   3.9  28:07.85 socket                                                                                                                                                                                                         
 148153 root      20   0 2379540 421912   5508 S 151.3   4.9   2:26.53 stress 

使用缓存,cpu的使用率相对更低同时推送的效率也更高。

2、使用epoll,避免每一个连接都启动一个读的协程。

在写多读少的情况下,不用每个连接启动一个读的协程。可以使用go提供的epoll相关的api,只有当连接有数据要被读才去处理。需要控制好读的协程的启动数量和当读取没有读出完整消息,需要自己实现对消息的缓存。

type Epoller struct {
    fd int
}

func NewEpoller() *Epoller {
    var err error
    fd, err := unix.EpollCreate1(0)
    if err != nil {
        global.L.Panic("init epoll failed", zap.Any("err", err))
    }
    epoller := &Epoller{fd: fd}
    go epoller.run()
    return epoller
}

func (e *Epoller) run() {
    defer syscall.Close(e.fd)
    for {
        events := make([]unix.EpollEvent, 1024)
        n, err := unix.EpollWait(e.fd, events, -1)
        if errors.Is(err, syscall.EINTR) {
            continue
        }
        if errors.Is(err, syscall.EAGAIN) {
            break
        }
        for i := 0; i < n; i++ {
            switch events[i].Events {
            //case unix.EPOLLIN | unix.EPOLLRDHUP:

            case unix.EPOLLIN:
                CM.NotifyRead(int(events[i].Fd))
                //其他的操作视为关闭
            default:
                CM.CloseConnection(int(events[i].Fd))
            }

        }

    }
}

func (e *Epoller) Add(fd int) error {
    //使用et模式 一次触发
    return unix.EpollCtl(e.fd, syscall.EPOLL_CTL_ADD, fd, &unix.EpollEvent{Events: unix.EPOLLERR | unix.EPOLLHUP | unix.EPOLLET | unix.EPOLLRDHUP | unix.EPOLLPRI | unix.EPOLLIN, Fd: int32(fd)})

}

func (e *Epoller) Remove(fd int) error {
    return unix.EpollCtl(e.fd, syscall.EPOLL_CTL_DEL, fd, nil)
}
func (conn *Connection) Read(data []byte) (n int, err error) {
    n, err = syscall.Read(int(conn.ID), data)
    if err != nil {
        if errors.Is(err, syscall.EAGAIN) {
            return 0, io.EOF
        }
        return 0, err
    }
    return n, nil
}
本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!