Asynq 实现 Go 后台作业异步定时任务处理 (7/11 更新)
在本教程中,我们将编写两个程序client
和workers
。
-client.go
将创建和安排任务,由后台工作程序异步处理。
-workers.go
将启动多个并发工作器来处理客户端创建的任务。
本教程假定您正在localhost:6379
上运行Redis服务器。
在开始之前,请确保您已安装并运行Redis。
让我们从创建两个主要文件开始。
mkdir quickstart && cd quickstart
go mod init asynq-quickstart
mkdir client workers
touch client/client.go workers/workers.go
并安装asynq
软件包。
go get -u github.com/hibiken/asynq
在开始编写代码之前,让我们回顾一下将在两个程序中使用的一些核心类型。
Redis连接选项
Asynq使用Redis作为消息代理。client.go
和workers.go
都需要连接到Redis进行写入和读取。
我们将使用RedisClientOpt
指定如何连接到本地Redis实例。
var redis = asynq.RedisClientOpt{
Addr: "localhost:6379",
// Omit if no password is required
Password: "mypassword",
// Use a dedicated db number for asynq.
// By default, Redis offers 16 databases (0..15)
DB: 0,
}
Tasks任务
在asynq
中,工作单元被封装为Task
类型。
其中有两个字段:“类型”和“有效载荷”。
// Task represents a task to be performed.
type Task struct {
// Type indicates the type of a task to be performed.
Type string
// Payload holds data needed to perform the task.
Payload Payload
}
Type
是一个简单的字符串值,指示给定任务的类型。Payload
保存执行任务所需的数据,您可以将其视为map[string]interface{}
。需要注意的重要一件事是有效负载值必须是可序列化的。
现在,我们已经研究了核心类型,下一步开始编写程序。
Client Program (客户程序)
在client.go
中,我们将创建一些任务,并使用asynq.Client
将它们加入队列。
要创建任务,请使用NewTask
函数,并为任务传递类型和有效负载。
asynq.Client
支持三种调度任务的方法:Enqueue
,EnqueueIn
和EnqueueAt
。
使用client.Enqueue
将任务立即加入队列。
使用client.EnqueueIn
或client.EnqueueAt
来安排将来要处理的任务。
// client.go
func main() {
r := asynq.RedisClientOpt{Addr: "localhost:6379"}
client := asynq.NewClient(r)
// Create a task with typename and payload.
t1 := asynq.NewTask(
"email:welcome",
map[string]interface{}{"user_id": 42})
t2 := asynq.NewTask(
"email:reminder",
map[string]interface{}{"user_id": 42})
// Process the task immediately.
res, err := client.Enqueue(t1)
if err != nil {
log.Fatal(err)
}
fmt.Printf("result: %+v\n", res)
// Process the task 24 hours later.
res, err = client.EnqueueIn(24*time.Hour, t2)
if err != nil {
log.Fatal(err)
}
fmt.Printf("result: %+v\n", res)
}
这就是我们需要的客户端程序。
Workers Program
在workers.go
中,我们将创建一个asynq.Server
实例来启动worker。
NewServer
函数采用RedisConnOpt
和Config
。
Config
用于调整服务器的任务处理行为。
您可以查看Config
上的文档以查看所有可用的config选项。
为简单起见,在此示例中,我们仅指定并发。
// workers.go
func main() {
r := asynq.RedisClientOpt{Addr: "localhost:6379"}
srv := asynq.NewServer(r, asynq.Config{
Concurrency: 10,
})
// NOTE: We'll cover what this `handler` is in the section below.
if err := srv.Run(handler); err != nil {
log.Fatal(err)
}
}
(* Server).Run
的参数是一个接口asynq.Handler
,它具有一个方法ProcessTask
。
// ProcessTask should return nil if the task was processed successfully.
//
// If ProcessTask returns a non-nil error or panics, the task will be retried again later.
type Handler interface {
ProcessTask(context.Context, *Task) error
}
实现处理程序的最简单方法是定义一个具有相同签名的函数,并将其传递给Run
时使用asynq.HandlerFunc
适配器类型。
func handler(ctx context.Context, t *asynq.Task) error {
switch t.Type {
case "email:welcome":
id, err := t.Payload.GetInt("user_id")
if err != nil {
return err
}
fmt.Printf("Send Welcome Email to User %d\n", id)
case "email:reminder":
id, err := t.Payload.GetInt("user_id")
if err != nil {
return err
}
fmt.Printf("Send Reminder Email to User %d\n", id)
default:
return fmt.Errorf("unexpected task type: %s", t.Type)
}
return nil
}
func main() {
r := asynq.RedisClientOpt{Addr: "localhost:6379"}
srv := asynq.NewServer(r, asynq.Config{
Concurrency: 10,
})
// Use asynq.HandlerFunc adapter for a handler function
if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
log.Fatal(err)
}
}
我们可以继续向此处理函数添加案例,但是在实际应用中,在单独的函数中为每种案例定义逻辑很方便。
为了重构我们的代码,让我们使用ServeMux
创建我们的处理程序。
就像来自"net/http"
包中的ServeMux
一样,您可以通过调用Handle
或HandleFunc
来注册处理程序。 ServeMux
满足Handler
接口,因此您可以将其传递给(*Server).Run
。
// workers.go
func main() {
r := asynq.RedisClientOpt{Addr: "localhost:6379"}
srv := asynq.NewServer(r, asynq.Config{
Concurrency: 10,
})
mux := asynq.NewServeMux()
mux.HandleFunc("email:welcome", sendWelcomeEmail)
mux.HandleFunc("email:reminder", sendReminderEmail)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}
func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
id, err := t.Payload.GetInt("user_id")
if err != nil {
return err
}
fmt.Printf("Send Welcome Email to User %d\n", id)
return nil
}
func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
id, err := t.Payload.GetInt("user_id")
if err != nil {
return err
}
fmt.Printf("Send Reminder Email to User %d\n", id)
return nil
}
因为我们已经提取了处理每种任务类型的函数,所以代码看起来会更有条理。
但是,代码有点含糊不清,对于任务类型和有效负载密钥,我们具有这些字符串值,应将它们封装在一个内聚的程序包中。 让我们通过编写一个封装任务创建和处理的包来重构代码。 我们将简单地创建一个名为tasks
的软件包。
mkdir tasks && touch tasks/tasks.go
package tasks
import (
"context"
"fmt"
"github.com/hibiken/asynq"
)
// A list of task types.
const (
WelcomeEmail = "email:welcome"
ReminderEmail = "email:reminder"
)
func NewWelcomeEmailTask(id int) *asynq.Task {
payload := map[string]interface{}{"user_id": id}
return asynq.NewTask(WelcomeEmail, payload)
}
func NewReminderEmailTask(id int) *asynq.Task {
payload := map[string]interface{}{"user_id": id}
return asynq.NewTask(ReminderEmail, payload)
}
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
id, err := t.Payload.GetInt("user_id")
if err != nil {
return err
}
fmt.Printf("Send Welcome Email to User %d\n", id)
return nil
}
func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
id, err := t.Payload.GetInt("user_id")
if err != nil {
return err
}
fmt.Printf("Send Reminder Email to User %d\n", id)
return nil
}
现在,我们可以将此包导入到client.go和workers.go中。
// client.go
func main() {
r := asynq.RedisClientOpt{Addr: "localhost:6379"}
client := asynq.NewClient(r)
t1 := tasks.NewWelcomeEmailTask(42)
t2 := tasks.NewReminderEmailTask(42)
// Process the task immediately.
res, err := client.Enqueue(t1)
if err != nil {
log.Fatal(err)
}
fmt.Printf("result: %+v\n", res)
// Process the task 24 hours later.
res, err = client.EnqueueIn(24*time.Hour, t2)
if err != nil {
log.Fatal(err)
}
fmt.Printf("result: %+v\n", res)
}
// workers.go
func main() {
r := asynq.RedisClientOpt{Addr: "localhost:6379"}
srv := asynq.NewServer(r, asynq.Config{
Concurrency: 10,
})
mux := asynq.NewServeMux()
mux.HandleFunc(tasks.WelcomeEmail, tasks.HandleWelcomeEmailTask)
mux.HandleFunc(tasks.ReminderEmail, tasks.HandleReminderEmailTask)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}
现在,代码看起来更完美了!
运行程序
现在我们既有“客户端”又有“工人”,我们可以运行这两个程序。
让我们运行client
程序来创建和安排任务。
go run client/client.go
这将创建两项任务:一项应立即启动处理,另一项将在24小时后处理。
让我们使用asynq CLI来检查任务。
asynq stats
您应该能够看到有一个任务处于“Enqueued”状态,另一任务处于“Scheduled”状态。
让我们使用watch命令运行asynq stats
,以便我们可以连续运行该命令以查看更改。
watch -n 1 asynq stats
最后,让我们启动“worker”程序来处理任务。
go run workers/workers.go
您应该能够在终端上看到一些打印的文本,表明任务已成功处理。
您可以再次运行“客户端”程序,以查看工作人员如何处理任务并进行处理。
Task Retry(任务重试)
第一次尝试未成功完成任务的情况并不少见。 默认情况下,失败的任务将重试,且指数补偿最多可重复25次。
让我们更新处理程序以返回错误,模拟失败的情况。
// tasks.go
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
id, err := t.Payload.GetInt("user_id")
if err != nil {
return err
}
fmt.Printf("Attempting to Send Welcome Email to User %d...\n", id)
return fmt.Errorf("could not send email to the user") // <-- Return error
}
让我们重新启动工作程序并入队列。
go run workers/workers.go
go run client/client.go
如果您正在运行asynq stats
,则应该能够看到存在处于“重试”状态的任务。
要检查哪些任务处于重试状态,可以运行
asynq ls retry
这将列出将来将重试的所有任务。 输出包括任务下一次执行的ETA(预计时间)。
任务用尽其重试计数后,该任务将转换为“dead”状态,并且不会被重试(您仍然可以通过运行asynq enq
命令来手动将dead任务加入队列)。
在结束本教程之前,请先修复处理程序。
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
id, err := t.Payload.GetInt("user_id")
if err != nil {
return err
}
fmt.Printf("Send Welcome Email to User %d\n", id)
return nil
}
现在我们已经修复了处理程序,任务将在下一次尝试中成功处理!
本文是Asynq的简洁. 想要了解更多高级内容 priority queues 和 custom retry, 查看 Wiki page.
谢谢阅读!
本作品采用《CC 协议》,转载必须注明作者和本文链接
请问下为啥不直接使用 goroutine 呢?
或者换句话说:什么场景使用该包做异步任务,什么场景直接使用 goroutine 来做异步
asynq stats 为啥我的用不了呀
我就想知道,这个启动的延时队列 没有cli的话如何取消
大家有task 对于并发控制的要求吗,现在并发控制是在整个服务上,不能细化到task