用 Golang 实现百万级 Websocket 服务

用Golang实现百万级Websocket服务

file

前言: 本文为国外大佬的一篇文章,因为我最近在研究和学习使用go写一个消息服务器,所以找到这篇文章,于是将它翻译过来,希望能够帮到其他的同学。这是我的处女作翻译作品。希望大神能够帮助指导修正。以后可能每周都会有一篇国外技术文章的翻译,有兴趣的同学可以加QQ群共同讨论(511247400)。

这篇文章我们讨论一下怎么使用go开发一个高负载的Websocket服务

如果你很熟悉websocket,但是不了解Go, 我希望你先掌握一些Go的知识

1.介绍

要定义我们的故事背景,应该说下我们为什么需要这样的一个服务。

我们有一个状态系统,用户邮箱存储就是其中之一,这里有一些方法去跟踪系统状态的变化和系统事件。大多是通过定期轮询或系统通知来改变状态。

两种方法各有利弊,但是提到email,用户越快接受到邮件越好。

邮件轮询涉及到大约 50000HTTP 请求每秒,60%返回了304状态, 这意味着邮箱里没有发生任何变化。

因此, 我们要减少服务器的负载去快速的发送email到用户,这让我决定重新创造一个轮子去写一个发布者订阅者服务,一方面它将接受到状态的通知,另一方面,也将这样的通知发布到其他用户。

之前
file

现在:
file

第一个方案展示的就我们之前说的第一个,浏览器定期轮询Api并且询问邮箱存储服务的改变。

第二个方案描述了一个新的架构,浏览器和通知API建立一个Websocket连接,这是服务的客户端。收到一个新的邮件的时候,服务器发送一个通知到发布者服务(1),发布者发布(2),api确定收到通知的连接,并发送到用户的浏览器(3)。

所以,今天我们来讨论这个API和Websocket服务,展望未来,我将告诉你这个服务将又有300万在线连接。

2. 通常解决方法

让我们看下如何在没有任何优化的情况下使用普通GO功能实现服务器的某些功能。

在我们继续使用 net/http之前, 我们来看下怎么发送和接受数据,在Websocket协议之上(例如:json 对象)我们称之为数据包。

让我们开始实现这个channel结构体,它将包含Websocket发送和接受数包的逻辑,通过Websocket连接。

2.1 channel struct

// Packet represents application level data.
type Packet struct {
    ...
}

// Channel wraps user connection.
type Channel struct {
    conn net.Conn    // WebSocket connection.
    send chan Packet // Outgoing packets queue.
}

func NewChannel(conn net.Conn) *Channel {
    c := &Channel{
        conn: conn,
        send: make(chan Packet, N),
    }

    go c.reader()
    go c.writer()

    return c
}

Websocket channel 实现

我想,你应该注意到两个读写的goroutines, 每个goroutines拥有一个堆内存,并且初始大小为2K到8K大小,这取决于操作系统和Go的版本。

关于上面我们提到的300万在线连接,我们就需要24GB的内存(按每个goroutine 4Kb 堆内存计算),而且没有为channel结构体分配内存。

2.2 I/O goroutines

让我们看看reader的实现

func (c *Channel) reader() {
    // We make a buffered read to reduce read syscalls.
    buf := bufio.NewReader(c.conn)

    for {
        pkt, _ := readPacket(buf)
        c.handle(pkt)
    }
}

这里我们使用bufio.Reader减少read()系统调用的次数,并读取buf缓冲区大小允许的数量, 在一个无限循环内,我们接受一个新的数据进来。 请记住:(等待新的数据到来)。我们稍后将返回它。

我们将在旁边解析和处理这些进来的数据包,因为她对我们讨论的优化并不重要。buf现在值得我们注意,默认情况下,它是4 KB,这意味着我们的连接还有12 GB的内存。writer情况于此类似。

func (c *Channel) writer() {
    // We make buffered write to reduce write syscalls. 
    buf := bufio.NewWriter(c.conn)

    for pkt := range c.send {
        _ := writePacket(buf, pkt)
        buf.Flush()
    }
}

我们遍历传出的数据包通道 c.send , 并将它们写入缓冲区,我们细心的读者已经猜到,我们的300万连接还有另外的4kb和12GB内存。

2.3 HTTP

我们已经有了一个简单的 channel 实现, 那么现在我们需要一个 websockt连接和它一起工作,因为我们还在通常的解决方案之下,所以我们按照响应的方式去做。

注意: 如果你不知道Websocket是怎么工作的, 应该提到切换到Websocket协议,意思是调用特殊的HTTP升级机制,成功处理升级请求后。服务器和客户端使用 TCP交换二进制Websocket帧,下面是一个连接里的帧结构体内容

import (
    "net/http"
    "some/websocket"
)

http.HandleFunc("/v1/ws", func(w http.ResponseWriter, r *http.Request) {
    conn, _ := websocket.Upgrade(r, w)
    ch := NewChannel(conn)
    //...
})

请注意 http.ResponseWriterbufio.Readerbufio.Writer进行内存分配(都带4KB的缓冲区),用户 *http.Request的初始化和进一步的响应写入。

无论使用什么Websocket 库, 成功响应升级请求之后,调用 responseWriter.Hijack()后, 服务器与TCP连接同时接收 I/O 缓存区。

提示 :在某些情况下,go:linkname可用于通过调用 net / http.putBufio {Reader,Writer} 将缓冲区返回到net / http内的sync.Pool。

从而,我们300万的在线连接又需要额外24GB的内存, 所以,我们的应用程序就需要72GB内存,并且什么也没做。

3. 优化

让我们回顾一下我们在介绍部分中讨论的内容,并记住用户连接的行为方式,切换到 Websocket 之后,客户端发送数据包和相应的时间或者换句话说事件订阅,然后(不考虑 ping/pong 的技术细节),客户端可能在连接的生命周期内没有发送任何数据

连接的生命周期可能是几秒到几天

因此,我们的 Channel.reader()Channel.writer()大部分时间都是在等待接收或发送数据的处理,每个单独的等待都有4KB的缓存区

现在我们已经清楚哪些能够做的更好,哪些不能。

3.1. Netpoll

你还记得 Channel.reader() 的实现是:等待新的数据进入,通过锁定 bufio.Reader.Read() 内的conn.Read() 调用? 如果连接中有数据,Go 运行时会唤醒 goroutine 去允许读取下一条数据包,之后,这个goroutine 再次被锁定,并等待新的数据。让我们看看Go运行时是怎么知道 goroutine必须被唤醒。

如果我们查看 conn.Read()实现,我们将看到 net.netFD.Read()被调用

func (fd *netFD) Read(p []byte) (n int, err error) {
    //...
    for {
        n, err = syscall.Read(fd.sysfd, p)
        if err != nil {
            n = 0
            if err == syscall.EAGAIN {
                if err = fd.pd.waitRead(); err == nil {
                    continue
                }
            }
        }
        //...
        break
    }
    //...
}

Go 使用非阻塞模式,“ EAGAIN ” 说没有数据在socket连接里并且不要锁定读取空的Websocket连接。OS将控制权返回给我们

我们从连接文件描述符中看到一个read()系统调用。如果 read 返回 " EAGAIN " 错误,则运行时调用pollDesc.waitRead()

// net/fd_poll_runtime.go

func (pd *pollDesc) waitRead() error {
   return pd.wait('r')
}

func (pd *pollDesc) wait(mode int) error {
   res := runtime_pollWait(pd.runtimeCtx, mode)
   //...
}

如果我们深挖,我们将看到 netpoll 是使用 Linux 中的 epollBSD 中的 kqueue 实现的,为什么不对我们的连接使用相同的方法 ?我们可以分配一个读缓冲区,并在真正需要时启动读取 goroutine, 当socket中有真正可读的数据时。

github.com/golang/go 这里, 存在导出 netpoll 功能的问题。

3.2 摆脱goroutines

假设我们有Go的netpoll实现,现在我们可以避免使用内部缓冲区启动 Channel.reader()goroutine

并订阅连接中可读数据的事件

ch := NewChannel(conn)

// Make conn to be observed by netpoll instance.
poller.Start(conn, netpoll.EventRead, func() {
    // We spawn goroutine here to prevent poller wait loop
    // to become locked during receiving packet from ch.
    go Receive(ch)
})

// Receive reads a packet from conn and handles it somehow.
func (ch *Channel) Receive() {
    buf := bufio.NewReader(ch.conn)
    pkt := readPacket(buf)
    c.handle(pkt)
}

Channel.writer()更容易, 因为我们只能在发送数据包时运行goroutine并分配缓冲区:

func (ch *Channel) Send(p Packet) {
    if c.noWriterYet() {
        go ch.writer()
    }
    ch.send <- p
}

请注意,我没有处理操作系统在write()系统调用上返回 EAGAIN的情况。这种情况,我们依靠Go运行时,因为这种服务实际上很少见。 然而,如果你需要,它可以以相同的方式处理。

ch.send读出数据包之后,writer将完成它,并释放 goroutine堆栈和发送缓存区。

完美,我们保存了48GB的内存,摆脱了两个连续运行的goroutine内堆栈 I/O 缓冲区。

3.3 控制资源

大量的连接并不只是涉及高内存消耗,当开发服务时,我们经历了反复的竞争条件和死锁之后通常会出现所谓的自我DDoS--应用程序客户端试图连接到服务器从而进一步破坏它的情况。

例如:如果出于什么原因导致我们无法处理 ping/pong 消息,但空闲的处理程序继续关闭这样的连接(假如这个链接端口,因此没有提供数据)。客户端似乎每N秒去尝试充实连接而不是等待事件。

如果锁定或重载的服务器刚刚停止接受新连接,并且它之前的平衡器(例如,nginx)将请求传递给下一个服务器实例,那将是很好的。

此外,服务器负载如何,如果所有客户突然想以任何理由向我们发送数据包,先前保存的48 GB将再次使用,我们实际上会回到每个连接的goroutine和缓冲区的初始状态。

goroutine 池

我们可以使用goroutine池限制同时处理的数据包数量, 下面是goroutine 池的基础实现:

package gopool

func New(size int) *Pool {
    return &Pool{
        work: make(chan func()),
        sem:  make(chan struct{}, size),
    }
}

func (p *Pool) Schedule(task func()) error {
    select {
    case p.work <- task:
    case p.sem <- struct{}{}:
        go p.worker(task)
    }
}

func (p *Pool) worker(task func()) {
    defer func() { <-p.sem }
    for {
        task()
        task = <-p.work
    }
}

现在我们的netpoll代码如下:

pool := gopool.New(128)

poller.Start(conn, netpoll.EventRead, func() {
    // We will block poller wait loop when
    // all pool workers are busy.
    pool.Schedule(func() {
        Receive(ch)
    })
})

所以现在我们不仅在socket中可读数据出现时读取数据包,而且也是第一次有机会接受空闲的goroutine在池中。

同样,我们将更改 Send():

pool := gopool.New(128)

func (ch *Channel) Send(p Packet) {
    if c.noWriterYet() {
        pool.Schedule(ch.writer)
    }
    ch.send <- p
}

代替 go ch.writer(), 我们想写对一个有N个goroutines池, 在其中一个重复使用的goroutines。我们可以保证同时处理N个请求并且到达的N + 1我们将不会分配N + 1个缓冲区进行读取,goroutine池还允许我们限制新连接的 Accept()Upgrade()以及避免大多数DDoS情况。

3.4 零拷贝升级

让我们稍微偏离 Websocket 协议,正如我们提到的那样,客户端使用http请求升级切换到Websocket协议。如下所示:

GET /ws HTTP/1.1
Host: mail.ru
Connection: Upgrade
Sec-Websocket-Key: A3xNe7sEB9HixkmBhVrYaA==
Sec-Websocket-Version: 13
Upgrade: websocket

HTTP/1.1 101 Switching Protocols
Connection: Upgrade
Sec-Websocket-Accept: ksu0wXWG+YmkVx+KQR2agP0cQn4=
Upgrade: websocket

就是说,在我们的例子中,我们需要http请求并且在他的头部只需要切换到Websocket 协议, 这些知识以及http.Request中存储的内容提示,为了优化,在处理HTTP请求并放弃标准的net / http服务器时,我们可能会拒绝不必要的分配和复制。

例如: 这个 http.Request 包含一个具相同名称标头类型的字段,通过将数据从连接复制到值字符串,无条件地填充所有请求标头,想象一下,在该字段内可以保留多少额外数据,例如对于大型Cookie标头。

但是该怎么返回呢?

WebSocket 实现

我们的服务器优化时存在的所有库都允许我们仅为标准的net / http服务器进行升级, 而且,(两个)库都不能使用所有上述读写优化,使这些优化工作,我们必须有一个相当低级的API来处理WebSocket。重用缓冲区,我们需要procotol函数看起来像这样:

func ReadFrame(io.Reader) (Frame, error)
func WriteFrame(io.Writer, Frame) error

如果我们有一个带有这种API的库,我们可以按如下方式从连接中读取数据包(数据包写入看起来一样):

// getReadBuf, putReadBuf are intended to 
// reuse *bufio.Reader (with sync.Pool for example).
func getReadBuf(io.Reader) *bufio.Reader
func putReadBuf(*bufio.Reader)

// readPacket must be called when data could be read from conn.
func readPacket(conn io.Reader) error {
    buf := getReadBuf()
    defer putReadBuf(buf)

    buf.Reset(conn)
    frame, _ := ReadFrame(buf)
    parsePacket(frame.Payload)
    //...
}

简而言之,是时候建立自己的库了。

github.com/gobwas/ws

编写ws库是为了不对用户强加其协议操作逻辑,所有的读取和写入方法都接收标准的 io.Readerio.Writer接口, 这使得可以使用或不使用缓冲或任何其他 I / O 包装。

除了从标准 net/http 升级,ws支持零拷贝升级, 处理升级请求并切换到WebSocket而不进行内存分配或复制。ws.Upgrade()接受 io.ReadWriter (net.Conn实现这个接口), 换句话说,我们可以标准的net.Listen 并将接收到的连接从 ln.Accept()立即转移到 ws.Upgrade(). 该库可以复制任何请求数据,以备将来在应用程序中使用(例如: CookieSession 验证)

下面是升级请求处理的基准:标准net / http服务器与net.Listen()以及零拷贝升级:

BenchmarkUpgradeHTTP    5156 ns/op    8576 B/op    9 allocs/op
BenchmarkUpgradeTCP     973 ns/op     0 B/op       0 allocs/op

切换到 ws 和零拷贝升级保存了我们另外24GB的内存,在 net / http 处理程序处理请求时为I / O缓冲区分配的空间.

3.5 概要

让我们构建一下我告诉你的优化

  • 内部有缓冲区的读取 goroutine 很昂贵。解决方案:netpoll(epoll,kqueue);重用缓冲区。

  • 内部有缓冲区的写入 goroutine 很昂贵。 解决方案:必要时启动goroutine;重用缓冲区。

  • 随着大量的连接,netpoll不起作用。解决方案:重复使用goroutines并限制其数量

  • net / http不是处理升级到WebSocket的最快方法。解决方案:在裸TCP连接上使用零拷贝升级。

这就是服务器代码:

import (
    "net"
    "github.com/gobwas/ws"
)

ln, _ := net.Listen("tcp", ":8080")

for {
    // Try to accept incoming connection inside free pool worker.
    // If there no free workers for 1ms, do not accept anything and try later.
    // This will help us to prevent many self-ddos or out of resource limit cases.
    err := pool.ScheduleTimeout(time.Millisecond, func() {
        conn := ln.Accept()
        _ = ws.Upgrade(conn)

        // Wrap WebSocket connection with our Channel struct.
        // This will help us to handle/send our app's packets.
        ch := NewChannel(conn)

        // Wait for incoming bytes from connection.
        poller.Start(conn, netpoll.EventRead, func() {
            // Do not cross the resource limits.
            pool.Schedule(func() {
                // Read and handle incoming packet(s).
                ch.Recevie()
            })
        })
    })
    if err != nil {   
        time.Sleep(time.Millisecond)
    }
}

4. 总结

过早优化是编程中所有邪恶(或至少大部分)的根源。唐纳德克努特

当然,上述优化是相对应的,并非在所有情况下。例如,如果自由资源(内存,CPU)与在线连接数之间的比率相当高,则优化可能没有意义。但是,通过了解改进的位置和内容,您可以从中受益匪浅。

感谢您的关注!

5. 参考

原文链接:https://medium.freecodecamp.org/million-we...

本作品采用《CC 协议》,转载必须注明作者和本文链接
本帖由 zhaocrazy 于 1年前 加精
讨论数量: 2

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!