浅谈并发加锁
出现的问题
在我们的工作中,常常会出现一些对数量控制有精确要求的需求,比如商品库存量、奖品数量、报名人数限制等等,这些应用场景往往都存在高并发可能,比较容易出现数据量超量问题。以下做一下示例探索:
首先设计一个存量表
CREATE TABLE `product` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`product_name` varchar(255) NOT NULL DEFAULT '',
`count` int(10) NOT NULL DEFAULT '0',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4;
添加一行数据如下,设定基础库存量为10
问题代码如下:
$process_num = 50; //开50个进程,模拟50个用户
for ($i = 0; $i < $process_num; $i++) {
MultiProcessHelper::instance($process_num)->multiProcessTask(function () use ($i) {
if (Db::name('product')->where('id', 1)->value('count') > 0) {
$res = Db::name('product')->where('id', 1)->setDec('count');
if ($res) {
dump('获取到更新资源权限:' . $i);
}
}
});
}
执行结果,50个用户都获取到了更新资源的权限
数据库相应数据存量变成了-40
这显然不是我们所期待的,这就是高并发带来的问题,同一时刻有多个进程读取同一条数据,同一时刻有多个进程更新同一条数据
解决问题
数据库自有的锁机制解决
- 要进行DML层面的限制(最后关卡安全,报错总比出现数据问题产生的影响小),主要的修改是将count的类型改成了无符号整数,这样该值就不可能再出现负数值
CREATE TABLE `product` ( `id` int(11) NOT NULL AUTO_INCREMENT, `product_name` varchar(255) NOT NULL DEFAULT '', `count` int(10) unsigned NOT NULL DEFAULT '0', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4;
执行一下代码,当count值从10减到0时,就不能再减少了,再减就会出现数据库报错
- mysql提供的行级锁select … lock in share mode(阻塞写),select … for update(阻塞读写,悲观锁),所以for update机制能满足我们的原子要求。编辑代码如下:
$process_num = 50; //开50个进程,模拟50个用户
for ($i = 0; $i < $process_num; $i++) {
MultiProcessHelper::instance($process_num)->multiProcessTask(function () use ($i) {
Db::startTrans(); //行级锁必须在事务中才能生效
//设置for update,进程会阻塞在这里,只能允许一个进程获取到行锁,其他等待获取
if (Db::name('product')->where('id', 1)->lock('for update')->value('count') > 0) {
$res = Db::name('product')->where('id', 1)->setDec('count');
if ($res) {
dump('获取到更新资源权限:' . $i);
}
}
Db::commit();
});
}
只有十个进程获取到了更新权限,消费正常
- 将条件语句放到update上,保持语句执行的原子性,杜绝并发幻读
修改代码如下:$process_num = 50; //开50个进程,模拟50个用户 for ($i = 0; $i < $process_num; $i++) { MultiProcessHelper::instance($process_num)->multiProcessTask(function () use ($i) { //合并两条语句为一条更新语句 $res = Db::name('product')->where('id', 1)->where('count', '>', 0)->setDec('count'); if ($res) { dump('获取到更新资源权限:' . $i); } }); }
只有十个进程获取到了更新权限,消费正常
文件锁机制解决
编辑代码如下:
$process_num = 50; //开50个进程,模拟50个用户
for ($i = 0; $i < $process_num; $i++) {
MultiProcessHelper::instance($process_num)->multiProcessTask(function () use ($i) {
$filename = app()->getRootPath() . 'runtime/lock';
$file = fopen($filename, 'w'); //打开文件
$lock = flock($file, LOCK_EX);
// $lock=flock($handle, LOCK_EX|LOCK_NB); (异步非阻塞,所有进程如果出现获取不到锁,不等待跳过,加锁失败)
//获取文件排他锁:LOCK_EX(异步阻塞,只有一个进程获得锁,其他竞争进程等待)
//还有一种共享锁:LOCK_SH(所有进程都可以获取共享锁,读取文件,当且只有一个锁时,才允许写操作,否则操作失败,容易出现死锁问题)
if ($lock) {
try {
if (Db::name('product')->where('id', 1)->lock('for update')->value('count') > 0) {
$res = Db::name('product')->where('id', 1)->setDec('count');
if ($res) {
dump('获取到更新资源权限:' . $i);
}
}
} catch (\Exception $e) {
dump($e->getMessage());
} finally {
flock($file, LOCK_UN); //无论如何都要释放锁
}
}
fclose($file); //关闭文件句柄
});
}
只有十个进程获取到了更新权限,消费正常
分布式锁机制解决
以上文件锁,只适应于单体架构的需求,在集群架构、分布式等多机联网结构中就是掩耳盗铃了,所以适应性更好地锁机制还是要使用分布式锁,分布式锁最常用和最易用就是redis的setnx锁了。
编辑代码如下:
$process_num = 50; //开50个进程,模拟50个用户
for ($i = 0; $i < $process_num; $i++) {
MultiProcessHelper::instance($process_num)->multiProcessTask(function () use ($i) {
//获取redis锁
//关于CacheHelper::getRedisLock是怎样获取锁的,注意几个点就行:1.如何避免死锁;2.如何设置过期时间;3.如何设置抢占条件;4.如何循环等待判断。这些不在本文讨论范围,可自行研究,以后有空我也可以写一篇博文
$lock = CacheHelper::getRedisLock('redis_lock');
if ($lock) {
try {
if (Db::name('product')->where('id', 1)->lock('for update')->value('count') > 0) {
$res = Db::name('product')->where('id', 1)->setDec('count');
if ($res) {
dump('获取到更新资源权限:' . $i);
}
}
} catch (\Exception $e) {
dump($e->getMessage());
}
} else {
// dump('获取redis锁失败');
}
});
}
只有十个进程获取到了更新权限,消费正常
总结
加锁并不是最好的解决方案,只能达到数据安全的要求,同时性能会很大的损耗,可能出现死锁或者请求长期等待返回5XX的问题,要根据场景和需求来做选择和设计。
本作品采用《CC 协议》,转载必须注明作者和本文链接
enjoy it
推荐文章: