如何用go开发基于redis的分布式锁。
什么是分布式锁
分布式锁是用于解决分布式系统控制协调任务用的,保证分布式服务中,能达到控制任务的目的。
常见的分布式锁
- 基于zookeeper
- 基于etcd
- 基于consul
- 基于redis
为啥用redis实现分布式锁
首先因为简单,redis中有个命令是setnx, 这个命令就能起到分布式锁的作用,已经存在key就会设置失败。
其次redis是一般项目中都有的组件,如果采用其他的,可能会增加成本。
再说redis的性能十分强劲,能满足大多数公司的需求。
用go开发基于redis的分布式锁
大致功能如下
- 设置过期时间
防止单点服务异常,锁不释放 - 自动续租
防止锁超时问题,如设置过期时间2秒,但是中间的任务如执行几条sql语句用了5秒,那么这个锁就可能是无效了,很可能被其他服务给占用,导致sql任务的异常出现。 当锁住2/3过期时间还未过期的,我们可以对他续租。 这样锁就还是有效的。 - 选项模式
redisClient可以使用全局的,也可以有用户传入
过期时间
上下文
首先我们用到最常见的go-redis库
go get github.com/go-redis/redis/v8
核心代码就200行。
全局redisClient实现
package goredislock
import (
"context"
"fmt"
"log"
"sync"
"time"
"github.com/go-redis/redis/v8"
)
var GlobalRedisClient *redis.Client
var redisClientOnce sync.Once
func NewRedisClient(addr string) *redis.Client {
redisClientOnce.Do(func() {
GlobalRedisClient = redis.NewClient(&redis.Options{
Network: "tcp",
Addr: addr,
Password: "", //密码
DB: 0, // redis数据库
//连接池容量及闲置连接数量
PoolSize: 15, // 连接池数量
MinIdleConns: 10, //好比最小连接数
//超时
DialTimeout: 5 * time.Second, //连接建立超时时间
ReadTimeout: 3 * time.Second, //读超时,默认3秒, -1表示取消读超时
WriteTimeout: 3 * time.Second, //写超时,默认等于读超时
PoolTimeout: 4 * time.Second, //当所有连接都处在繁忙状态时,客户端等待可用连接的最大等待时长,默认为读超时+1秒。
//闲置连接检查包括IdleTimeout,MaxConnAge
IdleCheckFrequency: 60 * time.Second, //闲置连接检查的周期,默认为1分钟,-1表示不做周期性检查,只在客户端获取连接时对闲置连接进行处理。
IdleTimeout: 5 * time.Minute, //闲置超时
MaxConnAge: 0 * time.Second, //连接存活时长,从创建开始计时,超过指定时长则关闭连接,默认为0,即不关闭存活时长较长的连接
//命令执行失败时的重试策略
MaxRetries: 0, // 命令执行失败时,最多重试多少次,默认为0即不重试
MinRetryBackoff: 8 * time.Millisecond, //每次计算重试间隔时间的下限,默认8毫秒,-1表示取消间隔
MaxRetryBackoff: 512 * time.Millisecond, //每次计算重试间隔时间的上限,默认512毫秒,-1表示取消间隔
})
pong, err := GlobalRedisClient.Ping(context.Background()).Result()
if err != nil {
log.Fatal(fmt.Errorf("redis connect error:%s", err))
}
log.Println(pong)
})
return GlobalRedisClient
}
分布式锁实现
package goredislock
import (
"context"
"log"
"time"
"github.com/go-redis/redis/v8"
)
const defaultExpireTime = time.Second * 10
// 分布式锁
type Locker struct {
key string // redis key
unlock bool // 是否已经解锁 ,解锁则不用续租
incrScript *redis.Script // lua script
option options // 可选项
}
type Options func(o *options)
type options struct {
expire time.Duration // expiration time ,默认是10秒
redisClient *redis.Client // redis 实例, 可以传, 不传就得用全局的
ctx context.Context
}
// 续租的时候 获得和设置过期原子操作.
const incrLua = `
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('expire', KEYS[1],ARGV[2])
else
return '0'
end`
func NewLocker(key string, opts ...Options) *Locker {
var lock = &Locker{
key: key,
incrScript: redis.NewScript(incrLua),
}
for _, opt := range opts {
opt(&lock.option)
}
// 没设置过期时间
if lock.option.expire == 0 {
lock.option.expire = defaultExpireTime
}
// 未设置redis 实例
if lock.option.redisClient == nil {
lock.option.redisClient = GlobalRedisClient
}
// 未设置context
if lock.option.ctx == nil {
lock.option.ctx = context.Background()
}
return lock
}
// 过期选项
func WithExpire(expire time.Duration) Options {
return func(o *options) {
o.expire = expire
}
}
// redisClient 选项,可以每次都传,如果没传,就用全局都
func WithRedisClient(redisClient *redis.Client) Options {
return func(o *options) {
o.redisClient = redisClient
}
}
func WithContext(ctx context.Context) Options {
return func(o *options) {
o.ctx = ctx
}
}
// 第一个返回:返回锁,方便链式操作
// 第二个 返回结果
func (this *Locker) Lock() (*Locker, bool) {
boolcmd := this.option.redisClient.SetNX(context.Background(), this.key, "1", this.option.expire)
if ok, err := boolcmd.Result(); err != nil || !ok {
return this, false
}
this.expandLockTime()
return this, true
}
// 续租
func (this *Locker) expandLockTime() {
sleepTime := this.option.expire * 2 / 3
go func() {
for {
time.Sleep(sleepTime)
if this.unlock {
break
}
this.resetExpire()
}
}()
}
// 重新设置过期时间
func (this *Locker) resetExpire() {
cmd := this.incrScript.Run(this.option.ctx, this.option.redisClient, []string{this.key}, 1, this.option.expire.Seconds())
v, err := cmd.Result()
log.Printf("key=%s ,续期结果:%v,%v\n", this.key, err, v)
}
// 释放锁 干完活后释放锁
func (this *Locker) Unlock() {
this.unlock = true
this.option.redisClient.Del(this.option.ctx, this.key)
}
使用demo
package main
import (
"context"
"fmt"
"time"
"github.com/cr-mao/goredislock"
)
/*
续租测试,
2秒过期时间,续租时间大概是 1.33秒,10进行了7次续租,复合要求
2023/06/17 17:37:08 PONG
true
2023/06/17 17:37:10 key=test_lock_key ,续期结果:<nil>,1
2023/06/17 17:37:11 key=test_lock_key ,续期结果:<nil>,1
2023/06/17 17:37:12 key=test_lock_key ,续期结果:<nil>,1
2023/06/17 17:37:14 key=test_lock_key ,续期结果:<nil>,1
2023/06/17 17:37:15 key=test_lock_key ,续期结果:<nil>,1
2023/06/17 17:37:16 key=test_lock_key ,续期结果:<nil>,1
2023/06/17 17:37:18 key=test_lock_key ,续期结果:<nil>,1
*/
func main() {
// 实例化全局redisclient, 分布式锁则会用这个redisClient
goredislock.NewRedisClient("127.0.0.1:6379")
// 1.33秒左右就会续租
locker, ok := goredislock.NewLocker("test_lock_key", goredislock.WithContext(context.Background()), goredislock.WithExpire(time.Second*2)).Lock()
fmt.Println(ok)
time.Sleep(time.Second*10)
defer locker.Unlock()
}
仓库地址: github.com/cr-mao/goredislock
本作品采用《CC 协议》,转载必须注明作者和本文链接
github.com/go-redsync/redsync 这个感觉就挺好,还支持多Redis实例