关于分布式事务带来的问题及解决方案

相信大家都做过微信提现的业务逻辑,扣除用户余额,调用微信付款到零钱接口进行提现。

代码如下:

<?php

public function withdraw(Request $request)
{
    $withdrawMoney = $request->post('money');
    $loginUser = UserService::getLoginUserInfo();
    $user = User::find($loginUser['id']);

    if ($user['money'] >= $withdrawMoney) {
        DB::beginTransaction();

        try {
            $user['money'] -= $withdrawMoney;
            $user->save();

            UserMoneyLog::record($user['id'], $withdrawMoney);
            WechatService::pay($user['openid'], $withdrawMoney);

            DB::commit();
            //返回成功响应
        } catch (\Exception $e) {
            DB::rollback();
            //返回失败响应
        }

    }

    throw new \Exception('余额不足');
}

这样写在并发提现的时候会有问题,要加上分布式锁。具体参考我的另一篇文章分布式锁在编程中的应用

加上分布式锁后,代码如下:

<?php

public function withdraw(Request $request)
{
    $withdrawMoney = $request->post('money');
    $loginUser = UserService::getLoginUserInfo();

    $lock = Cache::lock('withdraw-' . $user['id'], 10);

    if (!$lock->get()) {
        throw new \Exception('提现太频繁,请稍后重试');
    }

    $user = User::find($loginUser['id']);

    if ($user['money'] >= $withdrawMoney) {
        try {
            $user['money'] -= $withdrawMoney;
            $user->save();

            UserMoneyLog::record($user['id'], $withdrawMoney);
            WechatService::pay($user['openid'], $withdrawMoney);

            DB::commit();

            //返回成功响应
        } catch (\Exception $e) {
            DB::rollback();
            //返回失败响应
        }

    }

    throw new \Exception('余额不足');
}

这样还是会带来一个问题,如果数据库减成功了,微信接口调用失败了怎么办?就会导致余额扣了,钱没到账。

那有朋友可能就会说了,简单啊,微信支付的接口是同步返回的,获取支付结果判断一下,如果没问题,才提交整个事务。

代码如下:

<?php

public function withdraw(Request $request)
{
    $withdrawMoney = $request->post('money');
    $loginUser = UserService::getLoginUserInfo();

    $lock = Cache::lock('withdraw-' . $user['id'], 10);

    if (!$lock->get()) {
        throw new \Exception('提现太频繁,请稍后重试');
    }

    $user = User::find($loginUser['id']);

    if ($user['money'] >= $withdrawMoney) {
        try {
            $user['money'] -= $withdrawMoney;
            $user->save();

            UserMoneyLog::record($user['id'], $withdrawMoney);
            $result = WechatService::pay($user['openid'], $withdrawMoney);

            if ($result['pay_result'] == 'SUCCESS') {
                DB::commit();
            } else {
                throw new \Exception('提现失败');
            }
        } catch (\Exception $e) {
            DB::rollback();
            throw $e;
        }
    }
}

到此,微信提现才算没有问题,本地事务扣余额和记日志,还有远程事务微信支付要么同时成功,要么同时失败,保证了原子性。

难道,所谓的分布式事务这么轻松就可以解决了吗?我们再来假设一下,不一定符合实际情况,但为了更加深入地探讨分布式事务,我们不妨先接受这个设定。
假设用户资产是在JAVA端管理,而提现功能是用PHP写。那扣除余额和记录资产日志就要调用JAVA提供的接口,再调用微信接口。代码如下:

<?php

public function withdraw(Request $request)
{
    $withdrawMoney  =  $request->post('money');  
    $loginUser  =  UserService::getLoginUserInfo();

    try {
        UserService::withdrawMoney($loginUser['id'], $withdrawMoney); //调用JAVA接口的封装
        WechatService::pay($loginUser['openid'], $withdrawMoney);
    } catch (\Exception $e) {
        throw new \Exception('提现失败');
    }

}

这两个都是远程事务,如何保证这两个事务100%执行成功?如果有一个事务失败了,一个事务成功了怎么办?这可不是本地事务,没办法回滚。

这里介绍一种本地消息表的方式来解决分布式事务的问题,其核心思想就是把远程事务化为本地事务,再通过消息队列和重试机制来保证事务100%执行成功

什么叫把远程事务化为本地事务?就是把远程接口调用,封装成一个任务。

新建一个任务表tasks:

  • id
  • name
  • handler_class
  • handler_method
  • handler_construct_params
  • handler_meethod_params
  • is_static
  • execute_result
  • create_time
  • throw_time

上面的代码变成了

<?php

public function withdraw(Request $request)
{
    $withdrawMoney  =  $request->post('money');  
    $loginUser  =  UserService::getLoginUserInfo();

    DB::beginTransaction();

    try {
        $task = new Task();
        $task->name = '提现任务JAVA调用';
        $task->handler_class = UserService::class;
        $task->handler_method = 'withdrawMoney';
        $task->is_static = 1;
        $task->handler_method_params = json_encode([
            $loginUser['id'],
            $withdrawMoney
        ]);
        $task->create_time = date('Y-m-d H:i:s');
        $task->save();

        $task = new Task();
        $task->name = '提现任务微信支付';
        $task->handler_class = WechatService::class;
        $task->handler_method = 'pay';
        $task->is_static = 1;
        $task->handler_method_params = json_encode([
            $loginUser['openid'],
            $withdrawMoney
        ]);
        $task->create_time = date('Y-m-d H:i:s');
        $task->save();

        DB::commit();
    } catch (\Exception $e) {
        DB::rollback();
    }
}

这样我们就把两个远程事务化为了本地事务,由数据库来保证两个任务同时插入成功的原子性。

到此,只是完成了一半。还需要一个投递进程,也叫消息生产者进程把任务投递到消息队列,由消费者进程进行消费。

但限于篇幅,要完整地展示用于生产环境的投递进程和消费者进程代码难度较大。所以只是简单地写一下代码。

<?php

class MessageProducer
{
    public function onWorkerStart()
    {
        // 获取未投递过的任务

        //投递到MQ
    }
}

这里有很多要注意的细节,多进程投递还是单进程投递。已经投递未处理的任务如何区分?怎么样避免重复投递任务?投递任务本质也是网络IO,如果失败了怎么办?

<?php

class MessageConsumer
{
    public function onWorkerStart()
    {
        //从MQ中获取任务

        //执行任务
    }
}

消费者进程也有非常多的细节,以RabbitMQ为例,失败后要不要进死信队列,失败几次进。重试机制是什么样的?重试时间间隔如何设计?进入到死信队列后,要不要设置一个进程专门消费死信队列的任务。

但无论投递到什么MQ,无论如何消费,如何投递。本质上都是把任务先放过数据库,再通过生产者进程投递到消息队列,再通过消费者进程进行消费。就算有一个远程事务失败了,也可以通过重试机制去保证一定可以执行成功。

当然,处理分布式事务不止本地消息表这一种方式,还有TCC等方式。下次有机会再一起探讨。

本作品采用《CC 协议》,转载必须注明作者和本文链接
《L02 从零构建论坛系统》
以构建论坛项目 LaraBBS 为线索,展开对 Laravel 框架的全面学习。应用程序架构思路贴近 Laravel 框架的设计哲学。
《G01 Go 实战入门》
从零开始带你一步步开发一个 Go 博客项目,让你在最短的时间内学会使用 Go 进行编码。项目结构很大程度上参考了 Laravel。
讨论数量: 6

这种与外部系统交互一般都会有全局唯一业务凭证。就算是分布式事物也有全局唯一的事物ID。

单说问题的场景【微信付款】,一般的解决方案是:

  1. 用户发起或系统发起,在系统中预先创建一笔记录并标记唯一ID(业务单号、提现单号等概念)
  2. ①的事物结束,开始发起请求,或投递到消息队列
  3. 如果②是 开始发起 则带着 本系统中唯一ID和三方系统交互,拿微信付款来说,接口中有个参数叫:out_batch_no,单笔明细中还有一个out_detail_no。都可以作为这个唯一ID实现和三方系统的【幂等】
  4. 如果③中是投递到消息队列,那就在消费时再执行③的操作

在发起时,先使用预先生成的唯一业务ID进行【查询】操作。有查询结果,表示之前已发起过,同步状态即可,如果查询结果是【订单不存在】则进行【发起】操作。

有些三方业务是不具备【通知回调】能力,则需要自己实现定时批量【查询】进行同步

1年前 评论

把一个事务的两个任务投放进消息队列中,如果第一个任务成功了,第二个失败了,用消息队列如何回滚第一个任务?这里只说了第二个任务如果失败了可以使用消息队列的重试机制,但是如果重试一直不成功呢

1年前 评论
抖森先森 1年前
fireqong (楼主) 1年前

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!