使用 Go 语言每分钟处理 1 百万请求

Malwarebytes 我们经历了显著的增长,一年前加入了硅谷的公司,一个主要的职责是架构和开发一些系统来支持一个快速增长的信息安全公司,这是一个每天百万用户使用的产品。我在反病毒和反恶意软件行业的不同公司工作了 12 年,从而我知道由于我们每天处理大量的数据,这些系统是多么复杂。
有趣的是,在过去的大约 9 年间,我参与的所有的 web 后端的开发通常是通过 Ruby on Rails 技术实现的。不要错怪我。我喜欢 Ruby on Rails,并且我相信它是个令人愉悦的环境。但是一段时间后,你会开始以 ruby 的方式开始思考和设计系统,你会忘记,如果你可以利用多线程、并行、快速执行和小内存开销,软件架构本来应该是多么高效和简单。很多年期间,我是一个 c/c++、Delphi 和 c# 开发者,我刚开始意识到使用正确的工具可以把复杂的事情变得简单些。

作为首席架构师,我不会很关心在互联网上的语言和框架战争。我相信效率、生产力。代码可维护性主要依赖于你如何把解决方案设计得很简单。

问题#

在我们的匿名遥测分析系统模块中,我们的目标是能够处理来自数百万端点的 POST 请求。web 处理器会收到一个包含了许多需要写入 Amazon S3 负载信息的 JSON 文件,然后我们的 map-reduce 系统会进行后续的数据处理。

传统做法就是我们考虑搞一个工作层架构,使用类似如下的这些:

  • Sidekiq
  • Resque
  • DelayedJob
  • Elasticbeanstalk Worker Tier
  • RabbitMQ
  • 等等……

然后我们建两个不同的集群,一个用来处理前端请求,另外一个用于处理其他东西,这样我们就可以扩大我们的后端处理能力了。

从一开始,我们的团队就知道应该使用 Go,因为在讨论阶段我们就预见到了这个将是一个非常庞大的传输系统。我使用 Go 已经 2 年了,我们也开发过一些系统,但是从未遇到过如此大体量的负载的挑战。

我们先定义了几个结构体来接收 POST 方式发过来的的负载请求,然后用一个方法来把它上传到我们的 S3 存储块中。

type PayloadCollection struct {
    WindowsVersion  string    `json:"version"`
    Token           string    `json:"token"`
    Payloads        []Payload `json:"data"`
}

type Payload struct {
    // [编写]
}

func (p *Payload) UploadToS3() error {
    //  storageFolder 方法避免时间戳相同时产生的同名 key 冲突 
    storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())

    bucket := S3Bucket

    b := new(bytes.Buffer)
    encodeErr := json.NewEncoder(b).Encode(payload)
    if encodeErr != nil {
        return encodeErr
    }

    // 我们传到 S3 存储块中的所有东西都应该标记为 'private'
    var acl = s3.Private
    var contentType = "application/octet-stream"

    return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}

幼稚的协程方式#

早期的时候我们实现了一个非常简陋的 POST 处理器,试图将任务处理并行化到一个简单的 goroutine 中:

func payloadHandler(w http.ResponseWriter, r *http.Request) {

    if r.Method != "POST" {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }

    // 将包体读入到一个字符串进行 json 解析
    var content = &PayloadCollection{}
    err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
    if err != nil {
        w.Header().Set("Content-Type", "application/json; charset=UTF-8")
        w.WriteHeader(http.StatusBadRequest)
        return
    }

    // 遍历每个负载,将其条目逐一队列化,以期把它们发到 S3
    for _, payload := range content.Payloads {
        go payload.UploadToS3()   // <----- 千万别这么干
    }

    w.WriteHeader(http.StatusOK)
}

中等负载情况下,应对大部分人没有问题,但是大规模请求时就不行了。我们预期会有不少请求,但是当把第一版的代码部署到生产环境的时候,我们看到了洪水猛兽。我们小觑了流量的体量。

以上的办法在多方面都是行不通的。我们无法控制 goroutine 的产生数量。当我们面对每分钟 1 百万的 POST 请求时,上述代码很快就跪了。

屡败屡战#

我们得另辟蹊径。讨论之初,我们的着眼点就放在如何缩短请求接受器生命周期,将处理程序置于后台运行。当然,在 Ruby on Rails 的世界中,同样无可避免要做这些事。无论你是用 puma,unicorn,passenger (这里就不扯 JRuby 了),否则你就会阻塞所有可用的 web 工作进程。那么我们还是需要用常规方案来解决这件事,诸如 Resque, Sidekiq, SQS 之类的方案不胜枚举。

所以在第二次迭代的时候,我们创建了一个缓冲 channel,用以队列化一些任务并将之上传到 S3,如此一来,我们就可以控制队列容量,并且有足够的内存来进行队列化了, 我们认为在 channel 队列中缓存任务就可以了。

var Queue chan Payload

func init() {
    Queue = make(chan Payload, MAX_QUEUE)
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {
    ...
    // 遍历每个有效载荷,逐个排队之后上传 S3
    for _, payload := range content.Payloads {
        Queue <- payload
    }
    ...
}

接下来将任务出列并处理它们,我们使用类似如下做法:

func StartProcessor() {
    for {
        select {
        case job := <-Queue:
            job.payload.UploadToS3()  // <-- 仍不够好
        }
    }
}

说实话,我对我们考量的方案心里也没底。这将是一个不眠之夜。这套方案没有给我们带来什么实际收益, 我们只是暂时用缓冲队列置换了有缺陷的并发,简单地把问题推迟而已。我们的同步处理器一次只能上传一个载荷到 S3,由于请求速率远远大于我们的单个处理器上传能力,缓冲 channel 很快到达极限,阻塞了请求处理队列。

我们只是简单地逃避了这个问题,最终还是为我们的系统敲响了丧钟。
在我们部署了这个有缺陷的版本后,系统的延迟率以固定速率不断攀升。

file

更好的方案#

在使用 Go channel 时,我们决定使用一种常规模式,用以创建一个双层 channel 系统,一个用来对任务进行排队,另一个控制任务队列中同时进行任务处理 worker 的数量。

这个办法的原理就是将上传到 S3 的并行化速度保持在一个可持续的范围中,既不会让机器瘫痪,也不会产生 S3 连接错误。所以我们选择创建一个 Job/Worker 模式。对于熟悉 Java,C# 等之类语言的开发者来说,可以将其视为 Golang 使用 channel 实现的类似 Worker 线程池。

var (
    MaxWorker = os.Getenv("MAX_WORKERS")
    MaxQueue  = os.Getenv("MAX_QUEUE")
)

// Job 指代要运行的任务
type Job struct {
    Payload Payload
}

// 一个可以发送工作请求的缓冲 channel
var JobQueue chan Job

// Worker 表示执行任务的 worker
type Worker struct {
    WorkerPool  chan chan Job
    JobChannel  chan Job
    quit        chan bool
}

func NewWorker(workerPool chan chan Job) Worker {
    return Worker{
        WorkerPool: workerPool,
        JobChannel: make(chan Job),
        quit:       make(chan bool)}
}

// Start 方法开启一个 worker 循环,监听退出 channel,可按需停止这个循环
func (w Worker) Start() {
    go func() {
        for {
            // 将当前的 worker 注册到 worker 队列中
            w.WorkerPool <- w.JobChannel

            select {
            case job := <-w.JobChannel:
                // 此时我们接收到一个工作请求
                if err := job.Payload.UploadToS3(); err != nil {
                    log.Errorf("Error uploading to S3: %s", err.Error())
                }

            case <-w.quit:
                // 此处接收一个停止信号
                return
            }
        }
    }()
}

// Stop 方法控制 worker 停止监听工作请求
func (w Worker) Stop() {
    go func() {
        w.quit <- true
    }()
}

我们修改了 Web 请求处理器,使之能够创建一个携带载荷信息的 Job 实例, 然后把它发到 JobQueue channel 中供 worker 消费。

func payloadHandler(w http.ResponseWriter, r *http.Request) {

    if r.Method != "POST" {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }

    // 把包体读入到一个字符串中,进行 json 解析
    var content = &PayloadCollection{}
    err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
    if err != nil {
        w.Header().Set("Content-Type", "application/json; charset=UTF-8")
        w.WriteHeader(http.StatusBadRequest)
        return
    }

    // 遍历载荷,逐个队列化用以上传到 S3
    for _, payload := range content.Payloads {

        // 创建一个带有载荷的任务( Job 实例 )
        work := Job{Payload: payload}

        // 然后把它放到队列中
        JobQueue <- work
    }

    w.WriteHeader(http.StatusOK)
}

在 web 服务器初始化的时候,我们创建一个 Dispatcher 并调用其 Run() 方法来创建 worker 池,并开始监听 JobQueue 队列中的任务。

dispatcher := NewDispatcher(MaxWorker)
dispatcher.Run()

接下来的代码就是我们的 dispatcher 实现:

type Dispatcher struct {
    // 注册到 dispatcher 的 worker channel 池
    WorkerPool chan chan Job
}

func NewDispatcher(maxWorkers int) *Dispatcher {
    pool := make(chan chan Job, maxWorkers)
    return &Dispatcher{WorkerPool: pool}
}

func (d *Dispatcher) Run() {
    // 开始运行 n 个 worker
    for i := 0; i < d.maxWorkers; i++ {
        worker := NewWorker(d.pool)
        worker.Start()
    }

    go d.dispatch()
}

func (d *Dispatcher) dispatch() {
    for {
        select {
        case job := <-JobQueue:
            // 收到任务请求
            go func(job Job) {
                // 尝试获取一个可用的 worker job channel
                // 阻塞直到有可用的 worker
                jobChannel := <-d.WorkerPool

                // 分发任务到 worker job channel 中
                jobChannel <- job
            }(job)
        }
    }
}

注意,我们规定了可实例化的 worker 最大数量,并把它放到我们的 worker 池中。 因为我们在此项目中使用了 Amazon Elasticbeanstalk 中 docker 化的 Go 环境,我们遵循 12 - 因子 规约在生产环境中配置系统,从环境变量中读取这些配置值。这样一来,我们就可以控制 worker 的数量和任务队列的的最大容量,在无需重新部署整个集群的情况下,我们也可以灵活调整这些参数。

var (
    MaxWorker = os.Getenv("MAX_WORKERS")
    MaxQueue  = os.Getenv("MAX_QUEUE")
)

立竿见影#

在我们部署这套新代码之后,立即就能看到所有的延迟率都降到了一个微不足道的水平,同时我们的请求处理能力突飞猛进。

file

在我们的弹性负载均衡系统经过几分钟完全热身之后,我们的 ElasticBeanstalk 应用每分钟可处理 1 百万左右的的请求。通常在早上的几个小时中,我们的流量会高达每分钟 1 百多万。

我们部署新代码之后,工作服务器数量从 100 大幅降到了 20。

file

在正确恰当地配置集群和弹性伸缩设置之后,我们将服务器实例从 4x EC 降到 c4.Large 水平,如果 CPU 占用率连续 5 分钟超过 90%,弹性伸缩设置会新建一个实例用来增援。

file

结论#

以我的经验来看,最简单的方案永远是最好的。我们可以设计一个复杂的系统,使用了大量的队列、无数后台 Worker 以及复杂的部署方案。但我们最终还是选择了合理利用 Elasticbeanstalk 自动伸缩的高负载方案,再加上 Golang 并发编程,简单、高效。

并非每天你就只能守着一个,只有四台机器组成的,每分钟处理一百万次写入 Amazon S3 存储桶的 POST 请求的能力连我的 MacBook Pro 笔记本都不如的这种集群。

总有合适的工具去解决这个问题。有时候在 Ruby on Rails 系统中你需要一个很强大的 web 处理器时,可以考虑从 Ruby 生态圈之外寻找更简单但是更强大的替代方案。

本文中的所有译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接
我们的翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。

原文地址:http://marcio.io/2015/07/handling-1-mill...

译文地址:https://learnku.com/go/t/23456/using-the...

本文为协同翻译文章,如您发现瑕疵请点击「改进」按钮提交优化建议
讨论数量: 3

很赞。实战操作操作,

6年前 评论
绝缘体菜狗

可以继续优化一下,S3 服务的接口加上可以批量接收 json 包的功能,每个 worker 够 10 个 json 包提交一次到 S3 服务器,这样就能变成每分钟处理一千万请求了,就是这样不知道怎么让 worker 线程间不抢夺队列中的 json 包,而确保已拿到包的 worker 线程能优先拿够 10 个包。

6年前 评论

// 然后把它放到队列中 JobQueue <- work
这个地方如果 JobQueue 满了,请求的连接就会全部阻塞在这里,文件打开数一下子就满了?

2年前 评论