高并发下的超卖和一人一单
工具
代码框架: Hyperf
数据库:mysql
缓存:redis
压测工具:JMeter
SQL
ku_goods:秒杀商品表
CREATE TABLE `ku_goods` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`title` varchar(255) NOT NULL COMMENT '商品标题',
`num` int(11) NOT NULL DEFAULT '0' COMMENT '数量',
`begin_time` datetime NOT NULL COMMENT '优惠开始时间',
`created_at` datetime NOT NULL,
`updated_at` datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;
#创建一个秒杀商品,初始数量为100
INSERT INTO `kuke_user1`.`ku_goods` (`id`, `title`, `num`, `begin_time`, `created_at`, `updated_at`) VALUES (1, '秒杀商品', 100, '2023-06-27 14:17:21', '2023-06-27 14:17:28', '2023-06-28 16:59:32');
ku_order:订单信息
CREATE TABLE `ku_order` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`user_id` int(11) NOT NULL DEFAULT '0',
`goods_id` int(11) NOT NULL DEFAULT '0',
`created_at` datetime DEFAULT NULL,
`updated_at` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4082 DEFAULT CHARSET=utf8mb4;
超卖问题
正常逻辑
用户抢购一个商品
/**
* @param Request $request
* @return bool
*/
public function test(Request $request): bool
{
$userId = $request->input("user_id");
//1. 获取商品信息
$goods = Goods::find(1);
if (!$goods) {
return false;
}
//2. 判断是否有库存
if ($goods->num < 1) {
return false;
}
//3. 减库存
$flag = Db::update("update ku_goods set num = num - 1 where id = {$goods->id}");
if (!$flag) {
return false;
}
//4. 添加订单信息
$order = new Order();
$order->user_id = $userId;
$order->goods_id = $goods->id;
$order->save();
return true;
}
按照正常的逻辑,判断商品库存,减少库存,添加订单信息
压测
开启 1000 个线程同时请求这个接口
执行完毕,通过 SQL 查看结果
select count(1) as "生成订单总数" from ku_order;
select num as "商品剩余数量" from ku_goods where id = 1;
可以看到,商品出现超卖,剩余库存为-10,订单生成了 110
出现此问题的原因就在 判断是否有库存 和 减库存之间
,多个线程过来获取到还有剩余库存,然后更新库存,但是在更新库存之前,其它线程已经抢完了,造成超卖
此问题可以使用悲观锁和乐观锁来解决,但是悲观锁会把整个业务逻辑进行加锁,变相为串行执行了,所以考虑使用乐观锁
乐观锁解决超卖问题
乐观锁的概念不在此阐述了,百度一下,此场景下 库存 就可以看做是 version,更新库存的时候,判断下更新的时候的库存是否和查询时候的库存一致
/**
* 乐观锁实现
* @param Request $request
* @return bool
*/
public function test(Request $request): bool
{
$userId = $request->input("user_id");
//1. 获取商品信息
$goods = Goods::find(1);
if (!$goods) {
return false;
}
//2. 判断是否有库存
if ($goods->num < 1) {
return false;
}
//3. 减库存
$flag = Db::update("update ku_goods set num = num - 1 where id = {$goods->id} and num = {$goods->num}");
if (!$flag) {
return false;
}
//4. 添加订单信息
$order = new Order();
$order->user_id = $userId;
$order->goods_id = $goods->id;
$order->save();
return true;
}
此时使用压测后,结果
可以看到,商品未出现超卖,但是有另外的问题,100 个商品竟然都没有强光
此时的原因是 获取商品和更新库存之间,其它线程已经更改了库存,导致此线程更新失败
此时可以稍微变通一下,就可以解决,更新库存时只要 num > 0,都可以更新成功,并创建订单
/**
* 乐观锁实现
* @param Request $request
* @return bool
*/
public function test(Request $request): bool
{
$userId = $request->input("user_id");
//1. 获取商品信息
$goods = Goods::find(1);
if (!$goods) {
return false;
}
//2. 判断是否有库存
if ($goods->num < 1) {
return false;
}
//3. 更新库存,更改 num > 0
$flag = Db::update("update ku_goods set num = num - 1 where id = {$goods->id} and num > 0");
if (!$flag) {
return false;
}
//4. 创建订单
$order = new Order();
$order->user_id = $userId;
$order->goods_id = $goods->id;
$order->save();
return true;
}
从压测结果可以看到,未出现超卖,并且正常抢购完毕
一人一单问题
抢购商品,每个用户限购一单
正常逻辑
/**
* 一人一单
* @param Request $request
* @return bool
*/
public function test1(Request $request): bool
{
$userId = $request->input("user_id");
//1. 获取商品信息
$goods = Goods::find(1);
if (!$goods) {
return false;
}
//2. 判断库存
if ($goods->num < 1) {
return false;
}
//3. 判断当天用户是否购买
$isBuy = Db::select("select id from ku_order where user_id = {$userId} and goods_id = 1 limit 1");
if ($isBuy) {
return false;
}
//4. 减少库存
$flag = Db::update("update ku_goods set num = num - 1 where id = {$goods->id} and num > 0");
if (!$flag) {
return false;
}
//5. 添加订单信息
$order = new Order();
$order->user_id = $userId;
$order->goods_id = $goods->id;
$order->save();
return true;
}
在乐观锁的代码基础之上,添加了用户是否购买的判断,来限定一个用户只能买一个商品
压测结果
可以看到,当一个用户并发请求的时候,会出现一人一单的超卖问题
原因就在于 判断用户是否购买,和 最后添加订单之间有间隔,并发过来时,这个用户的订单信息还没来得及创建,此时是可以正常减库存和创建订单的
Redis setnx 解决
Redis 的 setnx
SETNX key value
将 key 的值设为 value ,当且仅当 key 不存在。
若给定的 key 已经存在,则 SETNX 不做任何动作。
获取锁
设置有效期的目的在于程序意外终止,锁得不到释放,设置有效期保证一定会在未来某个时间失效
使用 lua 脚本来保证 setnx 和 expire 操作是原子性操作
释放锁
判断锁是否还存在
删除锁
代码实现
ILock 接口
<?php
declare(strict_types=1);
namespace App\Tool\Redis;
interface ILock
{
/**
* 获取锁
* @param int $waitTime 当获取锁失败后,在次时间段内尝试重新获取锁
* @param int $expireTime 获取锁成功后,锁的有效期
* @return bool
*/
public function tryLock(int $waitTime, int $expireTime):bool;
/**
* 释放锁
* @return bool
*/
public function unlock():bool;
}
RedisLock 实现
<?php
declare(strict_types=1);
namespace App\Tool\Redis;
use Hyperf\Redis\Exception\InvalidRedisProxyException;
use Hyperf\Redis\Redis;
use Hyperf\Redis\RedisFactory;
use Hyperf\Utils\ApplicationContext;
class RedisLock implements ILock
{
private string $name;
private string $value;
private Redis $redisClient;
public function __construct($name, ?Redis $redisClient = null)
{
$this->name = $name;
$this->value = "1";
$this->redisClient = $this->initRedisClient($redisClient);
}
private function initRedisClient(?Redis $redisClient = null): Redis
{
if (!$redisClient) {
try {
$redisClient = ApplicationContext::getContainer()->get(RedisFactory::class)->get("default");
} catch (\Throwable $e) {
//初始化redis客户端失败
throw new InvalidRedisProxyException("初始化redis客户端失败");
}
}
return $redisClient;
}
/**
* 获取锁
* @param int $waitTime
* @param int $expireTime
* @return bool
*/
public function tryLock(int $waitTime, int $expireTime): bool
{
//秒转换为毫秒
$waitTime = $waitTime * 1000;
$expireTime = $expireTime * 1000;
//获取超时时间
$currentTime = $this->getMillisecond() + $waitTime;
//编写 lua 脚本
$script =<<<SCRIPT
local lockKey = KEYS[1]
local lockValue = ARGV[1]
local lockExpireTime = ARGV[2]
if redis.call('SETNX', lockKey, lockValue) == 1 then
redis.call('PEXPIRE', lockKey, lockExpireTime)
return 1
else
return 0
end
SCRIPT;
do {
$result = $this->redisClient->eval($script, array($this->name, $this->value, $expireTime), 1);
if ($result == 1) {
return true;
}
} while ($this->getMillisecond() < $currentTime);
return false;
}
public function unlock(): bool
{
//编写 lua 脚本
$script =<<<SCRIPT
local lockKey = KEYS[1]
local lockValue = ARGV[1]
if redis.call('get', lockKey) == lockValue then
if redis.call('del', lockKey) == 1 then
return 1
end
end
return 0
SCRIPT;
return (boolean)$this->redisClient->eval($script, array($this->name, $this->value), 1);
}
/**
* 获取当前毫秒
* @return float|int
*/
private function getMillisecond()
{
return microtime(true) * 1000;
}
}
创建对象的时候,传入 redis 的键,并对 redis 访问客户端进行初始化
获取锁的时候使用 do while 结构进行重试获取锁
释放锁的时候直接删除
业务代码
/**
* 一人一单
* @param Request $request
* @return bool
*/
public function test(Request $request): bool
{
$userId = $request->input("user_id");
$goods = Goods::find(1);
if (!$goods) {
return false;
}
if ($goods->num < 1) {
return false;
}
//使用用户id来当做键,保证同个用户并发过来只有一把锁
$redisLock = new RedisLock($userId);
//设置等待时间为0s,就是获取一次锁操作,10s的有效期
if ($redisLock->tryLock(0, 10)) {
try {
$isBuy = Db::select("select id from ku_order where user_id = {$userId} and goods_id = 1 limit 1");
if ($isBuy) {
return false;
}
$flag = Db::update("update ku_goods set num = num - 1 where id = {$goods->id} and num > 0");
if (!$flag) {
return false;
}
$order = new Order();
$order->user_id = $userId;
$order->goods_id = $goods->id;
$order->save();
return true;
} catch (\Throwable $e) {
} finally {
//释放锁
$redisLock->unlock();
}
}
return false;
}
压测结果
可以看到完美解决一人一单问题
但是这种设计还是有其他问题的
释放锁问题: 当线程 1 业务逻辑出现阻塞时,没有在 10s 内解决,这时候锁超时,自动释放,此时线程 2 可以正常获取锁的,线程 2 获取锁之后,线程 1 正常业务执行完毕,释放锁,这时候线程 1 释放的就是 线程 2 的锁
可重入问题:线程 1 获取锁之后执行业务,在业务中有其他方法也要获取锁处理逻辑,但是此时获取锁是失败的,就会等待线程 1 释放,但是线程 1 在等待业务执行结束,造成死锁
public function a(Request $request): bool
{
$userId = $request->input("user_id");
//使用用户id来当做键,保证同个用户并发过来只有一把锁
$redisLock = new RedisLock($userId);
if ($redisLock->tryLock(0, 10)) {
try {
//处理业务
$this->b($userId);
} catch (\Throwable $e) {
} finally {
$redisLock->unlock();
}
}
return false;
}
private function b($userId)
{
//使用用户id来当做键,保证同个用户并发过来只有一把锁
$redisLock = new RedisLock($userId);
if ($redisLock->tryLock(60, 10)) {
try {
//处理业务
} catch (\Throwable $e) {
} finally {
$redisLock->unlock();
}
}
}
Redis Hash 解决
针对第一个问题可以使用在保存的时候把当前线程 id 保存到 redis 的 value 里面,当删除的时候,判断是否是当前线程,是的话就删除。
针对第二个问题,借鉴 Java 里面的 Redisson 组件的实现方案,通过 redis 的 hash 结构存储,只要在一个线程内获取锁,就对 value+1,释放锁 value-1,当释放到最外层,value = 0,此时删除锁
获取锁
local lockKey = KEYS[1]
local lockField = ARGV[1]
local lockExpireTime = ARGV[2]
if redis.call('exists', lockKey) == 0 then
-- redis 里面不存在,设置 hash 信息
redis.call('hincrby', lockKey, lockField, 1)
redis.call('pexpire', lockKey, lockExpireTime)
return 1
end
-- 判断当前 key 里面的 field 是否是当前协程的
if redis.call('hexists', lockKey, lockField) == 1 then
-- 是当前协程的,value + 1
redis.call('hincrby', lockKey, lockField, 1)
redis.call('pexpire', lockKey, lockExpireTime)
return 1
end
-- 存在 key,但是 field 不是当前协程,其它协程已经锁了,获取锁失败
return 0
判断 key 是否存在,不存在设置 key,field,value,key 就是用户 id,field 就是线程 id,value 第一次是 1,此后当前线程获取一次+1,同时设置过期时间
存在 key,判断是否是当前线程,不是获取锁失败,是的话 value+1
释放锁
local lockKey = KEYS[1]
local lockField = ARGV[1]
if redis.call('hexists', lockKey, lockField) == 0 then
return 0
end
local counter = redis.call('hincrby', lockKey, lockField, -1)
if counter > 0 then
return 1
end
redis.call('del', lockKey)
return 1
判断当前 key,线程 id 存在不存在
存在 value-1,当 value 大于 0 的时候不用管,小于等于 0,删除锁
代码实现
<?php
declare(strict_types=1);
namespace App\Tool\Redis;
use Hyperf\Redis\Exception\InvalidRedisProxyException;
use Hyperf\Redis\Redis;
use Hyperf\Redis\RedisFactory;
use Hyperf\Utils\ApplicationContext;
/**
* 可重入锁
*/
class RedisReentrantLock implements ILock
{
private string $name;
private string $value;
private Redis $redisClient;
public function __construct($name, $requestId, ?Redis $redisClient = null)
{
$this->name = $name;
$this->value = $requestId;
$this->redisClient = $this->initRedisClient($redisClient);
}
private function initRedisClient(?Redis $redisClient = null): Redis
{
if (!$redisClient) {
try {
$redisClient = ApplicationContext::getContainer()->get(RedisFactory::class)->get("default");
} catch (\Throwable $e) {
//初始化redis客户端失败
throw new InvalidRedisProxyException("初始化redis客户端失败");
}
}
return $redisClient;
}
/**
* 获取锁
* @param int $waitTime
* @param int $expireTime
* @return bool
*/
public function tryLock(int $waitTime, int $expireTime): bool
{
//秒转换为毫秒
$waitTime = $waitTime * 1000;
$expireTime = $expireTime * 1000;
//获取超时时间
$currentTime = $this->getMillisecond() + $waitTime;
//编写 lua 脚本
$script =<<<SCRIPT
local lockKey = KEYS[1]
local lockField = ARGV[1]
local lockExpireTime = ARGV[2]
if redis.call('exists', lockKey) == 0 then
-- redis 里面不存在,设置 hash 信息
redis.call('hincrby', lockKey, lockField, 1)
redis.call('pexpire', lockKey, lockExpireTime)
return 1
end
-- 判断当前 key 里面的 field 是否是当前协程的
if redis.call('hexists', lockKey, lockField) == 1 then
-- 是当前协程的,value + 1
redis.call('hincrby', lockKey, lockField, 1)
redis.call('pexpire', lockKey, lockExpireTime)
return 1
end
-- 存在 key,但是 field 不是当前协程,其它协程已经锁了,获取锁失败
return 0
SCRIPT;
do {
$result = $this->redisClient->eval($script, array($this->name, $this->value, $expireTime), 1);
if ($result == 1) {
return true;
}
} while ($this->getMillisecond() < $currentTime);
return false;
}
public function unlock(): bool
{
//编写 lua 脚本
$script =<<<SCRIPT
local lockKey = KEYS[1]
local lockField = ARGV[1]
if redis.call('hexists', lockKey, lockField) == 0 then
return 0
end
local counter = redis.call('hincrby', lockKey, lockField, -1)
if counter > 0 then
return 1
end
redis.call('del', lockKey)
return 1
SCRIPT;
return (boolean)$this->redisClient->eval($script, array($this->name, $this->value), 1);
}
/**
* 获取当前毫秒
* @return float|int
*/
private function getMillisecond()
{
return intval(microtime(true) * 1000);
}
}
在 Hyperf 中每次请求是创建协程处理的,本来考虑使用协程 id,但是测试后有问题,改为中间件来对每次请求赋值
<?php
declare(strict_types=1);
namespace App\Middleware;
use Hyperf\Utils\Context;
use Hyperf\Utils\Coroutine;
use Psr\Container\ContainerInterface;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
use Psr\Http\Server\MiddlewareInterface;
use Psr\Http\Server\RequestHandlerInterface;
class ApiMiddleware implements MiddlewareInterface
{
/**
* @var ContainerInterface
*/
protected $container;
public function __construct(ContainerInterface $container)
{
$this->container = $container;
}
public function process(ServerRequestInterface $request, RequestHandlerInterface $handler): ResponseInterface
{
// 生成唯一标识
$requestId = $this->generateRequestId();
// 将唯一标识保存到请求头中
$request = $request->withAddedHeader('request-id', $requestId);
Context::set(ServerRequestInterface::class, $request);
return $handler->handle($request);
}
private function generateRequestId(): string
{
$current = microtime(true) * 10000;
$coroutineId = Coroutine::id();
//简单生成
return $coroutineId . $current . mt_rand(999999999, 99999999999);
}
}
/**
* 可重入性
* Redis锁具备可重入特性,它可以让多个线程同时进入相同的锁,而不会引起死锁问题。
*
* Redis锁也是基于无信号量机制实现的,它能够控制多用户对共享资源的访问,而可重入特性又可以保证同一个用户可以重新获取已经被他自己占用的锁。
*
* 例如,Java应用程序可以在方法A中加锁,而当方法A调用方法B时,线程仍然可以获取它已经持有的锁,从而避免死锁的发生。要实现该功能,就必须确保锁的可重入性,以下是两个使用 Java 编写的样例程序,可以演示 Java 中锁的可重入性:
* @param Request $request
* @return bool
*/
public function test(Request $request): bool
{
$userId = $request->input("user_id");
$requestId = $request->getHeaderLine("request-id");
$goods = Goods::find(1);
if (!$goods) {
return false;
}
if ($goods->num < 1) {
return false;
}
$redisLock = new RedisReentrantLock($userId, $requestId);
if ($redisLock->tryLock(0, 6)) {
try {
$isBuy = Db::select("select id from ku_order where user_id = {$userId} and goods_id = 1 limit 1");
if ($isBuy) {
return false;
}
$flag = Db::update("update ku_goods set num = num - 1 where id = {$goods->id} and num > 0");
if (!$flag) {
return false;
}
$order = new Order();
$order->user_id = $userId;
$order->goods_id = $goods->id;
$order->save();
return true;
} catch (\Throwable $e) {
} finally {
$redisLock->unlock();
}
}
return true;
}
压测结果
其他细节,可以在使用时根据具体业务调整
本作品采用《CC 协议》,转载必须注明作者和本文链接
学到了,楼主好人。
写的不错