php基于dtm分布式事务管理器实现tcc模式分布式事务demo

项目里遇到两个系统互相调但不能保证事务一致性的问题,凑巧这段时间跟着dtm富哥学习了一些分布式事务的知识,大概了解了一些分布式事务的使用场景,看了看官方的demo后,简简单单自己写了个demo,代码已经上传到码云,分布式事务模式我们使用的是tcc,即(try,confirm,cancel),调用方式采用的是http,关于tcc可以到这里了解更多。

dtm是一个go语言开发的分布式事务管理器,在这里我们先将dtm跑起来。

## 拉取代码
cd /home
git clone https://github.com/dtm-labs/dtm && cd dtm

按照官方文档,dtm依赖于mysql,安装了docker20.04+之后,你可以通过

docker-compose -f helper/compose.mysql.yml

在docker里面启动mysql服务。你也可以像我一样使用现有的mysql服务,只需要在dtm项目下:

cp conf.sample.yml conf.yml # 修改conf.yml

将配置文件复制一份出来并修改一下相关的信息:

Store: # specify which engine to store trans status
  Driver: 'mysql'
  Host: 'localhost'
  User: 'root'
  Password: ''
  Port: 3306

ExamplesDB:
  Driver: 'mysql'
  Host: 'localhost'
  User: 'root'
  Password: ''
  Port: 3306

这里要注意你配置的mysql账户必须有创建dtm数据库的权限。改完了之后我们就可以启动项目了(前提是你本地已经有了go环境,不然还是要通过docker,通过docker怎么启动可以查看下dtm官方文档):

// 入口文件是app/main.go
go run app/main.go dev

看到下图这样的就是启动成功了,并且会有一个定时任务不停的打印日志

你也可以使用数据库工具查看是否多了一个dtm的数据库。根据性能测试目前dtm搭配mysql可以处理的事务大概为900多个每秒,我写这篇文章的时候已经看到作者发了搭配redis做为存储引擎可以达到每秒处理1W+事务的文章,如果业务上有这么高的要求可以使用redis做为dtm的存储引擎,具体参考(mp.weixin.qq.com/s/lmIVQ2aVksZxiCx...) OK,dtm到了这里就不用去管它了,下面我们看具体的业务。

我们模拟的是一个下单扣库存服务,分别有订单服务、库存服务。php方面使用的hyperf框架,其他框架也可以,项目结构如下:

|—— php-dtm-tcc/
    |—— api-hyperf/
    |—— order-server/
    |—— stock-server/
    |—— sql/index.sql ## 建表语句

这里为了简单点数据库我就使用的同一个了,实际操作不同的系统可以使用不同的数据库,数据库表有商品表、商品库存表、订单表、订单商品表、库存锁定表。

请求先到api-hyperf再调用两个子服务order-serverstock-server

订单服务生成写入订单、订单商品等记录,库存服务扣减库存,锁定库存等操作。

api-hyperf项目的composer.json里加上下面两个包

"linxx/dtmcli-php": "*",//原本是dtm/dtmcli-php,但我觉得他那个包里返回的数据不是我想要的,所以基于原包的情况下我将返回数据修改了一下,下面会做对比
"mix/vega": "^3.0"

然后我们分别启动api-hyperf服务、order-server服务、stock-server服务。

在写具体的代码前我们先来看看整体的流程大概是怎么样的,这里引用一段官方demo的代码:

<?php

require __DIR__ . '/vendor/autoload.php';

function FireTcc () {
    $dtm = 'http://localhost:36789/api/dtmsvr';
    $svc = 'http://localhost:4005/api';

    Dtmcli\tccGlobalTransaction($dtm, function ($tcc) use ($svc) {
        /** @var Dtmcli\Tcc $tcc */

        $req = ['amount' => 30];
        echo 'calling trans out' . PHP_EOL;
        $tcc->callBranch($req, $svc . '/TransOutTry', $svc . '/TransOutConfirm', $svc . '/TransOutCancel');
        echo 'calling trans in' . PHP_EOL;
        $tcc->callBranch($req, $svc . '/TransInTry', $svc . '/TransInConfirm', $svc . '/TransInCancel');
    });
}

$vega = new Mix\Vega\Engine();

//转出try
$vega->handleFunc('/api/TransOutTry', function (Mix\Vega\Context $ctx) {
    var_dump('TransOutTry', $ctx->request->getQueryParams(), $ctx->request->getParsedBody());
    $ctx->JSON(200, ['result' => 'SUCCESS']);
})->methods('POST');

//转出confirm
$vega->handleFunc('/api/TransOutConfirm', function (Mix\Vega\Context $ctx) {
    var_dump('TransOutConfirm', $ctx->request->getQueryParams(), $ctx->request->getParsedBody());
    $ctx->JSON(200, ['result' => 'SUCCESS']);
})->methods('POST');

//转出commit
$vega->handleFunc('/api/TransOutCancel', function (Mix\Vega\Context $ctx) {
    var_dump('TransOutCancel', $ctx->request->getQueryParams(), $ctx->request->getParsedBody());
    $ctx->JSON(200, ['result' => 'SUCCESS']);
})->methods('POST');

//转入try
$vega->handleFunc('/api/TransInTry', function (Mix\Vega\Context $ctx) {
    var_dump('TransInTry', $ctx->request->getQueryParams(), $ctx->request->getParsedBody());
    $ctx->JSON(200, ['result' => 'SUCCESS']);
})->methods('POST');

//转入confirm
$vega->handleFunc('/api/TransInConfirm', function (Mix\Vega\Context $ctx) {
    var_dump('TransInConfirm', $ctx->request->getQueryParams(), $ctx->request->getParsedBody());
    $ctx->JSON(200, ['result' => 'SUCCESS']);
})->methods('POST');

//转入commit
$vega->handleFunc('/api/TransInCancel', function (Mix\Vega\Context $ctx) {
    var_dump('TransInCancel', $ctx->request->getQueryParams(), $ctx->request->getParsedBody());
    $ctx->JSON(200, ['result' => 'SUCCESS']);
})->methods('POST');

$vega->handleFunc('/api/FireTcc', function (Mix\Vega\Context $ctx) {
    FireTcc();
    $ctx->JSON(200, ['result' => 'SUCCESS']);
})->methods('POST');

$http_worker = new Workerman\Worker("http://0.0.0.0:4005");
$http_worker->onMessage = $vega->handler();
$http_worker->count = 4;
Workerman\Worker::runAll();

由上面这个例子我们看到,这是一个仿照转账业务的操作,首先定义了转出、出入的try、confirm、cancel操作,在这个workerman服务启动起来后请求http:127.0.0.1:4006/api/FireTcc地址,会调用FireTcc()函数,这个函数里有一个tccGlobalTransaction函数,这个函数里就是dtm管理事务的相关操作,官方这个包dtm/dtmcli-phptccGlobalTransaction函数返回值是gid(分布式事务id),在整个事务操作成功的时候返回分布式事务id,事务失败时返回空。

function tccGlobalTransaction(string $dtmUrl, callable $cb): string
    {
        $tcc = new Tcc($dtmUrl, genGid($dtmUrl));
        $tbody = [
            'gid' => $tcc->gid,
            'trans_type' => 'tcc',
        ];
        $client = new \GuzzleHttp\Client();
        try {
            $response = $client->post($tcc->dtm . '/prepare', ['json' => $tbody]);
            checkStatus($response->getStatusCode());
            $cb($tcc);
            $client->post($tcc->dtm . '/submit', ['json' => $tbody]);
        } catch (\Throwable $e) {
            $client->post($tcc->dtm . '/abort', ['json' => $tbody]);
            return '';
        }
        return $tcc->gid;
    }

linxx/dtmcli-php这个包返回的是一个数组:

function tccGlobalTransaction(string $dtmUrl, callable $cb): array
    {
        $tcc = new Tcc($dtmUrl, genGid($dtmUrl));
        $tbody = [
            'gid' => $tcc->gid,
            'trans_type' => 'tcc',
        ];
        $client = new \GuzzleHttp\Client();
        $message = '操作成功';
        $gid = $tcc->gid;
        try {
            $response = $client->post($tcc->dtm . '/prepare', ['json' => $tbody]);
            checkStatus($response->getStatusCode());
            $cb($tcc);
            $client->post($tcc->dtm . '/submit', ['json' => $tbody]);
        } catch (\Throwable $e) {
            $client->post($tcc->dtm . '/abort', ['json' => $tbody]);
            $message = $e->getMessage();
            $gid = '';
        }
        return [
            'gid'   => $gid,
            'message'   => $message
        ];
    }

除了gid之外还返回了一个message字段,这样可以将子服务失败的原因也返回,方便api-hyperf服务提示错误信息。

下面我们就开始写具体的业务代码了,我们先到order-serverstock-server写上相应的try、confirm、cancel操作。

订单服务:

<?php

declare(strict_types=1);
/**
 * This file is part of Hyperf.
 *
 * @link     https://www.hyperf.io
 * @document https://hyperf.wiki
 * @contact  group@hyperf.io
 * @license  https://github.com/hyperf/hyperf/blob/master/LICENSE
 */
namespace App\Controller;

use Hyperf\DbConnection\Db;

class Order extends AbstractController
{
    public function addOrderTry()
    {
        Db::beginTransaction();
        try {
            $postData = $this->request->post();
            $totalAmount = 0;
            $totalNumber = 0;
            foreach ($postData['goods'] as &$value) {
                $totalNumber = $value['number'] + $totalNumber;
                $goodInfo = Db::table('good')->where('id', $value['good_id'])->first();
                $amount = $value['number'] * $goodInfo->price;
                $value['amount'] = $amount;
                $value['price'] = $goodInfo->price;
                $totalAmount = $totalAmount + $amount;
            }
            unset($value);
            if (round($totalAmount, 2) != round($postData['pay_amount'], 2)) {
                throw new \Exception('商品金额计算错误', 10010);
            }
            $orderId = Db::table('order')->insertGetId([
                'user_id' => $postData['user_id'],
                'order_no' => $postData['order_no'],
                'total_amount' => $totalAmount,
                'total_number' => $totalNumber,
                'create_time' => time(),
                'update_time' => time(),
            ]);
            foreach ($postData['goods'] as $value) {
                Db::table('order_goods')->insert([
                    'order_id' => $orderId,
                    'good_id' => $value['good_id'],
                    'number' => $value['number'],
                    'amount' => $value['amount'],
                    'price' => $value['price'],
                    'create_time' => time(),
                    'update_time' => time(),
                ]);
            }
            Db::commit();
            return [
                'code' => 0,
                'data' => 'SUCCESS',
                'msg' => '成功',
            ];
        } catch (\Exception $e) {
            Db::rollBack();
            return [
                'code' => 10010,
                'data' => 'FAILURE',
                'msg' => $e->getMessage(),
            ];
        }
    }

    public function addOrderConfirm()
    {
        Db::beginTransaction();
        try {
            $postData = $this->request->post();
            $orderInfo = Db::table('order')->where('order_no', $postData['order_no'])->first();
            Db::table('order')->where('id', $orderInfo->id)->update([
                'is_ok' => 1,
                'update_time' => time(),
            ]);
            Db::table('order_goods')->where('order_id', $orderInfo->id)->update([
                'is_ok' => 1,
                'update_time' => time(),
            ]);
            Db::commit();
            return [
                'code' => 0,
                'data' => 'SUCCESS',
                'msg' => '成功',
            ];
        } catch (\Exception $e) {
            Db::rollBack();
            return [
                'code' => 10010,
                'data' => 'FAILURE',
                $e->getMessage(),
            ];
        }
    }

    public function addOrderCancel()
    {
        Db::beginTransaction();
        try {
            $postData = $this->request->post();
            $orderInfo = Db::table('order')->where('order_no', $postData['order_no'])->first();
            Db::table('order')->where('id', $orderInfo->id)->update([
                'delete_time' => time(),
            ]);
            Db::table('order_goods')->where('order_id', $orderInfo->id)->update([
                'delete_time' => time(),
            ]);
            Db::commit();
            return [
                'code' => 0,
                'data' => 'SUCCESS',
                'msg' => '成功',
            ];
        } catch (\Exception $e) {
            Db::rollBack();
            return [
                'code' => 10010,
                'data' => 'FAILURE',
                'msg' => $e->getMessage(),
            ];
        }
    }
}

库存服务:

<?php

declare(strict_types=1);
/**
 * This file is part of Hyperf.
 *
 * @link     https://www.hyperf.io
 * @document https://hyperf.wiki
 * @contact  group@hyperf.io
 * @license  https://github.com/hyperf/hyperf/blob/master/LICENSE
 */
namespace App\Controller;

use Hyperf\DbConnection\Db;

class Stock extends AbstractController
{
    public function decGoodStockTry()
    {
        Db::beginTransaction();
        try {
            $postData = $this->request->post();
            foreach ($postData['goods'] as $value) {
                $goodInfo = Db::table('good')->where('id', $value['good_id'])->first();
                if (empty($goodInfo)) {
                    throw new \Exception('商品不存在', 10010);
                }
                //查寻库存
                $stockInfo = Db::table('good_stock')->where('good_id', $value['good_id'])->first();
                if (empty($goodInfo)) {
                    throw new \Exception('商品库存为空', 10010);
                }
                if (round($value['number'], 2) > round($stockInfo->total_number, 2)) {
                    throw new \Exception('商品库存不足', 10010);
                }
                Db::table('good_stock')->where('id', $stockInfo->id)->decrement('total_number', $value['number']);
                //锁定库存
                Db::table('good_stock_lock')->insert([
                    'order_no' => $postData['order_no'],
                    'good_id' => $value['good_id'],
                    'number' => $value['number'],
                    'create_time' => time(),
                    'update_time' => time(),
                ]);
                //@todo 库存记录
            }
            Db::commit();
            return [
                'code' => 0,
                'data' => 'SUCCESS',
                'msg' => '成功',
            ];
        } catch (\Exception $e) {
            Db::rollBack();
            return [
                'code' => 10010,
                'data' => 'FAILURE',
                'msg' => $e->getMessage(),
            ];
        }
    }

    public function decGoodStockConfirm()
    {
        Db::beginTransaction();
        try {
            $postData = $this->request->post();
            foreach ($postData['goods'] as $value) {
                $info = Db::table('good_stock_lock')->where('good_id', $value['good_id'])
                    ->where('order_no', $postData['order_no'])
                    ->first();
                if (! empty($info) && $info->status == 0) {
                    Db::table('good_stock_lock')->where('id', $info->id)->update([
                        'status' => '1',
                        'update_time' => time(),
                    ]);
                }
            }
            Db::commit();
            return [
                'code' => 0,
                'data' => 'SUCCESS',
                'msg' => '成功',
            ];
        } catch (\Exception $e) {
            Db::rollBack();
            return [
                'code' => 10010,
                'data' => 'FAILURE',
                'msg' => $e->getMessage(),
            ];
        }
    }

    public function decGoodStockCancel()
    {
        Db::beginTransaction();
        try {
            $postData = $this->request->post();
            foreach ($postData['goods'] as $value) {
                $info = Db::table('good_stock_lock')->where('good_id', $value['good_id'])
                    ->where('order_no', $postData['order_no'])
                    ->first();
                if (! empty($info) && $info->status == 0) {
                    Db::table('good_stock_lock')->where('id', $info->id)->update([
                        'status' => '2',
                        'update_time' => time(),
                    ]);
                    Db::table('good_stock')->where('good_id', $value['good_id'])->increment('total_number', $value['number']);
                }
            }
            Db::commit();
            return [
                'code' => 0,
                'data' => 'SUCCESS',
                'msg' => '成功',
            ];
        } catch (\Exception $e) {
            Db::rollBack();
            return [
                'code' => 10010,
                'data' => 'FAILURE',
                'msg' => $e->getMessage(),
            ];
        }
    }
}

再到api-hyperf定义调用相关的操作:

<?php

declare(strict_types=1);
/**
 * This file is part of Hyperf.
 *
 * @link     https://www.hyperf.io
 * @document https://hyperf.wiki
 * @contact  group@hyperf.io
 * @license  https://github.com/hyperf/hyperf/blob/master/LICENSE
 */
namespace App\Controller;

use App\Constants\ErrorCode;
use App\Utils\DtmCli;

class Order extends AbstractController
{
    public function addOrder()
    {
        $orderNo = get_order_no();
        $req = [
            'user_id' => 1,
            'pay_amount' => 100,
            'order_no' => $orderNo,
            'goods' => [
                [
                    'good_id' => 10000,
                    'number' => 10,
                ],
            ],
        ];
        $serverList = [
            [
                'server' => 'http://localhost:9551', //可使用etcd等服务注册发现中间件
                'try' => 'addOrderTry',
                'confirm' => 'addOrderConfirm',
                'cancel' => 'addOrderCancel',
            ],
            [
                'server' => 'http://localhost:9552',
                'try' => 'decGoodStockTry',
                'confirm' => 'decGoodStockConfirm',
                'cancel' => 'decGoodStockCancel',
            ],
        ];
        $ret = DtmCli::handleDtmTransaction($serverList, $req);
        if ($ret['is_ok']) {
            return $this->success([]);
        }
        return $this->success([], ErrorCode::CODE_ERROR, $ret['message']);
    }
}

DtmCli::handleDtmTransaction是我封装的一个操作:

<?php

declare(strict_types=1);
/**
 * This file is part of Hyperf.
 *
 * @link     https://www.hyperf.io
 * @document https://hyperf.wiki
 * @contact  group@hyperf.io
 * @license  https://github.com/hyperf/hyperf/blob/master/LICENSE
 */
namespace App\Utils;

use App\Constants\ErrorCode;
use Dtmcli\Tcc;
use function Dtmcli\tccGlobalTransaction;

class DtmCli
{
    /**
     * @param $serverList
     * @param $req
     */
    public static function handleDtmTransaction($serverList, $req): array
    {
        $isOk = false;
        try {
            $dtm = 'http://localhost:36789/api/dtmsvr';//dtm服务地址
            if (empty($serverList)) {
                throw new \Exception('子服务不能为空', ErrorCode::CODE_ERROR);
            }
            $ret = tccGlobalTransaction($dtm, function ($tcc) use ($req, $serverList) {
                /*
                 * @var Tcc $tcc
                 */
                foreach ($serverList as $value) {
                    if (empty($value['server']) || empty($value['try']) || empty($value['confirm']) || empty($value['cancel'])) {
                        throw new \Exception('子服务错误', ErrorCode::CODE_ERROR);
                    }
                    $tryUrl = $value['server'] . '/' . $value['try'];
                    $confirmUrl = $value['server'] . '/' . $value['confirm'];
                    $cancelUrl = $value['server'] . '/' . $value['cancel'];
                    $tcc->callBranch($req, $tryUrl, $confirmUrl, $cancelUrl);
                }
            });
            if (! empty($ret['gid'])) {
                $isOk = true;
            }
            $message = $ret['message'];
        } catch (\Exception $e) {
            $message = $e->getMessage();
        }
        return [
            'is_ok' => $isOk,
            'message' => $message,
        ];
    }
}

都写好了之后我们来请求一下api-hyperf下的addOrder接口:

可以看到返回操作成功了,我们去表里看下数据。

dtm表中的数据也是显示该事务是操作成功的

下面我们再请求一次:

可以看到返回商品库存不足,因为我们库存只有10件,上面第一次请求已经卖了10件没库存了,所以库存服务返回库存不足,我们这里也提示出来了。

订单表的相关数据进行了回滚。dtm数据表标识该事务失败

常见问题

  • hyperf框架修改代码后不生效,根目录下执行composer dump-autoload -o
  • tcc模式下Confirm真正执行业务,不作任何业务检查,只使用 Try 阶段预留的业务资源,所以前面try阶段要锁定好confirm需要的资源。
本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!