PHP 高并发下锁的实现及应用

本文主要介绍了 PHP 在高并发环境下互斥锁的几种实现方式及其应用,希望能为大家提供帮助。

PHP 中互斥锁的几种实现

1. 文件锁:适用于单机部署的场景

利用 flock() 函数对文件进行加锁。

  1. 阻塞模式:当并发请求发生时,未获得锁的请求会一直等待锁的释放。
$fp = fopen('./lock.txt', 'w+');
// 阻塞模式
if (flock($fp, LOCK_EX)) {
    // TODO 执行业务代码
    flock($fp, LOCK_UN); // 释放锁
}
fclose($fp);
  1. 非阻塞模式:当并发请求发生时,未获得锁的请求会直接返回,不会等待。
$fp = fopen('./lock.txt', 'w+');
// 阻塞模式
if (flock($fp, LOCK_EX | LOCK_NB)) {
    // TODO 执行业务代码
    flock($fp, LOCK_UN); // 释放锁
} else {
    // TODO 异常捕获
    throw new \Exception('系统繁忙,请稍后再试~');
}
fclose($fp);

2. 分布式锁(Redis 实现)

注意:Laravel 框架中提供了 RedisLock 的功能,可以直接使用。

// 获取锁
private function lock($name, $expire = 0)
{
    $redis = app('redis')->connection('default');
    $result = $redis->setnx($name, 1);
    if ($result === 1 && $expire) {
        $redis->expire($name, $expire);
    }
    return $result === 1;
}

// 释放锁
private function unlock($name)
{
    $redis = app('redis')->connection('default');
    $redis->del($name);
}

$signature = 'create_user_' . sha1($mobile) ;
while (!$this->lock($signature, 10)) {
    // 阻塞执行并等待锁的释放
    usleep(100000);
}
// TODO 执行业务代码
// 释放锁
$this->unlock($signature);

3. 分布式锁(MySQL 实现)

利用 SELECT ... FOR UPDATE 语句在事务中实现分布式锁。

// =============== 以下代码仅供演示,具体可以参考实验 3 代码示例 ===============
try {
    $conn = app('db');
    // 开启事务
    $conn->beginTransaction();
    // 行锁
    $sql = "SELECT ... FROM ... WHERE ... FOR UPDATE";
    $conn->select($sql);
    // TODO 执行业务代码,更新锁定行数据
    $sql = "UPDATE ... SET ... WHERE ...";
    $conn->update($sql);
    // 提交事务
    $conn->commit();
} catch (\Exception $e) {
    // 执行异常,事务回滚
    $conn->rollback();
}

动手实验

1. 测试环境准备:

-- 创建商品表
CREATE TABLE `products` (
    `id` BIGINT(20) NOT NULL AUTO_INCREMENT,
    `name` VARCHAR(64) NOT NULL COMMENT '商品名称' COLLATE 'utf8_general_ci',
    `store` INT(11) NULL DEFAULT '0' COMMENT '库存',
    PRIMARY KEY (`id`) USING BTREE
)
COLLATE='utf8_general_ci'
ENGINE=InnoDB;

-- 插入测试数据
INSERT INTO `products` (`id`, `name`, `store`) VALUES (1, '苹果', 100);

2. 未加锁时进行并发压测:

$sql = "SELECT * FROM `products` WHERE `id` = 1";
$product = app('db')->selectOne($sql);
if ($product->store <= 0) {
    throw new \Exception('商品库存不足');
}
$new_store = $product->store - 1;
app('log')->info("当前剩余库存为: {$new_store} 个");
$sql = "UPDATE `products` SET `store` = {$new_store} WHERE `id` = 1";
app('db')->update($sql);

压测:

# 总共发起 20 次请求,并发数为 5。
sudo ab -n 20 -c 5 http://example.test/api/createOrder

执行结果:
未使用锁时并发压测结果

从执行结果中可以看到,我们的商品出现了超卖的情况。正常情况下应该剩余 80 个库存,结果最后显示还有 84 个库存。

3. 加锁后我们再次进行压测:

  1. 利用文件锁,适合单机部署时使用。
$fp = fopen('./lock.txt', 'w+');
// 阻塞模式
if (flock($fp, LOCK_EX)) {
    // 业务代码
    $sql = "SELECT * FROM `products` WHERE `id` = 1";
    $product = app('db')->selectOne($sql);
    if ($product->store <= 0) {
        throw new \Exception('商品库存不足');
    }
    $new_store = $product->store - 1;
    app('log')->info("当前剩余库存为: {$new_store} 个");
    $sql = "UPDATE `products` SET `store` = {$new_store} WHERE `id` = 1";
    app('db')->update($sql);
    // 释放锁
    flock($fp, LOCK_UN);
}
fclose($fp);
  1. 分布式锁(Redis 实现)。
// 获取锁
private function lock($name, $expire = 0)
{
    $redis = app('redis')->connection('default');
    $result = $redis->setnx($name, 1);
    if ($result === 1 && $expire) {
        $redis->expire($name, $expire);
    }
    return $result === 1;
}
// 释放锁
private function unlock($name)
{
    $redis = app('redis')->connection('default');
    $redis->del($name);
}

// =============== 示例代码 ===============
$product_id = 1;
$signature = 'create_order_' . sha1($product_id) ;
while (!$this->lock($signature, 10)) {
    // 阻塞执行并等待锁的释放
    usleep(100000);
}
// 业务代码
$sql = "SELECT * FROM `products` WHERE `id` = {$product_id}";
$product = app('db')->selectOne($sql);
if ($product->store <= 0) {
    throw new \Exception('商品库存不足');
}
$new_store = $product->store - 1;
app('log')->info("当前剩余库存为: {$new_store} 个");
$sql = "UPDATE `products` SET `store` = {$new_store} WHERE `id` = {$product_id}";
app('db')->update($sql);
// 释放锁
$this->unlock($signature);
  1. 分布式锁(MySQL 实现)。

注意: SELECT … FOR UPDATE 必须在事务中才会生效。

try {
    $conn = app('db');
    $conn->beginTransaction();
    $sql = "SELECT * FROM `products` WHERE `id` = 1 FOR UPDATE";
    $product = app('db')->selectOne($sql);
    if ($product->store <= 0) {
        throw new \Exception('商品库存不足');
    }
    $new_store = $product->store - 1;
    app('log')->info("当前剩余库存为: {$new_store} 个");
    $sql = "UPDATE `products` SET `store` = {$new_store} WHERE `id` = 1";
    $conn->update($sql);
    $conn->commit();
} catch (\Exception $e) {
    app('log')->error("库存更新失败: " . $e->getMessage());
    $conn->rollback();
}

压测:

# 总共发起 20 次请求,并发数为 5。
sudo ab -n 20 -c 5 http://example.test/api/createOrder

执行结果(三种方式实验结果均一样,这里就不重复贴了):
使用锁后并发压测结果

可以看到加了锁之后,我们的库存扣减就正常了。

总结

在处理高并发时,如果不加锁,可能会出现多个请求同时更新同一数据的情况,最终导致数据不一致。因此,在实际应用中,我们应根据具体业务场景选择合适的锁机制来确保数据的一致性和准确性。

  1. 使用 flock() 函数实现的文件锁仅适用于单机部署时使用。

  2. MySQL 的 SELECT ... FOR UPDATE 实现比较简单,但是数据库无法支撑大量的请求,还有对于无索引的普通字段进行 FOR UPDATE 时会导致表锁。

  3. 最后,还是推荐使用 Redis 实现的分布式锁,不管是单机还是集群都可以使用,性能也不错。

写在最后

如果本文对您有所帮助或者有所启发,请帮忙扫描下方二维码或微信搜索 「自在牛马」 关注一下我的公众号,您的支持是我最大的写作动力。感谢~

拒绝白嫖,转载请注明出处。

自在牛马

本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 1

我二十年铺阿七铺经验,竟然没有看懂。

3天前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!