redis应用系列一:分布式锁正确实现姿势

实现分布式锁常见有三种实现方式:
1.基于数据库
2.基于缓存(redis)分布式锁,
3.基于Zookeeper实现分布式锁
以下是他们在可靠性、性能、复杂性三个维度的对比

评判维度 比较
可靠性 Zookeeper > 缓存 > 数据库
性能 缓存 > Zookeeper >= 数据库
复杂性 Zookeeper >= 缓存 > 数据库

由于redis高性能,在许多密集型的业务场景中是运用最多,因此以下介绍基于redis分布式锁的实现

分析

Why

  • 安全性(互斥性):在任意时刻,当且仅当只有一个客户端能持有锁
  • 活性A(无死锁):即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁
  • 同一性:加锁和解锁必须保证为同一个客户端
  • 活性B(容错性):只要大部分的Redis节点正常运行,客户端就可以加锁和解锁

what

  • 时间维度保证数据强一致性

When

  • 存在竞争(库存竞争,工单/任务竞争)

Where

  • 抢购
  • 秒杀
  • 抢单
  • 派单
  • 库存

Who

  • 库存竞争:给标识库存的唯一属性加锁作为key
  • 工单 / 任务竞争:给工单 / 任务 加锁作为key

How

  • 没锁可以加锁
  • 有锁加锁失败
  • 给锁设置过期时间
  • 解锁和加锁是同一个用户

How much

  • 一条指令

How feel

  • 乐观锁
  • 悲观锁

常见加锁方式

示例1

public function lock($lockKey, $requestId, $expireTime)
    {
        $redis  = Redis::connection();
        $result = $redis->setnx($lockKey, $requestId);
        if ($result) {
            // 若在这里程序突然崩溃,则无法设置过期时间,将发生死锁
            $redis->expire($lockKey, $expireTime);
        }
    }

此处乍一看这种方式并没有什么问题,
But由于是两条redis命令,So不具有原子性;
试想如果程序在执行完第一句setnx命令之后突然挂掉,那么会发生死锁,和设计原则相违背。
因此不是最优解

示例2

    public function lock2($lockKey, $requestId, $expireTime)
    {
        $expires = microtime(true) + $expireTime;
        $redis   = Redis::connection();
        // 如果当前锁不存在,返回加锁成功
        $result = $redis->setnx($lockKey, microtime(true));
        if ($result) {
            return true;
        }
        // 如果锁存在,获取锁的过期时间
        $currenExpires = $redis->get($lockKey);
        if ($currenExpires && $currenExpires < microtime(true)) {
            // 锁已过期,获取上一个锁的过期时间,并设置现在锁的过期时间
            $oldExpires = $redis->getset($lockKey, $expires);
            if ($oldExpires && $oldExpires == $currenExpires) {
                // 考虑多线程并发的情况,只有一个线程的设置值和当前值相同,它才有权利加锁
                return true;
            }
        }

        // 其他情况,一律返回加锁失败
        return false;
    }

那么这段代码问题在哪里?

  1. 由于是客户端自己生成过期时间,所以需要强制要求分布式下每个客户端的时间必须同步;
  2. 当锁过期的时候,如果多个客户端同时执行getset方法,那么虽然最终只有一个客户端可以加锁,但是这个客户端的锁的过期时间可能被其他客户端覆盖;
  3. 锁不具备拥有者标识,即任何客户端都可以解锁。
    因此此锁安全性没法保证,不满足设计原则第一条

示例3

    /**
     * @param $lockKey 锁
     * @param $requestId 请求标识
     * @param $expireTime 超期时间
     * @return bool
     */
    public function lock3($lockKey, $requestId, $expireTime)
    {
        $ret = Redis::set($lockKey, $requestId, 'PX', $expireTime, 'NX');
        if ($ret) {
            return true;
        }
        return false;
    }

此锁既满足了安全性,又有活性,并且满足同一性(解锁中体现),同时实现简单,是一种最优解

常见解锁方式

示例1

    public function releaseLock($lockKey)
    {
        $redis  = Redis::connection();
        $redis->del($lockKey);
    }

这种不先判断锁的拥有者而直接解锁的方式,会导致任何客户端都可以随时进行解锁,即使这把锁不是它的

示例2

    public function releaseLock1($lockKey, $requestId)
    {
        $redis  = Redis::connection();
        $result = $redis->get($lockKey);
        // 判断加锁与解锁是不是同一个客户端
        if ($result == $requestId) {
            // 若在此时,这把锁突然不是这个客户端的,则会误解锁
            $redis->del($lockKey);
        }
    }

这种解锁方法没有多大毛病,但是存在一个问题,有误删锁的可能性
比如A客户端加锁,执行一段事件后进行解锁操作,在执行del锁之前锁过期,这时候客户端B加锁成功,接着客户端A执行del锁就会将客户端B的锁删除,没有保证同一性

示例3

    public function releaseLock13($lockKey, $requestId)
    {
        $luaScript = <<<EOF
if redis.call("get",KEYS[1]) == ARGV[1]
then
    return redis.call("del",KEYS[1])
else
    return 0
end
EOF;
        // 利用lua脚本,保证原子性
        $res = Redis::eval($luaScript, 1, $lockKey, $requestId);
        if ($res) {
            return true;
        }
        return false;
    }

此种方法利用lua脚本,保证原子性,是一种最优解

完整实现

trait RedisMutexLock{

    /**
     * 获取分布式锁(加锁)
     * @param lockKey 锁key
     * @param requestId 客户端请求标识
     * @param expireTime 超期时间,毫秒,默认15s
     * @param isNegtive 是否是悲观锁,默认否
     * @return 是否获取成功
     */
    public function tryGetDistributedLock($lockKey, $requestId, $expireTime = 15000, $isNegtive = false)
    {
        if ($isNegtive) {//悲观锁
            /**
             * 悲观锁 循环阻塞式锁取,阻塞时间为2s
             */
            $endtime = microtime(true) * 1000 + $this->acquireTimeout * 1000;
            while (microtime(true) * 1000 < $endtime) { //每隔一段时间尝试获取一次锁
                $acquired = Redis::set($lockKey, $requestId, 'PX', $expireTime, 'NX');
                if ($acquired) { //获取锁成功,返回true
                    return true;
                }
                usleep(100);
            }
            //获取锁超时,返回false
            return false;

        } else {//乐观锁
            /**
             * 乐观锁只尝试一次,成功返回true,失败返回false
             */
            $ret = Redis::set($lockKey, $requestId, 'PX', $expireTime, 'NX');
            if ($ret) {
                return true;
            }
            return false;
        }
    }

    /**
     * 解锁
     * @param $lockKey 锁key
     * @param $requestId 客户端请求唯一标识
     */
    public function releaseDistributedLock($lockKey, $requestId)
    {
        $luaScript = <<<EOF
if redis.call("get",KEYS[1]) == ARGV[1]
then
    return redis.call("del",KEYS[1])
else
    return 0
end
EOF;
        $res       = Redis::eval($luaScript, 1, $lockKey, $requestId);
        if ($res) {
            return true;
        }
        return false;
    }
}

使用

use RedisMutexLock;

public function __construct()
{
    define("REQUEST_ID", md5(uniqid(env('APP_NAME'), true)) . rand(10000, 99999));
    $this->requestId = $_SERVER['x_request_id'] ?? REQUEST_ID;
}

// 抢单
public function addOrder()
{
    // 订单加锁
    $lock = $this->tryGetDistributedLock($this->redisOrderKey, $this->requestId);
    if (!$lock) {
        return ['error' => 1900001];
    }
    try {
        // TODO 处理业务
    } catch (\Exception $e) {
        // 异常处理
    } finally {
        // 处理完释放锁
        $this->releaseDistributedLock($this->redisOrderKey, $this->requestId);
    }
}
本作品采用《CC 协议》,转载必须注明作者和本文链接
本帖由系统于 3周前 自动加精
《L03 构架 API 服务器》
你将学到如 RESTFul 设计风格、PostMan 的使用、OAuth 流程,JWT 概念及使用 和 API 开发相关的进阶知识。
《L04 微信小程序从零到发布》
从小程序个人账户申请开始,带你一步步进行开发一个微信小程序,直到提交微信控制台上线发布。
讨论数量: 11

@PHPer技术栈 我把redis官网中推荐实现方法搬过来了, :smile:

3周前 评论
游离不2
// 抢单
public function addOrder()
{
    // 订单加锁
    $lock = $this->tryGetDistributedLock($this->redisOrderKey, $this->requestId);
    if (!$lock) {
        return ['error' => 1900001];
    }
    try {
        // TODO 处理业务
    } catch (\Exception $e) {
        // 异常处理
    } finally {
        // 处理完释放锁
        $this->releaseDistributedLock($this->redisOrderKey, $this->requestId);
    }
}

这样释放锁是不是可以简单写

3周前 评论

@游离不2 good :+1:这样写更加干净了,说实话我之前不知道还有这种写法哈,感谢感谢,我改进下

3周前 评论

向DB一样写成闭包一样会不会更好

3周前 评论

@thus 道行不够,没明白啥意思? :joy:具体在哪个位置?

3周前 评论
thus 3周前

@mantou_1 此处你把两个概念搞混淆了:lockKey和lockey对应的value:

  • AB客户端的lockKey都是同一个,如果不是同一个,就没有加锁的必要了(因为多个客户端不存在资源竞争)
  • 但是同一个lockey可以由不同客户端进行操作,所以lockkey对应的value不相同,此实现中lockey对应value为客户端请求标识(当然可以为随机数,只要能确认不同请求就行)
2周前 评论
mantou_1 2周前
笨小孩 (作者) (楼主) 2周前

想问问为什么乐观锁获取一次失败就返回

1周前 评论

@MONKEYG 此处乐观锁和悲观锁只是个业务叫法,影响的是用户体验。 乐观锁:总是很乐观的认为锁立马就能拿到,因此得到结果立马返回 此处关于乐观锁和悲观锁概念你可以百度下数据库对这两个概念解释说明,理解起来就比较容易

1周前 评论

处理业务过程中锁过期了怎么办?

1周前 评论

@hua1234 这种场景下得单独起个线程,每隔一段时间watch这个key是否存在,如果存在,就延长锁的失效时间,打算后续也出一个这个方面文章。感谢提供思路 :pray:

1周前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!