Redis 分布式锁实现 (一)
什么是分布式锁#
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁
一般至少要满足如下几点
- 多进程可见
- 互斥
- 高可用
- 高性能
- 安全性
分布式锁的实现#
分布式锁的核心是实现多进程之间互斥,而满足这一点的方式有很多,常见的有三种:
MySQL | Redis | Zookeeper | |
---|---|---|---|
互斥 | 利用 mysql 本身的互斥锁机制,事务机制 | 利用 setnx 这样的互斥命令 | 利用节点的唯一性和有序性实现互斥 |
高可用 | 好 | 好 | 好 |
高性能 | 一般 | 好 | 一般 |
安全性 | 断开连接,自动释放锁 | 利用超时时间,到期释放锁 | 临时节点,断开连接自动释放 |
基于 Redis 的分布式锁#
实现分布式锁时需要实现两个基本方法:
获取锁:
- 互斥:确保只有一个线程获取锁
# 添加锁,利用 setnx 的互斥特性 SETNX lock thread1 # 添加过期时间,避免服务宕机引起的死锁 EXPIRE lock 10 # 在 SETNX 和 EXPIRE 之间可能服务宕机,替换为原子操作 SET lock thread EX 10 NX
- 互斥:确保只有一个线程获取锁
释放锁:
- 手动释放锁
- 超时释放:获取锁时添加一个超时时间
# 释放锁,删除即可 DEL key
基于 Redis 实现分布式锁(初级版本)#
实现非阻塞分布式锁,流程大致如下
代码实现#
定义锁接口#
public interface ILock {
/**
* 尝试获取锁
* @param timeoutSec 锁持有的超时时间,过期后自动释放
* @return true 代表获取锁成功;false 代表锁获取失败
*/
boolean tryLock(long timeoutSec);
/**
* 释放锁
*/
void unlock();
}
简单示例#
在分布式服务下,用户可能会进行多次的下单操作。
public class SimpleRedisLock implements ILock {
private String name;
private StringRedisTemplate redisTemplate;
public static final String KEY_PREFIX = "lock:";
public SimpleRedisLock(String name, StringRedisTemplate redisTemplate) {
this.name = name;
this.redisTemplate = redisTemplate;
}
@Override
public boolean tryLock(long timeoutSec) {
long threadId = Thread.currentThread().getId();
// 获取锁
Boolean success = redisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);
// 由于这里会有自动拆箱操作,可能 success 为 null
return Boolean.TRUE.equals(success);
}
@Override
public void unlock() {
// 释放锁
redisTemplate.delete(KEY_PREFIX + name);
}
}
@Service
public class OrderServiceImpl implements OrderService {
@Autowired
private StringRedisTemplate redisTemplate;
@Override
public void placeAndOrder(Integer userId) {
// 创建锁对象
SimpleRedisLock simpleRedisLock = new SimpleRedisLock("order" + userId, redisTemplate);
// 获取锁
boolean isLock = simpleRedisLock.tryLock(5);
// 判断是否获取锁成功
if (!isLock) {
// 获取锁失败,返回错误重试
System.out.println("不允许重复下单!");
}
System.out.println("下单成功!");
// 释放锁
simpleRedisLock.unlock();
}
}
存在问题问题#
问题:假如现在有两个进程,进程 1 和进程 2,进程 1 拿到了 redis 锁,并且设置了锁自动释放时间,如果进程 1 此时由于各种原因,导致业务完成时间超过了锁的超时释放时间,此时进程 2 拿到锁之后,进行业务的执行,然后进程 1 此时业务完成,进行锁释放,也就是进程 1 将进程 2 的锁释放了,此时如果有进程 3 来获取锁,就能直接获取到锁执行自己的业务。这样就导致同时有两个进程获取到锁并且执行业务,就有可能导致线程安全问题。
该进为
一、改进#
1、在获取锁时,存入线程标识(可以用 UUID 表示)
2、在释放锁时先获取锁中的线程标识,判断是否与当前线程标识一致,如果一致则释放锁,否则不释放
流程大致如下#
代码实现#
public class SimpleRedisLock implements ILock {
private String name;
private StringRedisTemplate redisTemplate;
public static final String KEY_PREFIX = "lock:";
public static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
public SimpleRedisLock(String name, StringRedisTemplate redisTemplate) {
this.name = name;
this.redisTemplate = redisTemplate;
}
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁
Boolean success = redisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
// 由于这里会有自动拆箱操作,可能 success 为 null
return Boolean.TRUE.equals(success);
}
@Override
public void unlock() {
// 获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
//判断标识是否一致
String id = redisTemplate.opsForValue().get(KEY_PREFIX + name);
if (threadId.equals(id)) {
// 释放锁
redisTemplate.delete(KEY_PREFIX + name);
}
}
}
存在问题#
在极端情况下,由于在判断锁是否时自己的锁与释放锁时两个步骤,如果在此之间阻塞了,并且锁超时释放了,此时进程 2 拿到锁,进行业务的执行,在进程 2 执行期间,进程 1 阻塞恢复,由于之前已经通过了锁判断,此时直接释放了锁,锁误删,就会导致并发异常
二、改进#
需要将线程标识判断与释放锁这两个操作变成一个原子性操作。
Redis 的 Lua 脚本#
Redis 提供了 Lua 脚本功能,一个脚本中编写多条 Redis 命令,确保多条命令执行时的原子性。语法如下:
例如 先执行 set name HuDu,再执行 get name,具体脚本如下
# 先执行 set name HuDu
redis.call('set','name','HuDu')
# 再执行 get name
local name = redis.call('get','name')
# 返回
return name
127.0.0.1:6379> help @scripting
EVAL script numkeys key [key ...] arg [arg ...]
summary: Execute a Lua script server side
since: 2.6.0
例如要执行 redis.call('set','name','HuDu')
这个脚本,语法如下
# 调用脚本,0 表示需要的 key 类型的参数个数,因为有些命令是可以传多个 key 例如 mset 等
127.0.0.1:6379> EVAL "return redis.call('set','name','HuDu')" 0
OK
127.0.0.1:6379> get name
"HuDu"
如果脚本中的 key、value 不想写死,可以作为参数传递。key 类型参数会放入 KEYS 数组,其它参数会放入 ARGV 数组,在脚本中可以从 KEYS 和 ARGV 数组获取这些参数:
127.0.0.1:6379> EVAL "return redis.call('set',KEYS[1],ARGV[1])" 1 name HuDu
OK
127.0.0.1:6379> get name
"HuDu"
Lua 脚本内容如下
-- 获取锁中线程标识 get key,锁的key KEYS[1],当前线程标识 ARGV[1]
-- 比较线程与锁中的标识是否一致
if(redis.call('get',KEYS[1]) == ARGV[1]) then
-- 释放锁 del key
return redis.call('del',KEYS[1])
end
-- 不一致,直接返回
return 0
Java 调用 Lua 脚本改造分布式锁#
在项目 resource
目录下创建 Lua 脚本,unlock.lua
public class SimpleRedisLock implements ILock {
private String name;
private StringRedisTemplate redisTemplate;
public static final String KEY_PREFIX = "lock:";
public static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
public static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
public SimpleRedisLock(String name, StringRedisTemplate redisTemplate) {
this.name = name;
this.redisTemplate = redisTemplate;
}
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁
Boolean success = redisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
// 由于这里会有自动拆箱操作,可能 success 为 null
return Boolean.TRUE.equals(success);
}
@Override
public void unlock() {
// 调用 lua 脚本
redisTemplate.execute(UNLOCK_SCRIPT, Collections.singletonList(KEY_PREFIX + name),ID_PREFIX + Thread.currentThread().getId());
}
}
总结#
基于 Redis 的分布式锁实现思路:
- 利用
set nx ex
获取锁,并设置过期时间,保存线程标识 - 释放锁时先判断线程标识是否和自己的一致,一致删除锁
特性:
- 利用
set nx
满足互斥 - 利用
set nx
保证故障时锁依然能释放,避免死锁,提高安全性 - 利用
Redis
集群保证高可用和高并发特性
本作品采用《CC 协议》,转载必须注明作者和本文链接
博主好, public static final String ID_PREFIX = UUID.randomUUID ().toString (true) + "-"; tryLock 、unlock 操作时,两次获取的 ID_PREFIX 值是不一样的?这样 unlock 时,还能准确找到加锁的线程么?