Asynq 实现 Go 后台作业异步定时任务处理 (7/11 更新)

在本教程中,我们将编写两个程序clientworkers

-client.go将创建和安排任务,由后台工作程序异步处理。
-workers.go将启动多个并发工作器来处理客户端创建的任务。

本教程假定您正在localhost:6379上运行Redis服务器。
在开始之前,请确保您已安装并运行Redis。

让我们从创建两个主要文件开始。

mkdir quickstart && cd quickstart
go mod init asynq-quickstart
mkdir client workers
touch client/client.go workers/workers.go

并安装asynq软件包。

go get -u github.com/hibiken/asynq

在开始编写代码之前,让我们回顾一下将在两个程序中使用的一些核心类型。

Redis连接选项

Asynq使用Redis作为消息代理。
client.goworkers.go都需要连接到Redis进行写入和读取。
我们将使用RedisClientOpt指定如何连接到本地Redis实例。

var redis = asynq.RedisClientOpt{
    Addr: "localhost:6379",
    // Omit if no password is required
    Password: "mypassword",
    // Use a dedicated db number for asynq.
    // By default, Redis offers 16 databases (0..15)
    DB: 0,
}

Tasks任务

asynq中,工作单元被封装为Task类型。
其中有两个字段:“类型”和“有效载荷”。

// Task represents a task to be performed.
type Task struct {
    // Type indicates the type of a task to be performed.
    Type string

    // Payload holds data needed to perform the task.
    Payload Payload
}

Type是一个简单的字符串值,指示给定任务的类型。
Payload保存执行任务所需的数据,您可以将其视为map[string]interface{}。需要注意的重要一件事是有效负载值必须是可序列化的

现在,我们已经研究了核心类型,下一步开始编写程序。


Client Program (客户程序)

client.go中,我们将创建一些任务,并使用asynq.Client将它们加入队列。

要创建任务,请使用NewTask函数,并为任务传递类型和有效负载。

asynq.Client支持三种调度任务的方法:EnqueueEnqueueInEnqueueAt
使用client.Enqueue将任务立即加入队列。
使用client.EnqueueInclient.EnqueueAt来安排将来要处理的任务。

// client.go
func main() {
    r := asynq.RedisClientOpt{Addr: "localhost:6379"}
    client := asynq.NewClient(r)

    // Create a task with typename and payload.
    t1 := asynq.NewTask(
        "email:welcome",
        map[string]interface{}{"user_id": 42})

    t2 := asynq.NewTask(
        "email:reminder",
        map[string]interface{}{"user_id": 42})

    // Process the task immediately.
    res, err := client.Enqueue(t1)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("result: %+v\n", res)

    // Process the task 24 hours later.
    res, err = client.EnqueueIn(24*time.Hour, t2)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("result: %+v\n", res)
}

这就是我们需要的客户端程序。

Workers Program

workers.go中,我们将创建一个asynq.Server实例来启动worker。

NewServer函数采用RedisConnOptConfig

Config用于调整服务器的任务处理行为。
您可以查看Config上的文档以查看所有可用的config选项。

为简单起见,在此示例中,我们仅指定并发。

// workers.go
func main() {
    r := asynq.RedisClientOpt{Addr: "localhost:6379"}
    srv := asynq.NewServer(r, asynq.Config{
        Concurrency: 10,
    })

    // NOTE: We'll cover what this `handler` is in the section below.
    if err := srv.Run(handler); err != nil {
        log.Fatal(err)
    }
}

(* Server).Run的参数是一个接口asynq.Handler,它具有一个方法ProcessTask

// ProcessTask should return nil if the task was processed successfully.
//
// If ProcessTask returns a non-nil error or panics, the task will be retried again later.
type Handler interface {
    ProcessTask(context.Context, *Task) error
}

实现处理程序的最简单方法是定义一个具有相同签名的函数,并将其传递给Run时使用asynq.HandlerFunc适配器类型。

func handler(ctx context.Context, t *asynq.Task) error {
    switch t.Type {
    case "email:welcome":
        id, err := t.Payload.GetInt("user_id")
        if err != nil {
            return err
        }
        fmt.Printf("Send Welcome Email to User %d\n", id)

    case "email:reminder":
        id, err := t.Payload.GetInt("user_id")
        if err != nil {
            return err
        }
        fmt.Printf("Send Reminder Email to User %d\n", id)

    default:
        return fmt.Errorf("unexpected task type: %s", t.Type)
    }
    return nil
}

func main() {
    r := asynq.RedisClientOpt{Addr: "localhost:6379"}
    srv := asynq.NewServer(r, asynq.Config{
        Concurrency: 10,
    })

    // Use asynq.HandlerFunc adapter for a handler function
    if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
        log.Fatal(err)
    }
}

我们可以继续向此处理函数添加案例,但是在实际应用中,在单独的函数中为每种案例定义逻辑很方便。

为了重构我们的代码,让我们使用ServeMux创建我们的处理程序。
就像来自"net/http"包中的ServeMux一样,您可以通过调用HandleHandleFunc来注册处理程序。 ServeMux满足Handler接口,因此您可以将其传递给(*Server).Run

// workers.go
func main() {
    r := asynq.RedisClientOpt{Addr: "localhost:6379"}
    srv := asynq.NewServer(r, asynq.Config{
        Concurrency: 10,
    })

    mux := asynq.NewServeMux()
    mux.HandleFunc("email:welcome", sendWelcomeEmail)
    mux.HandleFunc("email:reminder", sendReminderEmail)

    if err := srv.Run(mux); err != nil {
        log.Fatal(err)
    }
}

func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
    id, err := t.Payload.GetInt("user_id")
    if err != nil {
        return err
    }
    fmt.Printf("Send Welcome Email to User %d\n", id)
    return nil
}

func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
    id, err := t.Payload.GetInt("user_id")
    if err != nil {
        return err
    }
    fmt.Printf("Send Reminder Email to User %d\n", id)
    return nil
}

因为我们已经提取了处理每种任务类型的函数,所以代码看起来会更有条理。
但是,代码有点含糊不清,对于任务类型和有效负载密钥,我们具有这些字符串值,应将它们封装在一个内聚的程序包中。 让我们通过编写一个封装任务创建和处理的包来重构代码。 我们将简单地创建一个名为tasks的软件包。

mkdir tasks && touch tasks/tasks.go
package tasks

import (
    "context"
    "fmt"

    "github.com/hibiken/asynq"
)

// A list of task types.
const (
    WelcomeEmail  = "email:welcome"
    ReminderEmail = "email:reminder"
)

func NewWelcomeEmailTask(id int) *asynq.Task {
    payload := map[string]interface{}{"user_id": id}
    return asynq.NewTask(WelcomeEmail, payload)
}

func NewReminderEmailTask(id int) *asynq.Task {
    payload := map[string]interface{}{"user_id": id}
    return asynq.NewTask(ReminderEmail, payload)
}

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    id, err := t.Payload.GetInt("user_id")
    if err != nil {
        return err
    }
    fmt.Printf("Send Welcome Email to User %d\n", id)
    return nil
}

func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
    id, err := t.Payload.GetInt("user_id")
    if err != nil {
        return err
    }
    fmt.Printf("Send Reminder Email to User %d\n", id)
    return nil
}

现在,我们可以将此包导入到client.go和workers.go中。

// client.go
func main() {
    r := asynq.RedisClientOpt{Addr: "localhost:6379"}
    client := asynq.NewClient(r)

    t1 := tasks.NewWelcomeEmailTask(42)

    t2 := tasks.NewReminderEmailTask(42)

    // Process the task immediately.
    res, err := client.Enqueue(t1)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("result: %+v\n", res)

    // Process the task 24 hours later.
    res, err = client.EnqueueIn(24*time.Hour, t2)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("result: %+v\n", res)
}
// workers.go
func main() {
    r := asynq.RedisClientOpt{Addr: "localhost:6379"}
    srv := asynq.NewServer(r, asynq.Config{
        Concurrency: 10,
    })

    mux := asynq.NewServeMux()
    mux.HandleFunc(tasks.WelcomeEmail, tasks.HandleWelcomeEmailTask)
    mux.HandleFunc(tasks.ReminderEmail, tasks.HandleReminderEmailTask)

    if err := srv.Run(mux); err != nil {
        log.Fatal(err)
    }
}

现在,代码看起来更完美了!

运行程序

现在我们既有“客户端”又有“工人”,我们可以运行这两个程序。
让我们运行client程序来创建和安排任务。

go run client/client.go

这将创建两项任务:一项应立即启动处理,另一项将在24小时后处理。

让我们使用asynq CLI来检查任务。

asynq stats

您应该能够看到有一个任务处于“Enqueued”状态,另一任务处于“Scheduled”状态。

让我们使用watch命令运行asynq stats,以便我们可以连续运行该命令以查看更改。

watch -n 1 asynq stats 

最后,让我们启动“worker”程序来处理任务。

go run workers/workers.go

您应该能够在终端上看到一些打印的文本,表明任务已成功处理。

您可以再次运行“客户端”程序,以查看工作人员如何处理任务并进行处理。

Task Retry(任务重试)

第一次尝试未成功完成任务的情况并不少见。 默认情况下,失败的任务将重试,且指数补偿最多可重复25次。
让我们更新处理程序以返回错误,模拟失败的情况。

// tasks.go
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    id, err := t.Payload.GetInt("user_id")
    if err != nil {
        return err
    }
    fmt.Printf("Attempting to Send Welcome Email to User %d...\n", id)
    return fmt.Errorf("could not send email to the user") // <-- Return error 
}

让我们重新启动工作程序并入队列。

go run workers/workers.go

go run client/client.go

如果您正在运行asynq stats,则应该能够看到存在处于“重试”状态的任务。

要检查哪些任务处于重试状态,可以运行

asynq ls retry

这将列出将来将重试的所有任务。 输出包括任务下一次执行的ETA(预计时间)。

任务用尽其重试计数后,该任务将转换为“dead”状态,并且不会被重试(您仍然可以通过运行asynq enq 命令来手动将dead任务加入队列)。

在结束本教程之前,请先修复处理程序。

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    id, err := t.Payload.GetInt("user_id")
    if err != nil {
        return err
    }
    fmt.Printf("Send Welcome Email to User %d\n", id)
    return nil 
}

现在我们已经修复了处理程序,任务将在下一次尝试中成功处理!

本文是Asynq的简洁. 想要了解更多高级内容 priority queuescustom retry, 查看 Wiki page.

谢谢阅读!

本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 6
Destiny

:+1:

3年前 评论

请问下为啥不直接使用 goroutine 呢?
或者换句话说:什么场景使用该包做异步任务,什么场景直接使用 goroutine 来做异步

3年前 评论
VICys 1年前

asynq stats 为啥我的用不了呀

2年前 评论

我就想知道,这个启动的延时队列 没有cli的话如何取消

9个月前 评论

大家有task 对于并发控制的要求吗,现在并发控制是在整个服务上,不能细化到task

4周前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!