如何用go开发基于redis的分布式锁。

什么是分布式锁

分布式锁是用于解决分布式系统控制协调任务用的,保证分布式服务中,能达到控制任务的目的。

常见的分布式锁

  • 基于zookeeper
  • 基于etcd
  • 基于consul
  • 基于redis

为啥用redis实现分布式锁

首先因为简单,redis中有个命令是setnx, 这个命令就能起到分布式锁的作用,已经存在key就会设置失败。
其次redis是一般项目中都有的组件,如果采用其他的,可能会增加成本。
再说redis的性能十分强劲,能满足大多数公司的需求。

用go开发基于redis的分布式锁

大致功能如下

  • 设置过期时间
    防止单点服务异常,锁不释放
  • 自动续租
    防止锁超时问题,如设置过期时间2秒,但是中间的任务如执行几条sql语句用了5秒,那么这个锁就可能是无效了,很可能被其他服务给占用,导致sql任务的异常出现。 当锁住2/3过期时间还未过期的,我们可以对他续租。 这样锁就还是有效的。
  • 选项模式
    redisClient可以使用全局的,也可以有用户传入
    过期时间
    上下文

首先我们用到最常见的go-redis库

go get github.com/go-redis/redis/v8

核心代码就200行。

全局redisClient实现

package goredislock

import (
    "context"
    "fmt"
    "log"
    "sync"
    "time"

    "github.com/go-redis/redis/v8"
)

var GlobalRedisClient *redis.Client
var redisClientOnce sync.Once

func NewRedisClient(addr string) *redis.Client {
    redisClientOnce.Do(func() {
        GlobalRedisClient = redis.NewClient(&redis.Options{
            Network:  "tcp",
            Addr:     addr,
            Password: "", //密码
            DB:       0,  // redis数据库

            //连接池容量及闲置连接数量
            PoolSize:     15, // 连接池数量
            MinIdleConns: 10, //好比最小连接数
            //超时
            DialTimeout:  5 * time.Second, //连接建立超时时间
            ReadTimeout:  3 * time.Second, //读超时,默认3秒, -1表示取消读超时
            WriteTimeout: 3 * time.Second, //写超时,默认等于读超时
            PoolTimeout:  4 * time.Second, //当所有连接都处在繁忙状态时,客户端等待可用连接的最大等待时长,默认为读超时+1秒。

            //闲置连接检查包括IdleTimeout,MaxConnAge
            IdleCheckFrequency: 60 * time.Second, //闲置连接检查的周期,默认为1分钟,-1表示不做周期性检查,只在客户端获取连接时对闲置连接进行处理。
            IdleTimeout:        5 * time.Minute,  //闲置超时
            MaxConnAge:         0 * time.Second,  //连接存活时长,从创建开始计时,超过指定时长则关闭连接,默认为0,即不关闭存活时长较长的连接

            //命令执行失败时的重试策略
            MaxRetries:      0,                      // 命令执行失败时,最多重试多少次,默认为0即不重试
            MinRetryBackoff: 8 * time.Millisecond,   //每次计算重试间隔时间的下限,默认8毫秒,-1表示取消间隔
            MaxRetryBackoff: 512 * time.Millisecond, //每次计算重试间隔时间的上限,默认512毫秒,-1表示取消间隔

        })
        pong, err := GlobalRedisClient.Ping(context.Background()).Result()
        if err != nil {
            log.Fatal(fmt.Errorf("redis connect error:%s", err))
        }
        log.Println(pong)
    })
    return GlobalRedisClient
}

分布式锁实现

package goredislock

import (
    "context"
    "log"
    "time"

    "github.com/go-redis/redis/v8"
)

const defaultExpireTime = time.Second * 10

// 分布式锁
type Locker struct {
    key        string        // redis key
    unlock     bool          // 是否已经解锁 ,解锁则不用续租
    incrScript *redis.Script // lua script
    option     options       // 可选项
}

type Options func(o *options)

type options struct {
    expire      time.Duration // expiration time ,默认是10秒
    redisClient *redis.Client // redis 实例, 可以传, 不传就得用全局的
    ctx         context.Context
}

// 续租的时候 获得和设置过期原子操作.
const incrLua = `
if redis.call('get', KEYS[1]) == ARGV[1] then
  return redis.call('expire', KEYS[1],ARGV[2])                 
 else
   return '0'                     
end`

func NewLocker(key string, opts ...Options) *Locker {
    var lock = &Locker{
        key:        key,
        incrScript: redis.NewScript(incrLua),
    }
    for _, opt := range opts {
        opt(&lock.option)
    }
    // 没设置过期时间
    if lock.option.expire == 0 {
        lock.option.expire = defaultExpireTime
    }
    // 未设置redis 实例
    if lock.option.redisClient == nil {
        lock.option.redisClient = GlobalRedisClient
    }
    // 未设置context
    if lock.option.ctx == nil {
        lock.option.ctx = context.Background()
    }
    return lock
}

// 过期选项
func WithExpire(expire time.Duration) Options {
    return func(o *options) {
        o.expire = expire
    }
}

// redisClient 选项,可以每次都传,如果没传,就用全局都
func WithRedisClient(redisClient *redis.Client) Options {
    return func(o *options) {
        o.redisClient = redisClient
    }
}

func WithContext(ctx context.Context) Options {
    return func(o *options) {
        o.ctx = ctx
    }
}

// 第一个返回:返回锁,方便链式操作
// 第二个 返回结果
func (this *Locker) Lock() (*Locker, bool) {
    boolcmd := this.option.redisClient.SetNX(context.Background(), this.key, "1", this.option.expire)
    if ok, err := boolcmd.Result(); err != nil || !ok {
        return this, false
    }
    this.expandLockTime()
    return this, true
}

// 续租
func (this *Locker) expandLockTime() {
    sleepTime := this.option.expire * 2 / 3
    go func() {
        for {
            time.Sleep(sleepTime)
            if this.unlock {
                break
            }
            this.resetExpire()
        }
    }()
}

// 重新设置过期时间
func (this *Locker) resetExpire() {
    cmd := this.incrScript.Run(this.option.ctx, this.option.redisClient, []string{this.key}, 1, this.option.expire.Seconds())
    v, err := cmd.Result()
    log.Printf("key=%s ,续期结果:%v,%v\n", this.key, err, v)
}

// 释放锁  干完活后释放锁
func (this *Locker) Unlock() {
    this.unlock = true
    this.option.redisClient.Del(this.option.ctx, this.key)
}

使用demo

package main

import (
  "context"
  "fmt"
  "time"

  "github.com/cr-mao/goredislock"
)

/*
续租测试,

2秒过期时间,续租时间大概是 1.33秒,10进行了7次续租,复合要求
2023/06/17 17:37:08 PONG
true
2023/06/17 17:37:10 key=test_lock_key ,续期结果:<nil>,1
2023/06/17 17:37:11 key=test_lock_key ,续期结果:<nil>,1
2023/06/17 17:37:12 key=test_lock_key ,续期结果:<nil>,1
2023/06/17 17:37:14 key=test_lock_key ,续期结果:<nil>,1
2023/06/17 17:37:15 key=test_lock_key ,续期结果:<nil>,1
2023/06/17 17:37:16 key=test_lock_key ,续期结果:<nil>,1
2023/06/17 17:37:18 key=test_lock_key ,续期结果:<nil>,1
*/
func main() {
    // 实例化全局redisclient, 分布式锁则会用这个redisClient
  goredislock.NewRedisClient("127.0.0.1:6379")
  // 1.33秒左右就会续租
  locker, ok := goredislock.NewLocker("test_lock_key", goredislock.WithContext(context.Background()), goredislock.WithExpire(time.Second*2)).Lock()
  fmt.Println(ok)
  time.Sleep(time.Second*10)
  defer locker.Unlock()
}

仓库地址: github.com/cr-mao/goredislock

本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 4

github.com/go-redsync/redsync 这个感觉就挺好,还支持多Redis实例

10个月前 评论
CR-MAO (楼主) 10个月前
package goredislock_test

import (
    "context"
    "testing"
    "time"

    "github.com/your/package/goredislock"
    "github.com/go-redis/redis/v8"
    "github.com/stretchr/testify/assert"
)

const (
    redisAddr = "localhost:6379"
    testKey   = "test-lock-key"
)

func TestLocker(t *testing.T) {
    // 创建 Redis 客户端
    redisClient := goredislock.NewRedisClient(redisAddr)
    defer redisClient.Close()

    // 创建分布式锁
    lock := goredislock.NewLocker(testKey, goredislock.WithRedisClient(redisClient))

    // 获取锁并进行操作成功的情况
    locker, ok := lock.Lock()
    assert.True(t, ok)

    // 续租锁的过程和正确续租的结果
    time.Sleep(3 * time.Second) // 假设锁的过期时间为10秒,这里等待3秒进行续租
    locker.ResetExpire()

    // 释放锁后再次获取锁成功的情况
    locker.Unlock()
    locker, ok = lock.Lock()
    assert.True(t, ok)

    // 获取锁失败的情况
    otherLock := goredislock.NewLocker(testKey, goredislock.WithRedisClient(redisClient))
    _, ok = otherLock.Lock()
    assert.False(t, ok)
}

10个月前 评论
CR-MAO (楼主) 10个月前

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!