buffalo 中使用消息队列(worker)

我们在web开发中常常会有一些需要异步处理的任务,比如给网站注册者发送邮件,发送短信认证码,甚至将一些日志记录到消息队列,用户上传的视频需要转码等等。这些共同的特点就是比较耗时,不需要给用户即时返回信息。

buffalo中叫后台任务。一般来说如果仅仅是耗时,任务量不大的情况下,倒也不一定需要消息队列。但是如果任务量比较大的情况下,单机未必能处理完,就需要到消息队列。消息队列的好处就是可以分布式部署,worker也是分布式的,而且也支持持久化。

下面我举一个用redis来充当消息队列服务器的例子。

首先,我们需要在app.go中,buffalo.New app的时候,通过buffalo.Options来指定worker。

        app = buffalo.New(buffalo.Options{
            Env:          ENV,
            SessionStore: sessions.Null{},
            PreWares: []buffalo.PreWare{
                cors.Default().Handler,
            },
            SessionName: "_coke_web_session",
            Worker:      newRedisWorker(),
        })

newRedisWorker 就是创建一个redis的worker.Adapter

// init redis worker from env
func newRedisWorker() *gwa.Adapter {
    return gwa.New(gwa.Options{
        Pool: &redis.Pool{
            MaxActive: 5,
            MaxIdle:   5,
            Wait:      true,
            Dial: func() (redis.Conn, error) {
                return redis.Dial(
                    envy.Get("REDIS_NETWORK", "tcp"),
                    envy.Get("REDIS_ADDRESS", ":6378"),
                    redis.DialPassword(envy.Get("REDIS_PASSWORD", "")),
                )
            },
        },
        Name:           envy.Get("APP_NAME", "appname"),
        MaxConcurrency: 25,
    })
}

当然这里MaxActive,MaxIdle等也是可以从环境变量中读取啊。

//.env
REDIS_SIZE=10
REDIS_NETWORK=tcp
REDIS_ADDRESS=:6379
REDIS_PASSWORD=
SESSION_SECRET=huantest123

接下来,我们需要创建任务的消费者,也就是buffalo中说的Registering a Worker Handler
我们在actions目录下建一个worker.go

func init() {
    w = App().Worker
    w.Register("send_email", func(args worker.Args) error {
        fmt.Println("---------------")
        fmt.Println("send_email started!")
        time.Sleep(5 * time.Second)
        fmt.Println(args)
        fmt.Println("send_email finished!")
        return nil
    })
}

这里面可以注册多个Handler。

最后就是如何创建一个生产者了。一方面,我们可以在具体action中创建,比如用户注册完成。

func UsersCreate(c buffalo.Context) error {
     ...
     w.Perform(worker.Job{
         Queue: "default",
         Handler: "send_email",
         Args: worker.Args{
             "user_id": 123,
         },
     })
      ...
}

在task中也可以生产,因为task和action不在一个目录中,需要现找到worker。

var _ = Desc("send_mails", "Task Description")
var _ = Add("send_mails", func(c *Context) error {

    var w worker.Worker

    w = actions.App().Worker

    w.Perform(worker.Job{
        Queue:   "default",
        Handler: "send_email",
        Args: worker.Args{
            "user_id": 123,
        },
    })
    return nil
})

完!

superwen
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!

社区文档: