go-zero-antd实战-5(go-zero添加asynq队列任务)

go-zero-antd实战-5(go-zero添加asynq队列任务)

后台系统一般都会有任务队列的需求,本项目加入了asynq,具体使用参考官方文档。使用不复杂,这里只是简单的加入到go-zero单体服务的项目中,如果使用微服务可以参考go-zero-looklook项目它相当于单独做了个asynq的服务,设计思路类似go-zero的api,我仔细阅读后完成了这次接入。

asynq加入项目

创建了aqueue目录,


aqueue/

├── handler // task任务

│ └── userlist.go

├── jobtype // task对应的结构体和const

│ └── userlist.go

├── main.go

└── queue

└── queue.go // asynq 服务器部分的启动文件

3 directories, 4 files

首先先看main.go


func  main() {

// 这里还是去引用go-zero的配置

var  c config.Config

conf.MustLoad("../etc/backend.yaml", &c)

svcCtx := svc.NewServiceContext(c)

ctx := context.Background()

// 这里可以看源码,类似go-zero的rest,也可以看做http

job := queue.NewQueue(ctx, svcCtx)

// 注册路由

mux := job.Register()

// 启动asynq服务连接redis

server := asynq.NewServer(

asynq.RedisClientOpt{Addr: svcCtx.Config.Redis.Host},

asynq.Config{

IsFailure: func(err error) bool {

fmt.Printf("asynq server exec task IsFailure ======== >>>>>>>>>>> err : %+v  \n", err)

return  true

},

Concurrency: 20, //max concurrent process job task num

},

)

if  err := server.Run(mux); err != nil {

logx.WithContext(ctx).Errorf("!!!CronJobErr!!! run err:%+v", err)

os.Exit(1)

}

}

创建 aqueue/queue/queue.go


package queue

import (

"context"

"tapi/aqueue/handler"

"tapi/aqueue/jobtype"

"tapi/internal/svc"

"github.com/hibiken/asynq"

)

type  Queue  struct {

ctx context.Context

svcCtx *svc.ServiceContext

}

func  NewQueue(ctx context.Context, svcCtx *svc.ServiceContext) *Queue {

return &Queue{

ctx: ctx,

svcCtx: svcCtx,

}

}

// register job 这里一看就和go-zero的router类似

func (l *Queue) Register() *asynq.ServeMux {

mux := asynq.NewServeMux()

// job handler就加入到这里

// NewUserListHandler这个方法要符合asynq.Handler接口

/*

type Handler interface {

ProcessTask(context.Context, *Task) error

}

*/

mux.Handle(jobtype.DesUserList, handler.NewUserListHandler(l.svcCtx))

return mux

}

上面其实算是加入成功了

server部分创建

创建handler 增加aqueue/handler/userlist.go


package handler

import (

"context"

"encoding/json"

"errors"

"fmt"

"tapi/aqueue/jobtype"

"tapi/internal/svc"

"github.com/hibiken/asynq"

)

type  UserListHandler  struct {

svcCtx *svc.ServiceContext

}

func  NewUserListHandler(svcCtx *svc.ServiceContext) *UserListHandler {

return &UserListHandler{

svcCtx: svcCtx,

}

}

func (l *UserListHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {

// 获取参数

var  p jobtype.PayloadUserList

if  err := json.Unmarshal(t.Payload(), &p); err != nil {

return errors.New("参数错误")

}

u := l.svcCtx.BkModel.User

d, err := u.WithContext(context.Background()).Where(u.ID.Eq(p.Id)).Debug().First()

if err != nil {

return err

}

fmt.Println(d)

return  nil

}

创建结构aqueue/jobtype/userlist.go


package jobtype

// 任务标识

const  DesUserList = "job:user_list"

// 任务接收发送参数

type  PayloadUserList  struct {

Id int64

}

添加路由 aqueue/queue/queue.go


// job

mux.Handle(jobtype.DesUserList, handler.NewUserListHandler(l.svcCtx))

服务部分完成了

客户端部分

新增internal/svc/asynqClient.go


package svc

import (

"tapi/internal/config"

"github.com/hibiken/asynq"

)

// create asynq client.

func  newAsynqClient(c config.Config) *asynq.Client {

return asynq.NewClient(asynq.RedisClientOpt{Addr: c.Redis.Host})

}

修改 internal/svc/servicecontext.go


AsynqClient: newAsynqClient(c),

调用客户端

在userinfo接口添加


// 测试一下写入job

// 设置参数

payload, err := json.Marshal(jobtype.PayloadUserList{Id: 2})

if err != nil {

return &types.UserInfoResponse{

Code: 500,

Msg: err.Error(),

}, nil

} else {

// 加入任务

_, err = l.svcCtx.AsynqClient.Enqueue(asynq.NewTask(jobtype.DesUserList, payload))

if err != nil {

return &types.UserInfoResponse{

Code: 500,

Msg: err.Error(),

}, nil

}

}

测试

启动asynq服务端


cd aqueue

go run main.go

// 回显

root@tdev:/home/code/go-zero-antd-backend/api/aqueue# go run main.go

asynq: pid=8583 2023/02/16 17:14:17.527560 INFO: Starting processing

asynq: pid=8583 2023/02/16 17:14:17.528667 INFO: Send signal TSTP to stop processing new tasks

启动go-zero


make dev

访问192.168.1.13:8888/api/user/info

asynq服务端显示


{"@timestamp":"2023-02-17T01:15:17.522+08:00","caller":"stat/usage.go:61","content":"CPU: 5m, MEMORY: Alloc=1.6Mi, TotalAlloc=5.8Mi, Sys=18.6Mi, NumGC=2","level":"stat"}

2023/02/17 01:15:18 /home/code/go-zero-antd-backend/api/bkmodel/dao/query/user.gen.go:238

[1.098ms] [rows:1] SELECT * FROM `user` WHERE `user`.`id` = 2 ORDER BY `user`.`id` LIMIT 1

&{2 ttt 0 123456 1 1676195522 1676195522}

源码已上传

地址

本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 1

Asynq 的重试的间隔时间可控嘛?

1年前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!