关于分布式事务带来的问题及解决方案
相信大家都做过微信提现的业务逻辑,扣除用户余额,调用微信付款到零钱接口进行提现。
代码如下:
<?php
public function withdraw(Request $request)
{
$withdrawMoney = $request->post('money');
$loginUser = UserService::getLoginUserInfo();
$user = User::find($loginUser['id']);
if ($user['money'] >= $withdrawMoney) {
DB::beginTransaction();
try {
$user['money'] -= $withdrawMoney;
$user->save();
UserMoneyLog::record($user['id'], $withdrawMoney);
WechatService::pay($user['openid'], $withdrawMoney);
DB::commit();
//返回成功响应
} catch (\Exception $e) {
DB::rollback();
//返回失败响应
}
}
throw new \Exception('余额不足');
}
这样写在并发提现的时候会有问题,要加上分布式锁。具体参考我的另一篇文章分布式锁在编程中的应用
加上分布式锁后,代码如下:
<?php
public function withdraw(Request $request)
{
$withdrawMoney = $request->post('money');
$loginUser = UserService::getLoginUserInfo();
$lock = Cache::lock('withdraw-' . $user['id'], 10);
if (!$lock->get()) {
throw new \Exception('提现太频繁,请稍后重试');
}
$user = User::find($loginUser['id']);
if ($user['money'] >= $withdrawMoney) {
try {
$user['money'] -= $withdrawMoney;
$user->save();
UserMoneyLog::record($user['id'], $withdrawMoney);
WechatService::pay($user['openid'], $withdrawMoney);
DB::commit();
//返回成功响应
} catch (\Exception $e) {
DB::rollback();
//返回失败响应
}
}
throw new \Exception('余额不足');
}
这样还是会带来一个问题,如果数据库减成功了,微信接口调用失败了怎么办?就会导致余额扣了,钱没到账。
那有朋友可能就会说了,简单啊,微信支付的接口是同步返回的,获取支付结果判断一下,如果没问题,才提交整个事务。
代码如下:
<?php
public function withdraw(Request $request)
{
$withdrawMoney = $request->post('money');
$loginUser = UserService::getLoginUserInfo();
$lock = Cache::lock('withdraw-' . $user['id'], 10);
if (!$lock->get()) {
throw new \Exception('提现太频繁,请稍后重试');
}
$user = User::find($loginUser['id']);
if ($user['money'] >= $withdrawMoney) {
try {
$user['money'] -= $withdrawMoney;
$user->save();
UserMoneyLog::record($user['id'], $withdrawMoney);
$result = WechatService::pay($user['openid'], $withdrawMoney);
if ($result['pay_result'] == 'SUCCESS') {
DB::commit();
} else {
throw new \Exception('提现失败');
}
} catch (\Exception $e) {
DB::rollback();
throw $e;
}
}
}
到此,微信提现才算没有问题,本地事务扣余额和记日志,还有远程事务微信支付要么同时成功,要么同时失败,保证了原子性。
难道,所谓的分布式事务这么轻松就可以解决了吗?我们再来假设一下,不一定符合实际情况,但为了更加深入地探讨分布式事务,我们不妨先接受这个设定。
假设用户资产是在JAVA端管理,而提现功能是用PHP写。那扣除余额和记录资产日志就要调用JAVA提供的接口,再调用微信接口。代码如下:
<?php
public function withdraw(Request $request)
{
$withdrawMoney = $request->post('money');
$loginUser = UserService::getLoginUserInfo();
try {
UserService::withdrawMoney($loginUser['id'], $withdrawMoney); //调用JAVA接口的封装
WechatService::pay($loginUser['openid'], $withdrawMoney);
} catch (\Exception $e) {
throw new \Exception('提现失败');
}
}
这两个都是远程事务,如何保证这两个事务100%执行成功?如果有一个事务失败了,一个事务成功了怎么办?这可不是本地事务,没办法回滚。
这里介绍一种本地消息表的方式来解决分布式事务的问题,其核心思想就是把远程事务化为本地事务,再通过消息队列和重试机制来保证事务100%执行成功
什么叫把远程事务化为本地事务?就是把远程接口调用,封装成一个任务。
新建一个任务表tasks:
- id
- name
- handler_class
- handler_method
- handler_construct_params
- handler_meethod_params
- is_static
- execute_result
- create_time
- throw_time
上面的代码变成了
<?php
public function withdraw(Request $request)
{
$withdrawMoney = $request->post('money');
$loginUser = UserService::getLoginUserInfo();
DB::beginTransaction();
try {
$task = new Task();
$task->name = '提现任务JAVA调用';
$task->handler_class = UserService::class;
$task->handler_method = 'withdrawMoney';
$task->is_static = 1;
$task->handler_method_params = json_encode([
$loginUser['id'],
$withdrawMoney
]);
$task->create_time = date('Y-m-d H:i:s');
$task->save();
$task = new Task();
$task->name = '提现任务微信支付';
$task->handler_class = WechatService::class;
$task->handler_method = 'pay';
$task->is_static = 1;
$task->handler_method_params = json_encode([
$loginUser['openid'],
$withdrawMoney
]);
$task->create_time = date('Y-m-d H:i:s');
$task->save();
DB::commit();
} catch (\Exception $e) {
DB::rollback();
}
}
这样我们就把两个远程事务化为了本地事务,由数据库来保证两个任务同时插入成功的原子性。
到此,只是完成了一半。还需要一个投递进程,也叫消息生产者进程把任务投递到消息队列,由消费者进程进行消费。
但限于篇幅,要完整地展示用于生产环境的投递进程和消费者进程代码难度较大。所以只是简单地写一下代码。
<?php
class MessageProducer
{
public function onWorkerStart()
{
// 获取未投递过的任务
//投递到MQ
}
}
这里有很多要注意的细节,多进程投递还是单进程投递。已经投递未处理的任务如何区分?怎么样避免重复投递任务?投递任务本质也是网络IO,如果失败了怎么办?
<?php
class MessageConsumer
{
public function onWorkerStart()
{
//从MQ中获取任务
//执行任务
}
}
消费者进程也有非常多的细节,以RabbitMQ为例,失败后要不要进死信队列,失败几次进。重试机制是什么样的?重试时间间隔如何设计?进入到死信队列后,要不要设置一个进程专门消费死信队列的任务。
但无论投递到什么MQ,无论如何消费,如何投递。本质上都是把任务先放过数据库,再通过生产者进程投递到消息队列,再通过消费者进程进行消费。就算有一个远程事务失败了,也可以通过重试机制去保证一定可以执行成功。
当然,处理分布式事务不止本地消息表这一种方式,还有TCC等方式。下次有机会再一起探讨。
本作品采用《CC 协议》,转载必须注明作者和本文链接
推荐文章: