分布式锁

分布式锁

悲观锁与乐观锁是人们定义出来的概念,你可以理解为一种思想,是处理并发资源的常用手段。
不要把他们与mysql中提供的锁机制(表锁,行锁,排他锁,共享锁)混为一谈。

文章介绍

下面我将以真正的业务逻辑场景来介绍:

现在我们有一个库存服务,用来扣减库存,代码如下:

这里我们特别什么DB.Begin()方法,它是gorm为我们提供的事务操作,这里事务操作指库存扣减服务:必须全部扣减成功,或者全部扣减失败,不允许出现a商品扣减成功,而b商品扣减失败的情况,如果出现了扣减中途失败,那么服务就会将之前扣减成功的数量归还数据库,即:tx.Rollback(),当所有扣减都成功后,使用tx.Commit()对数据进行提交。

//Sell 扣减库存,涉及事务逻辑,执行的逻辑必须全部成功或者全部失败并且失败后数据可恢复,不能中途失败
func (i *InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*empty.Empty, error) {

  //global.DB 为已经完成初始化的数据库
    tx := global.DB.Begin()
    for _, goodsInfo := range req.GoodsInfo {

        var inventory model.Inventory
        if result := global.DB.Where(&model.Inventory{Goods: goodsInfo.GoodsId}).First(&inventory); result.RowsAffected == 0 {
            //失败进行事务回滚
            tx.Rollback()
            return nil, status.Errorf(codes.InvalidArgument, "库存信息不存在")
        }

        if inventory.Stocks < goodsInfo.Num {
            //失败进行事务回滚
            tx.Rollback()
            return nil, status.Errorf(codes.ResourceExhausted, "库存不足")
        }
    //扣减库存
        inventory.Stocks -= goodsInfo.Num
    //保存修改
        tx.Save(&inventory)
    }
  //事务业务提交
    tx.Commit()
    return &empty.Empty{}, nil
}

model.Inventory:

//Inventory 库存
type Inventory struct {
    BaseModel
    Goods   int32 `gorm:"type:int comment '商品id';index"`
    Stocks  int32 `gorm:"type:int comment '商品库存'"`
}

现在我们来库存服务的库存扣减还有什么问题:

  1. 当有两个或者多个服务同时调用库存扣减服务的时候,会出现少扣 (这里以两个为例,每一个调用扣一件库存):当q1查询数据库将数据库存100件拿出,还没完成更新库存时,这时q2也进入了服务,查询数据库此时拿到的库存仍然为100件,接着q1完成了库存扣减,过后q2也完成扣减,此时库存服务就出现了少扣,服务调用了两次,数据库应该为98件,结果却是只扣1件。

分布式锁

这个问题其实好解决,那么我们直接给服务,【查询库存】-> 【更新库存】上锁,这个过程只能允许一个请求就来,我们来看看如何实现:

//给服务上锁
var ms sync.Mutex

//Sell 扣减库存,涉及事务逻辑,执行的逻辑必须全部成功或者全部失败并且失败后数据可恢复,不能中途失败
func (i *InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*empty.Empty, error) {

  //获取锁
  ms.Lock()
  //global.DB 为已经完成初始化的数据库
    tx := global.DB.Begin()
    for _, goodsInfo := range req.GoodsInfo {

        var inventory model.Inventory
        if result := global.DB.Where(&model.Inventory{Goods: goodsInfo.GoodsId}).First(&inventory); result.RowsAffected == 0 {
            //失败进行事务回滚
            tx.Rollback()
            return nil, status.Errorf(codes.InvalidArgument, "库存信息不存在")
        }

        if inventory.Stocks < goodsInfo.Num {
            //失败进行事务回滚
            tx.Rollback()
            return nil, status.Errorf(codes.ResourceExhausted, "库存不足")
        }
    //扣减库存
        inventory.Stocks -= goodsInfo.Num
    //保存修改
        tx.Save(&inventory)

    //释放锁
      ms.Unlock()
    }
  //事务业务提交
    tx.Commit()
    return &empty.Empty{}, nil
}
  1. 如果我们在多台服务器上部署了我们的库存服务,并且多个服务器访问同一个数据库,那么问题就来了

    我们怎么约束不同服务器之间同时访问数据库的问题呢?

基于mysql的分布式锁

对于不同服务器共同访问同一数据库时我们可以使用分布式锁,来控制服务的行为

分布式锁

1. 分布式悲观锁

悲观锁,顾名思义,就是对于数据的处理持悲观态度,总认为会发生并发冲突,获取和修改数据时,别人会修改数据。所以在整个数据处理过程中,需要将数据锁定。
悲观锁的实现,通常依靠数据库提供的锁机制实现,比如mysql的排他锁,select … for update来实现悲观锁。

分布式悲观锁原理:

在使用基于mysql分布式悲观锁时之前您需要先了解:基于mysql分布式悲观锁原理

悲观锁在库存服务中的应用:

在这个过程中我们是使用gorm来完成mysql的分布式悲观锁的

核心的代码在这里,该方法就能完成悲观锁了

更多关于gorm的学习

DB.Clauses(clause.Locking{Strength: "UPDATE"})

完整代码:

//Sell 扣减库存,涉及事务逻辑,执行的逻辑必须全部成功或者全部失败并且失败后数据可恢复,不能中途失败
func (i *InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*empty.Empty, error) {

    //并发情况下可能会出现超买,需要使用锁来将并发串行化
    //事务开始
    tx := global.DB.Begin()

    //悲观锁 对数据库进行上锁,会降低一定性能
    var inventory model.Inventory
    if result := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where(&model.Inventory{Goods: goodsInfo.GoodsId}).First(&inventory); result.RowsAffected == 0 {
     //失败进行事务回滚
        tx.Rollback()
        return nil, status.Errorf(codes.InvalidArgument, "库存信息不存在")
    }
  if inventory.Stocks < goodsInfo.Num {
        //失败进行事务回滚
        tx.Rollback()
        return nil, status.Errorf(codes.ResourceExhausted, "库存不足")
    }
    inventory.Stocks -= goodsInfo.Num
  tx.Save(&inventory)
  //提交事务
  tx.Commit()
    return &empty.Empty{}, nil
}

2. 分布式乐观锁:

原理:

如图:
分布式锁

乐观锁的应用
func (i *InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*empty.Empty, error) {
    //并发情况下可能会出现超买,需要使用锁来将并发串行化
    //事务开始
    tx := global.DB.Begin()
    for _, goodsInfo := range req.GoodsInfo {

        //分布式乐观锁
        var inventory model.Inventory
        for {
            if result := global.DB.Where(&model.Inventory{Goods: goodsInfo.GoodsId}).First(&inventory); result.RowsAffected == 0 {
                //失败进行事务回滚
                tx.Rollback()
                return nil, status.Errorf(codes.InvalidArgument, "库存信息不存在")
            }

            if inventory.Stocks < goodsInfo.Num {
                //失败进行事务回滚
                tx.Rollback()
                return nil, status.Errorf(codes.ResourceExhausted, "库存不足")
            }
            inventory.Stocks -= goodsInfo.Num
            //注意这里gorm在处理零值时,他会自动忽略零值的更新,这里需要使用select强制更新某些字段
            if result := tx.Model(&model.Inventory{}).Select("Stocks", "Version").Where("goods = ? and version = ?",
                goodsInfo.GoodsId, inventory.Version).Updates(model.Inventory{Stocks: inventory.Stocks, Version: inventory.Version + 1}); result.RowsAffected == 0 {
                zap.S().Info("库存扣减失败")
            } else {
                break
            }
        }
    }
    tx.Commit()
    return &empty.Empty{}, nil
}

基于redis的分布式锁

原理:

分布式锁

setnx命令

Redis Setnx(SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值。

语法

redis Setnx 命令基本语法如下:

redis 127.0.0.1:6379> SETNX KEY_NAME VALUE
返回值

设置成功,返回 1 。 设置失败,返回 0 。

分布式锁

  1. setnx的作用
    将获取和设置值变成原子性的操作
  2. 如果我的服务挂掉了- 死锁
    1. 设置过期时间
    2. 如果你设置了过期时间,那么如果过期时间到了我的业务逻辑没有执行完怎么办?
      1. 在过期之前刷新一下
      2. 需要自己去启动协程完成延时的工作
        1. 延时的接口可能会带来负面影响 - 如果其中某一个服务hung住了, 2s就能执行完,但是你hung住那么你就会一直去申请延长锁,导致别人永远获取不到锁,这个很要命
  3. 分布锁需要解决的问题 - lua脚本去做
    1. 互斥性 - setnx
    2. 死锁
    3. 安全性
      1. 锁只能被持有该锁的用户删除,不能被其他用户删除
        1. 当时设置的value值是多少只有当时的g才能知道
        2. 在删除的时取出redis中的值和当前自己保存下来的值对比一下

这样我们使用setnx就可以完成原子操作了

下面来看看如何使用redisync

redisync
package main

import (
    "fmt"
    "sync"
    "time"

    goredislib "github.com/go-redis/redis/v8"
    "github.com/go-redsync/redsync/v4"
    "github.com/go-redsync/redsync/v4/redis/goredis/v8"
)

func main() {
    // Create a pool with go-redis (or redigo) which is the pool redisync will
    // use while communicating with Redis. This can also be any pool that
    // implements the `redis.Pool` interface.
    client := goredislib.NewClient(&goredislib.Options{
        Addr: "localhost:6379",
    })
    pool := goredis.NewPool(client) // or, pool := redigo.NewPool(...)

    // Create an instance of redisync to be used to obtain a mutual exclusion
    // lock.
    rs := redsync.New(pool)

    var wg sync.WaitGroup
    wg.Add(3)
    for i := 0; i < 3; i++ {
        go func() {
            defer wg.Done()
            mutexname := fmt.Sprintf("mytest_%s", i)
            mutex := rs.NewMutex(mutexname)
            if err := mutex.Lock(); err != nil {
                panic(err)
            }
            fmt.Printf("获取锁成功\n")

            time.Sleep(time.Second * 1)
            fmt.Printf("执行结束\n")

            if ok, err := mutex.Unlock(); !ok || err != nil {
                panic("unlock failed")
            }
            fmt.Printf("释放锁成功\n")
        }()
    }
    wg.Wait()
}
在库存服务中的应用:
//Sell 扣减库存,涉及事务逻辑,执行的逻辑必须全部成功或者全部失败并且失败后数据可恢复,不能中途失败
func (i *InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*empty.Empty, error) {

    //并发情况下可能会出现超买,需要使用锁来将并发串行化
  //将数据库作为事务性
    tx := global.DB.Begin()
    var mutexs []*redsync.Mutex
    for _, goodsInfo := range req.GoodsInfo {

        var inventory model.Inventory
        mutex := global.Rs.NewMutex(fmt.Sprintf("goods_%d", goodsInfo.GoodsId))

        if err := mutex.Lock(); err != nil {
            return nil, status.Errorf(codes.Internal, "获取redis分布式锁异常")
        }

        if result := global.DB.Where(&model.Inventory{Goods: goodsInfo.GoodsId}).First(&inventory); result.RowsAffected == 0 {
            //失败进行事务回滚
            tx.Rollback()
            return nil, status.Errorf(codes.InvalidArgument, "库存信息不存在")
        }

        if inventory.Stocks < goodsInfo.Num {
            //失败进行事务回滚
            tx.Rollback()
            return nil, status.Errorf(codes.ResourceExhausted, "库存不足")
        }
        inventory.Stocks -= goodsInfo.Num
        tx.Save(&inventory)

        mutexs = append(mutexs, mutex)

        //if ok, err := mutex.Unlock(); !ok || err != nil {
        //    return nil, status.Errorf(codes.Internal, "释放redis分布式锁异常")
        //}

    }
    tx.Commit()

    for _, mutex := range mutexs {
        if ok, err := mutex.Unlock(); !ok || err != nil {
            return nil, status.Errorf(codes.Internal, "释放redis分布式锁异常")
        }
    }
    return &empty.Empty{}, nil
}
本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!