php基于dtm分布式事务管理器实现tcc模式分布式事务demo
项目里遇到两个系统互相调但不能保证事务一致性的问题,凑巧这段时间跟着dtm富哥学习了一些分布式事务的知识,大概了解了一些分布式事务的使用场景,看了看官方的demo后,简简单单自己写了个demo,代码已经上传到码云,分布式事务模式我们使用的是tcc,即(try,confirm,cancel),调用方式采用的是http,关于tcc可以到这里了解更多。
dtm是一个go语言开发的分布式事务管理器,在这里我们先将dtm跑起来。
## 拉取代码
cd /home
git clone https://github.com/dtm-labs/dtm && cd dtm
按照官方文档,dtm依赖于mysql,安装了docker20.04+之后,你可以通过
docker-compose -f helper/compose.mysql.yml
在docker里面启动mysql服务。你也可以像我一样使用现有的mysql服务,只需要在dtm项目下:
cp conf.sample.yml conf.yml # 修改conf.yml
将配置文件复制一份出来并修改一下相关的信息:
Store: # specify which engine to store trans status
Driver: 'mysql'
Host: 'localhost'
User: 'root'
Password: ''
Port: 3306
ExamplesDB:
Driver: 'mysql'
Host: 'localhost'
User: 'root'
Password: ''
Port: 3306
这里要注意你配置的mysql账户必须有创建dtm数据库的权限。改完了之后我们就可以启动项目了(前提是你本地已经有了go环境,不然还是要通过docker,通过docker怎么启动可以查看下dtm官方文档):
// 入口文件是app/main.go
go run app/main.go dev
看到下图这样的就是启动成功了,并且会有一个定时任务不停的打印日志
你也可以使用数据库工具查看是否多了一个dtm的数据库。根据性能测试目前dtm搭配mysql可以处理的事务大概为900多个每秒,我写这篇文章的时候已经看到作者发了搭配redis做为存储引擎可以达到每秒处理1W+事务的文章,如果业务上有这么高的要求可以使用redis做为dtm的存储引擎,具体参考(mp.weixin.qq.com/s/lmIVQ2aVksZxiCx...) OK,dtm到了这里就不用去管它了,下面我们看具体的业务。
我们模拟的是一个下单扣库存服务,分别有订单服务、库存服务。php方面使用的hyperf框架,其他框架也可以,项目结构如下:
|—— php-dtm-tcc/
|—— api-hyperf/
|—— order-server/
|—— stock-server/
|—— sql/index.sql ## 建表语句
这里为了简单点数据库我就使用的同一个了,实际操作不同的系统可以使用不同的数据库,数据库表有商品表、商品库存表、订单表、订单商品表、库存锁定表。
请求先到api-hyperf
再调用两个子服务order-server
和stock-server
。
订单服务生成写入订单、订单商品等记录,库存服务扣减库存,锁定库存等操作。
在api-hyperf
项目的composer.json
里加上下面两个包
"linxx/dtmcli-php": "*",//原本是dtm/dtmcli-php,但我觉得他那个包里返回的数据不是我想要的,所以基于原包的情况下我将返回数据修改了一下,下面会做对比
"mix/vega": "^3.0"
然后我们分别启动api-hyperf
服务、order-server
服务、stock-server
服务。
在写具体的代码前我们先来看看整体的流程大概是怎么样的,这里引用一段官方demo的代码:
<?php
require __DIR__ . '/vendor/autoload.php';
function FireTcc () {
$dtm = 'http://localhost:36789/api/dtmsvr';
$svc = 'http://localhost:4005/api';
Dtmcli\tccGlobalTransaction($dtm, function ($tcc) use ($svc) {
/** @var Dtmcli\Tcc $tcc */
$req = ['amount' => 30];
echo 'calling trans out' . PHP_EOL;
$tcc->callBranch($req, $svc . '/TransOutTry', $svc . '/TransOutConfirm', $svc . '/TransOutCancel');
echo 'calling trans in' . PHP_EOL;
$tcc->callBranch($req, $svc . '/TransInTry', $svc . '/TransInConfirm', $svc . '/TransInCancel');
});
}
$vega = new Mix\Vega\Engine();
//转出try
$vega->handleFunc('/api/TransOutTry', function (Mix\Vega\Context $ctx) {
var_dump('TransOutTry', $ctx->request->getQueryParams(), $ctx->request->getParsedBody());
$ctx->JSON(200, ['result' => 'SUCCESS']);
})->methods('POST');
//转出confirm
$vega->handleFunc('/api/TransOutConfirm', function (Mix\Vega\Context $ctx) {
var_dump('TransOutConfirm', $ctx->request->getQueryParams(), $ctx->request->getParsedBody());
$ctx->JSON(200, ['result' => 'SUCCESS']);
})->methods('POST');
//转出commit
$vega->handleFunc('/api/TransOutCancel', function (Mix\Vega\Context $ctx) {
var_dump('TransOutCancel', $ctx->request->getQueryParams(), $ctx->request->getParsedBody());
$ctx->JSON(200, ['result' => 'SUCCESS']);
})->methods('POST');
//转入try
$vega->handleFunc('/api/TransInTry', function (Mix\Vega\Context $ctx) {
var_dump('TransInTry', $ctx->request->getQueryParams(), $ctx->request->getParsedBody());
$ctx->JSON(200, ['result' => 'SUCCESS']);
})->methods('POST');
//转入confirm
$vega->handleFunc('/api/TransInConfirm', function (Mix\Vega\Context $ctx) {
var_dump('TransInConfirm', $ctx->request->getQueryParams(), $ctx->request->getParsedBody());
$ctx->JSON(200, ['result' => 'SUCCESS']);
})->methods('POST');
//转入commit
$vega->handleFunc('/api/TransInCancel', function (Mix\Vega\Context $ctx) {
var_dump('TransInCancel', $ctx->request->getQueryParams(), $ctx->request->getParsedBody());
$ctx->JSON(200, ['result' => 'SUCCESS']);
})->methods('POST');
$vega->handleFunc('/api/FireTcc', function (Mix\Vega\Context $ctx) {
FireTcc();
$ctx->JSON(200, ['result' => 'SUCCESS']);
})->methods('POST');
$http_worker = new Workerman\Worker("http://0.0.0.0:4005");
$http_worker->onMessage = $vega->handler();
$http_worker->count = 4;
Workerman\Worker::runAll();
由上面这个例子我们看到,这是一个仿照转账业务的操作,首先定义了转出、出入的try、confirm、cancel操作,在这个workerman服务启动起来后请求http:127.0.0.1:4006/api/FireTcc
地址,会调用FireTcc()
函数,这个函数里有一个tccGlobalTransaction
函数,这个函数里就是dtm管理事务的相关操作,官方这个包dtm/dtmcli-php
的tccGlobalTransaction
函数返回值是gid(分布式事务id),在整个事务操作成功的时候返回分布式事务id,事务失败时返回空。
function tccGlobalTransaction(string $dtmUrl, callable $cb): string
{
$tcc = new Tcc($dtmUrl, genGid($dtmUrl));
$tbody = [
'gid' => $tcc->gid,
'trans_type' => 'tcc',
];
$client = new \GuzzleHttp\Client();
try {
$response = $client->post($tcc->dtm . '/prepare', ['json' => $tbody]);
checkStatus($response->getStatusCode());
$cb($tcc);
$client->post($tcc->dtm . '/submit', ['json' => $tbody]);
} catch (\Throwable $e) {
$client->post($tcc->dtm . '/abort', ['json' => $tbody]);
return '';
}
return $tcc->gid;
}
linxx/dtmcli-php
这个包返回的是一个数组:
function tccGlobalTransaction(string $dtmUrl, callable $cb): array
{
$tcc = new Tcc($dtmUrl, genGid($dtmUrl));
$tbody = [
'gid' => $tcc->gid,
'trans_type' => 'tcc',
];
$client = new \GuzzleHttp\Client();
$message = '操作成功';
$gid = $tcc->gid;
try {
$response = $client->post($tcc->dtm . '/prepare', ['json' => $tbody]);
checkStatus($response->getStatusCode());
$cb($tcc);
$client->post($tcc->dtm . '/submit', ['json' => $tbody]);
} catch (\Throwable $e) {
$client->post($tcc->dtm . '/abort', ['json' => $tbody]);
$message = $e->getMessage();
$gid = '';
}
return [
'gid' => $gid,
'message' => $message
];
}
除了gid之外还返回了一个message字段,这样可以将子服务失败的原因也返回,方便api-hyperf
服务提示错误信息。
下面我们就开始写具体的业务代码了,我们先到order-server
和stock-server
写上相应的try、confirm、cancel
操作。
订单服务:
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace App\Controller;
use Hyperf\DbConnection\Db;
class Order extends AbstractController
{
public function addOrderTry()
{
Db::beginTransaction();
try {
$postData = $this->request->post();
$totalAmount = 0;
$totalNumber = 0;
foreach ($postData['goods'] as &$value) {
$totalNumber = $value['number'] + $totalNumber;
$goodInfo = Db::table('good')->where('id', $value['good_id'])->first();
$amount = $value['number'] * $goodInfo->price;
$value['amount'] = $amount;
$value['price'] = $goodInfo->price;
$totalAmount = $totalAmount + $amount;
}
unset($value);
if (round($totalAmount, 2) != round($postData['pay_amount'], 2)) {
throw new \Exception('商品金额计算错误', 10010);
}
$orderId = Db::table('order')->insertGetId([
'user_id' => $postData['user_id'],
'order_no' => $postData['order_no'],
'total_amount' => $totalAmount,
'total_number' => $totalNumber,
'create_time' => time(),
'update_time' => time(),
]);
foreach ($postData['goods'] as $value) {
Db::table('order_goods')->insert([
'order_id' => $orderId,
'good_id' => $value['good_id'],
'number' => $value['number'],
'amount' => $value['amount'],
'price' => $value['price'],
'create_time' => time(),
'update_time' => time(),
]);
}
Db::commit();
return [
'code' => 0,
'data' => 'SUCCESS',
'msg' => '成功',
];
} catch (\Exception $e) {
Db::rollBack();
return [
'code' => 10010,
'data' => 'FAILURE',
'msg' => $e->getMessage(),
];
}
}
public function addOrderConfirm()
{
Db::beginTransaction();
try {
$postData = $this->request->post();
$orderInfo = Db::table('order')->where('order_no', $postData['order_no'])->first();
Db::table('order')->where('id', $orderInfo->id)->update([
'is_ok' => 1,
'update_time' => time(),
]);
Db::table('order_goods')->where('order_id', $orderInfo->id)->update([
'is_ok' => 1,
'update_time' => time(),
]);
Db::commit();
return [
'code' => 0,
'data' => 'SUCCESS',
'msg' => '成功',
];
} catch (\Exception $e) {
Db::rollBack();
return [
'code' => 10010,
'data' => 'FAILURE',
$e->getMessage(),
];
}
}
public function addOrderCancel()
{
Db::beginTransaction();
try {
$postData = $this->request->post();
$orderInfo = Db::table('order')->where('order_no', $postData['order_no'])->first();
Db::table('order')->where('id', $orderInfo->id)->update([
'delete_time' => time(),
]);
Db::table('order_goods')->where('order_id', $orderInfo->id)->update([
'delete_time' => time(),
]);
Db::commit();
return [
'code' => 0,
'data' => 'SUCCESS',
'msg' => '成功',
];
} catch (\Exception $e) {
Db::rollBack();
return [
'code' => 10010,
'data' => 'FAILURE',
'msg' => $e->getMessage(),
];
}
}
}
库存服务:
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace App\Controller;
use Hyperf\DbConnection\Db;
class Stock extends AbstractController
{
public function decGoodStockTry()
{
Db::beginTransaction();
try {
$postData = $this->request->post();
foreach ($postData['goods'] as $value) {
$goodInfo = Db::table('good')->where('id', $value['good_id'])->first();
if (empty($goodInfo)) {
throw new \Exception('商品不存在', 10010);
}
//查寻库存
$stockInfo = Db::table('good_stock')->where('good_id', $value['good_id'])->first();
if (empty($goodInfo)) {
throw new \Exception('商品库存为空', 10010);
}
if (round($value['number'], 2) > round($stockInfo->total_number, 2)) {
throw new \Exception('商品库存不足', 10010);
}
Db::table('good_stock')->where('id', $stockInfo->id)->decrement('total_number', $value['number']);
//锁定库存
Db::table('good_stock_lock')->insert([
'order_no' => $postData['order_no'],
'good_id' => $value['good_id'],
'number' => $value['number'],
'create_time' => time(),
'update_time' => time(),
]);
//@todo 库存记录
}
Db::commit();
return [
'code' => 0,
'data' => 'SUCCESS',
'msg' => '成功',
];
} catch (\Exception $e) {
Db::rollBack();
return [
'code' => 10010,
'data' => 'FAILURE',
'msg' => $e->getMessage(),
];
}
}
public function decGoodStockConfirm()
{
Db::beginTransaction();
try {
$postData = $this->request->post();
foreach ($postData['goods'] as $value) {
$info = Db::table('good_stock_lock')->where('good_id', $value['good_id'])
->where('order_no', $postData['order_no'])
->first();
if (! empty($info) && $info->status == 0) {
Db::table('good_stock_lock')->where('id', $info->id)->update([
'status' => '1',
'update_time' => time(),
]);
}
}
Db::commit();
return [
'code' => 0,
'data' => 'SUCCESS',
'msg' => '成功',
];
} catch (\Exception $e) {
Db::rollBack();
return [
'code' => 10010,
'data' => 'FAILURE',
'msg' => $e->getMessage(),
];
}
}
public function decGoodStockCancel()
{
Db::beginTransaction();
try {
$postData = $this->request->post();
foreach ($postData['goods'] as $value) {
$info = Db::table('good_stock_lock')->where('good_id', $value['good_id'])
->where('order_no', $postData['order_no'])
->first();
if (! empty($info) && $info->status == 0) {
Db::table('good_stock_lock')->where('id', $info->id)->update([
'status' => '2',
'update_time' => time(),
]);
Db::table('good_stock')->where('good_id', $value['good_id'])->increment('total_number', $value['number']);
}
}
Db::commit();
return [
'code' => 0,
'data' => 'SUCCESS',
'msg' => '成功',
];
} catch (\Exception $e) {
Db::rollBack();
return [
'code' => 10010,
'data' => 'FAILURE',
'msg' => $e->getMessage(),
];
}
}
}
再到api-hyperf
定义调用相关的操作:
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace App\Controller;
use App\Constants\ErrorCode;
use App\Utils\DtmCli;
class Order extends AbstractController
{
public function addOrder()
{
$orderNo = get_order_no();
$req = [
'user_id' => 1,
'pay_amount' => 100,
'order_no' => $orderNo,
'goods' => [
[
'good_id' => 10000,
'number' => 10,
],
],
];
$serverList = [
[
'server' => 'http://localhost:9551', //可使用etcd等服务注册发现中间件
'try' => 'addOrderTry',
'confirm' => 'addOrderConfirm',
'cancel' => 'addOrderCancel',
],
[
'server' => 'http://localhost:9552',
'try' => 'decGoodStockTry',
'confirm' => 'decGoodStockConfirm',
'cancel' => 'decGoodStockCancel',
],
];
$ret = DtmCli::handleDtmTransaction($serverList, $req);
if ($ret['is_ok']) {
return $this->success([]);
}
return $this->success([], ErrorCode::CODE_ERROR, $ret['message']);
}
}
DtmCli::handleDtmTransaction
是我封装的一个操作:
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace App\Utils;
use App\Constants\ErrorCode;
use Dtmcli\Tcc;
use function Dtmcli\tccGlobalTransaction;
class DtmCli
{
/**
* @param $serverList
* @param $req
*/
public static function handleDtmTransaction($serverList, $req): array
{
$isOk = false;
try {
$dtm = 'http://localhost:36789/api/dtmsvr';//dtm服务地址
if (empty($serverList)) {
throw new \Exception('子服务不能为空', ErrorCode::CODE_ERROR);
}
$ret = tccGlobalTransaction($dtm, function ($tcc) use ($req, $serverList) {
/*
* @var Tcc $tcc
*/
foreach ($serverList as $value) {
if (empty($value['server']) || empty($value['try']) || empty($value['confirm']) || empty($value['cancel'])) {
throw new \Exception('子服务错误', ErrorCode::CODE_ERROR);
}
$tryUrl = $value['server'] . '/' . $value['try'];
$confirmUrl = $value['server'] . '/' . $value['confirm'];
$cancelUrl = $value['server'] . '/' . $value['cancel'];
$tcc->callBranch($req, $tryUrl, $confirmUrl, $cancelUrl);
}
});
if (! empty($ret['gid'])) {
$isOk = true;
}
$message = $ret['message'];
} catch (\Exception $e) {
$message = $e->getMessage();
}
return [
'is_ok' => $isOk,
'message' => $message,
];
}
}
都写好了之后我们来请求一下api-hyperf
下的addOrder
接口:
可以看到返回操作成功了,我们去表里看下数据。
dtm表中的数据也是显示该事务是操作成功的
下面我们再请求一次:
可以看到返回商品库存不足,因为我们库存只有10件,上面第一次请求已经卖了10件没库存了,所以库存服务返回库存不足,我们这里也提示出来了。
订单表的相关数据进行了回滚。dtm数据表标识该事务失败
常见问题
- hyperf框架修改代码后不生效,根目录下执行
composer dump-autoload -o
- tcc模式下Confirm真正执行业务,不作任何业务检查,只使用 Try 阶段预留的业务资源,所以前面try阶段要锁定好confirm需要的资源。
本作品采用《CC 协议》,转载必须注明作者和本文链接
赞👍
假如 try 阶段addOrderTry,decGoodStockTry 都成功了, 但是confim 的阶段 decGoodStockConfirmh或者decGoodStockConfirm失 败了, 或者是有一个 cancel 的阶段 失败了 内部会怎么处理的
请教:此处使用etcd等中山发现中间件,配置该怎么写呢?