buffalo 中使用消息队列(worker)
我们在web开发中常常会有一些需要异步处理的任务,比如给网站注册者发送邮件,发送短信认证码,甚至将一些日志记录到消息队列,用户上传的视频需要转码等等。这些共同的特点就是比较耗时,不需要给用户即时返回信息。
buffalo中叫后台任务。一般来说如果仅仅是耗时,任务量不大的情况下,倒也不一定需要消息队列。但是如果任务量比较大的情况下,单机未必能处理完,就需要到消息队列。消息队列的好处就是可以分布式部署,worker也是分布式的,而且也支持持久化。
下面我举一个用redis来充当消息队列服务器的例子。
首先,我们需要在app.go
中,buffalo.New
app的时候,通过buffalo.Options
来指定worker。
app = buffalo.New(buffalo.Options{
Env: ENV,
SessionStore: sessions.Null{},
PreWares: []buffalo.PreWare{
cors.Default().Handler,
},
SessionName: "_coke_web_session",
Worker: newRedisWorker(),
})
newRedisWorker
就是创建一个redis的worker.Adapter
// init redis worker from env
func newRedisWorker() *gwa.Adapter {
return gwa.New(gwa.Options{
Pool: &redis.Pool{
MaxActive: 5,
MaxIdle: 5,
Wait: true,
Dial: func() (redis.Conn, error) {
return redis.Dial(
envy.Get("REDIS_NETWORK", "tcp"),
envy.Get("REDIS_ADDRESS", ":6378"),
redis.DialPassword(envy.Get("REDIS_PASSWORD", "")),
)
},
},
Name: envy.Get("APP_NAME", "appname"),
MaxConcurrency: 25,
})
}
当然这里MaxActive
,MaxIdle
等也是可以从环境变量中读取啊。
//.env
REDIS_SIZE=10
REDIS_NETWORK=tcp
REDIS_ADDRESS=:6379
REDIS_PASSWORD=
SESSION_SECRET=huantest123
接下来,我们需要创建任务的消费者,也就是buffalo中说的Registering a Worker Handler
。
我们在actions目录下建一个worker.go
func init() {
w = App().Worker
w.Register("send_email", func(args worker.Args) error {
fmt.Println("---------------")
fmt.Println("send_email started!")
time.Sleep(5 * time.Second)
fmt.Println(args)
fmt.Println("send_email finished!")
return nil
})
}
这里面可以注册多个Handler。
最后就是如何创建一个生产者了。一方面,我们可以在具体action中创建,比如用户注册完成。
func UsersCreate(c buffalo.Context) error {
...
w.Perform(worker.Job{
Queue: "default",
Handler: "send_email",
Args: worker.Args{
"user_id": 123,
},
})
...
}
在task中也可以生产,因为task和action不在一个目录中,需要现找到worker。
var _ = Desc("send_mails", "Task Description")
var _ = Add("send_mails", func(c *Context) error {
var w worker.Worker
w = actions.App().Worker
w.Perform(worker.Job{
Queue: "default",
Handler: "send_email",
Args: worker.Args{
"user_id": 123,
},
})
return nil
})
完!