Redis 分布式锁实现 (一)

什么是分布式锁

分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁

一般至少要满足如下几点

  • 多进程可见
  • 互斥
  • 高可用
  • 高性能
  • 安全性

分布式锁的实现

分布式锁的核心是实现多进程之间互斥,而满足这一点的方式有很多,常见的有三种:

MySQL Redis Zookeeper
互斥 利用 mysql 本身的互斥锁机制,事务机制 利用 setnx 这样的互斥命令 利用节点的唯一性和有序性实现互斥
高可用
高性能 一般 一般
安全性 断开连接,自动释放锁 利用超时时间,到期释放锁 临时节点,断开连接自动释放

基于 Redis 的分布式锁

实现分布式锁时需要实现两个基本方法:

  • 获取锁:

    • 互斥:确保只有一个线程获取锁
      # 添加锁,利用 setnx 的互斥特性
      SETNX lock thread1
      # 添加过期时间,避免服务宕机引起的死锁
      EXPIRE lock 10
      # 在 SETNXEXPIRE 之间可能服务宕机,替换为原子操作
      SET lock thread EX 10 NX
  • 释放锁:

    • 手动释放锁
    • 超时释放:获取锁时添加一个超时时间
      # 释放锁,删除即可
      DEL key

基于 Redis 实现分布式锁(初级版本)

实现非阻塞分布式锁,流程大致如下

Redis 分布式锁实现 (一)

代码实现

定义锁接口

public interface ILock {

    /**
     * 尝试获取锁
     * @param timeoutSec 锁持有的超时时间,过期后自动释放
     * @return true 代表获取锁成功;false 代表锁获取失败
     */
    boolean tryLock(long timeoutSec);

    /**
     * 释放锁
     */
    void unlock();
}

简单示例

在分布式服务下,用户可能会进行多次的下单操作。

public class SimpleRedisLock implements ILock {

    private String name;
    private StringRedisTemplate redisTemplate;

    public static final String KEY_PREFIX = "lock:";

    public SimpleRedisLock(String name, StringRedisTemplate redisTemplate) {
        this.name = name;
        this.redisTemplate = redisTemplate;
    }

    @Override
    public boolean tryLock(long timeoutSec) {
        long threadId = Thread.currentThread().getId();
        // 获取锁
        Boolean success = redisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);
        // 由于这里会有自动拆箱操作,可能 success 为 null
        return Boolean.TRUE.equals(success);
    }

    @Override
    public void unlock() {
        // 释放锁
        redisTemplate.delete(KEY_PREFIX + name);
    }
}
@Service
public class OrderServiceImpl implements OrderService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Override
    public void placeAndOrder(Integer userId) {
        // 创建锁对象
        SimpleRedisLock simpleRedisLock = new SimpleRedisLock("order" + userId, redisTemplate);

        // 获取锁
        boolean isLock = simpleRedisLock.tryLock(5);

        // 判断是否获取锁成功
        if (!isLock) {
            // 获取锁失败,返回错误重试
            System.out.println("不允许重复下单!");
        }
        System.out.println("下单成功!");
        // 释放锁
        simpleRedisLock.unlock();
    }
}

存在问题问题

Redis 分布式锁实现 (一)

问题:假如现在有两个进程,进程1和进程2,进程1拿到了 redis 锁,并且设置了锁自动释放时间,如果进程1此时由于各种原因,导致业务完成时间超过了锁的超时释放时间,此时进程2拿到锁之后,进行业务的执行,然后进程1此时业务完成,进行锁释放,也就是进程1将进程2的锁释放了,此时如果有进程3来获取锁,就能直接获取到锁执行自己的业务。这样就导致同时有两个进程获取到锁并且执行业务,就有可能导致线程安全问题。

该进为

Redis 分布式锁实现 (一)

一、改进

1、在获取锁时,存入线程标识(可以用 UUID 表示)
2、在释放锁时先获取锁中的线程标识,判断是否与当前线程标识一致,如果一致则释放锁,否则不释放

流程大致如下

Redis 分布式锁实现 (一)

代码实现

public class SimpleRedisLock implements ILock {

    private String name;
    private StringRedisTemplate redisTemplate;

    public static final String KEY_PREFIX = "lock:";
    public static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";

    public SimpleRedisLock(String name, StringRedisTemplate redisTemplate) {
        this.name = name;
        this.redisTemplate = redisTemplate;
    }

    @Override
    public boolean tryLock(long timeoutSec) {
        // 获取线程标识
        String threadId = ID_PREFIX + Thread.currentThread().getId();
        // 获取锁
        Boolean success = redisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
        // 由于这里会有自动拆箱操作,可能 success 为 null
        return Boolean.TRUE.equals(success);
    }

    @Override
    public void unlock() {
        // 获取线程标识
        String threadId = ID_PREFIX + Thread.currentThread().getId();
        //判断标识是否一致
        String id = redisTemplate.opsForValue().get(KEY_PREFIX + name);
        if (threadId.equals(id)) {
            // 释放锁
            redisTemplate.delete(KEY_PREFIX + name);
        }
    }
}

存在问题

Redis 分布式锁实现 (一)

在极端情况下,由于在判断锁是否时自己的锁与释放锁时两个步骤,如果在此之间阻塞了,并且锁超时释放了,此时进程2拿到锁,进行业务的执行,在进程2执行期间,进程1阻塞恢复,由于之前已经通过了锁判断,此时直接释放了锁,锁误删,就会导致并发异常

二、改进

需要将线程标识判断与释放锁这两个操作变成一个原子性操作。

Redis 的 Lua 脚本

Redis 提供了 Lua 脚本功能,一个脚本中编写多条 Redis 命令,确保多条命令执行时的原子性。语法如下:

例如 先执行 set name HuDu,再执行 get name,具体脚本如下

# 先执行 set name HuDu
redis.call('set','name','HuDu')
# 再执行 get name
local name = redis.call('get','name')
# 返回
return name
127.0.0.1:6379> help @scripting

  EVAL script numkeys key [key ...] arg [arg ...]
  summary: Execute a Lua script server side
  since: 2.6.0

例如要执行redis.call('set','name','HuDu')这个脚本,语法如下

# 调用脚本,0 表示需要的 key 类型的参数个数,因为有些命令是可以传多个 key 例如 mset 等
127.0.0.1:6379> EVAL "return redis.call('set','name','HuDu')" 0
OK
127.0.0.1:6379> get name
"HuDu"

如果脚本中的 key、value 不想写死,可以作为参数传递。key 类型参数会放入 KEYS 数组,其它参数会放入 ARGV 数组,在脚本中可以从 KEYS 和 ARGV 数组获取这些参数:

127.0.0.1:6379> EVAL "return redis.call('set',KEYS[1],ARGV[1])" 1 name HuDu
OK
127.0.0.1:6379> get name
"HuDu"

Lua 脚本内容如下

-- 获取锁中线程标识 get key,锁的key KEYS[1],当前线程标识 ARGV[1]
-- 比较线程与锁中的标识是否一致
if(redis.call('get',KEYS[1]) == ARGV[1]) then
    -- 释放锁 del key
    return redis.call('del',KEYS[1])
end
-- 不一致,直接返回
return 0

Java 调用 Lua 脚本改造分布式锁

在项目resource目录下创建 Lua 脚本,unlock.lua

public class SimpleRedisLock implements ILock {

    private String name;
    private StringRedisTemplate redisTemplate;

    public static final String KEY_PREFIX = "lock:";
    public static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
    public static final DefaultRedisScript<Long> UNLOCK_SCRIPT;

    static {
        UNLOCK_SCRIPT = new DefaultRedisScript<>();
        UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
        UNLOCK_SCRIPT.setResultType(Long.class);
    }

    public SimpleRedisLock(String name, StringRedisTemplate redisTemplate) {
        this.name = name;
        this.redisTemplate = redisTemplate;
    }

    @Override
    public boolean tryLock(long timeoutSec) {
        // 获取线程标识
        String threadId = ID_PREFIX + Thread.currentThread().getId();
        // 获取锁
        Boolean success = redisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
        // 由于这里会有自动拆箱操作,可能 success 为 null
        return Boolean.TRUE.equals(success);
    }

    @Override
    public void unlock() {
        // 调用 lua 脚本
        redisTemplate.execute(UNLOCK_SCRIPT, Collections.singletonList(KEY_PREFIX + name),ID_PREFIX + Thread.currentThread().getId());
    }
}

总结

基于 Redis 的分布式锁实现思路:

  • 利用set nx ex获取锁,并设置过期时间,保存线程标识
  • 释放锁时先判断线程标识是否和自己的一致,一致删除锁

特性:

  • 利用set nx满足互斥
  • 利用set nx保证故障时锁依然能释放,避免死锁,提高安全性
  • 利用Redis集群保证高可用和高并发特性
本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!