go-zero-antd实战-5(go-zero添加asynq队列任务)
go-zero-antd实战-5(go-zero添加asynq队列任务)
后台系统一般都会有任务队列的需求,本项目加入了asynq,具体使用参考官方文档。使用不复杂,这里只是简单的加入到go-zero单体服务的项目中,如果使用微服务可以参考go-zero-looklook项目它相当于单独做了个asynq的服务,设计思路类似go-zero的api,我仔细阅读后完成了这次接入。
asynq加入项目
创建了aqueue目录,
aqueue/
├── handler // task任务
│ └── userlist.go
├── jobtype // task对应的结构体和const
│ └── userlist.go
├── main.go
└── queue
└── queue.go // asynq 服务器部分的启动文件
3 directories, 4 files
首先先看main.go
func main() {
// 这里还是去引用go-zero的配置
var c config.Config
conf.MustLoad("../etc/backend.yaml", &c)
svcCtx := svc.NewServiceContext(c)
ctx := context.Background()
// 这里可以看源码,类似go-zero的rest,也可以看做http
job := queue.NewQueue(ctx, svcCtx)
// 注册路由
mux := job.Register()
// 启动asynq服务连接redis
server := asynq.NewServer(
asynq.RedisClientOpt{Addr: svcCtx.Config.Redis.Host},
asynq.Config{
IsFailure: func(err error) bool {
fmt.Printf("asynq server exec task IsFailure ======== >>>>>>>>>>> err : %+v \n", err)
return true
},
Concurrency: 20, //max concurrent process job task num
},
)
if err := server.Run(mux); err != nil {
logx.WithContext(ctx).Errorf("!!!CronJobErr!!! run err:%+v", err)
os.Exit(1)
}
}
创建 aqueue/queue/queue.go
package queue
import (
"context"
"tapi/aqueue/handler"
"tapi/aqueue/jobtype"
"tapi/internal/svc"
"github.com/hibiken/asynq"
)
type Queue struct {
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewQueue(ctx context.Context, svcCtx *svc.ServiceContext) *Queue {
return &Queue{
ctx: ctx,
svcCtx: svcCtx,
}
}
// register job 这里一看就和go-zero的router类似
func (l *Queue) Register() *asynq.ServeMux {
mux := asynq.NewServeMux()
// job handler就加入到这里
// NewUserListHandler这个方法要符合asynq.Handler接口
/*
type Handler interface {
ProcessTask(context.Context, *Task) error
}
*/
mux.Handle(jobtype.DesUserList, handler.NewUserListHandler(l.svcCtx))
return mux
}
上面其实算是加入成功了
server部分创建
创建handler 增加aqueue/handler/userlist.go
package handler
import (
"context"
"encoding/json"
"errors"
"fmt"
"tapi/aqueue/jobtype"
"tapi/internal/svc"
"github.com/hibiken/asynq"
)
type UserListHandler struct {
svcCtx *svc.ServiceContext
}
func NewUserListHandler(svcCtx *svc.ServiceContext) *UserListHandler {
return &UserListHandler{
svcCtx: svcCtx,
}
}
func (l *UserListHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
// 获取参数
var p jobtype.PayloadUserList
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return errors.New("参数错误")
}
u := l.svcCtx.BkModel.User
d, err := u.WithContext(context.Background()).Where(u.ID.Eq(p.Id)).Debug().First()
if err != nil {
return err
}
fmt.Println(d)
return nil
}
创建结构aqueue/jobtype/userlist.go
package jobtype
// 任务标识
const DesUserList = "job:user_list"
// 任务接收发送参数
type PayloadUserList struct {
Id int64
}
添加路由 aqueue/queue/queue.go
// job
mux.Handle(jobtype.DesUserList, handler.NewUserListHandler(l.svcCtx))
服务部分完成了
客户端部分
新增internal/svc/asynqClient.go
package svc
import (
"tapi/internal/config"
"github.com/hibiken/asynq"
)
// create asynq client.
func newAsynqClient(c config.Config) *asynq.Client {
return asynq.NewClient(asynq.RedisClientOpt{Addr: c.Redis.Host})
}
修改 internal/svc/servicecontext.go
AsynqClient: newAsynqClient(c),
调用客户端
在userinfo接口添加
// 测试一下写入job
// 设置参数
payload, err := json.Marshal(jobtype.PayloadUserList{Id: 2})
if err != nil {
return &types.UserInfoResponse{
Code: 500,
Msg: err.Error(),
}, nil
} else {
// 加入任务
_, err = l.svcCtx.AsynqClient.Enqueue(asynq.NewTask(jobtype.DesUserList, payload))
if err != nil {
return &types.UserInfoResponse{
Code: 500,
Msg: err.Error(),
}, nil
}
}
测试
启动asynq服务端
cd aqueue
go run main.go
// 回显
root@tdev:/home/code/go-zero-antd-backend/api/aqueue# go run main.go
asynq: pid=8583 2023/02/16 17:14:17.527560 INFO: Starting processing
asynq: pid=8583 2023/02/16 17:14:17.528667 INFO: Send signal TSTP to stop processing new tasks
启动go-zero
make dev
访问192.168.1.13:8888/api/user/info
asynq服务端显示
{"@timestamp":"2023-02-17T01:15:17.522+08:00","caller":"stat/usage.go:61","content":"CPU: 5m, MEMORY: Alloc=1.6Mi, TotalAlloc=5.8Mi, Sys=18.6Mi, NumGC=2","level":"stat"}
2023/02/17 01:15:18 /home/code/go-zero-antd-backend/api/bkmodel/dao/query/user.gen.go:238
[1.098ms] [rows:1] SELECT * FROM `user` WHERE `user`.`id` = 2 ORDER BY `user`.`id` LIMIT 1
&{2 ttt 0 123456 1 1676195522 1676195522}
源码已上传
本作品采用《CC 协议》,转载必须注明作者和本文链接
Asynq 的重试的间隔时间可控嘛?