seata分布式事务介绍与搭建

Seata简介

Seata(Simple Extensible Autonomous Transaction Architecture) 是阿里巴巴开源的分布式事务中间件,以高效并且对业务0侵入的方式,解决微服务场景下面临的分布式事务问题。

附上项目github连接:

https://github.com/seata

中文文档连接:http://seata.io/zh-cn/docs/overview/what-is-seata.html

分布式事务产生背景

讲到事务,又得搬出经典的银行转账问题了,下面以实例说明

假设银行(bank)中有两个客户(name)张三和李四
我们需要将张三的1000元存款(sal)转到李四的账户上

目标就是张三账户减1000,李四账户加1000,不能出现中间步骤(张三减1000,李四没加)

假设dao层代码如下

1
2
3
4
5
6
7
public interface BankMapper {
/**
* @param userName 用户名
* @param changeSal 余额变动值
*/
public void updateSal(String userName,int changeSal);
}

对应xml中sql如下

1
2
3
<update id="updateSal">
update bank SET sal = sal+#{changeSal} WHERE name = #{userName}
</update>

如果两个用户对应的银行存款数据在一个数据源中,即一个数据库中,那么service层代码可以如下编写

1
2
3
4
5
6
7
8
9
10
/**
* @param fromUserName 转账人
* @param toUserName 被转账人
* @param changeSal 转账额度
*/
@Transactional(rollbackFor = Exception.class)
public void changeSal(String fromUserName,String toUserName,int changeSal) {
bankMapper.updateSal(fromUserName, -1 * changeSal);
bankMapper.updateSal(toUserName, changeSal);
}

通过spring框架下的@Transactional注解来保证单一数据源增删改查的一致性

但是随着业务的不断扩大,用户数在不断变多,几百万几千万用户时数据可以存一个库甚至一个表里,假设有10个亿的用户?

数据库的水平分割
为了解决数据库上的瓶颈,分库是很常见的解决方案,不同用户就可能落在不同的数据库里,原来一个库里的事务操作,现在变成了跨数据库的事务操作。

此时@Transactional注解就失效了,这就是跨数据库分布式事务问题

微服务化
当然,更多的情形是随着业务不断增长,将业务中不同模块服务拆分成微服务后,同时调用多个微服务所产生的

微服务化的银行转账情景往往是这样的

调用交易系统服务创建交易订单;
调用支付系统记录支付明细;
调用账务系统执行 A 扣钱;
调用账务系统执行 B 加钱;

如图所示,每个系统都对应一个独立的数据源,且可能位于不同机房,同时调用多个系统的服务很难保证同时成功,这就是跨服务分布式事务问题

分布式事务理论基础
两阶段提交(2pc)
两阶段提交协议(Two Phase Commitment Protocol)中,涉及到两种角色

一个事务协调者(coordinator):负责协调多个参与者进行事务投票及提交(回滚)
多个事务参与者(participants):即本地事务执行者

总共处理步骤有两个
(1)投票阶段(voting phase):协调者将通知事务参与者准备提交或取消事务,然后进入表决过程。参与者将告知协调者自己的决策:同意(事务参与者本地事务执行成功,但未提交)或取消(本地事务执行故障);
(2)提交阶段(commit phase):收到参与者的通知后,协调者再向参与者发出通知,根据反馈情况决定各参与者是否要提交还是回滚;

如果所示 1-2为第一阶段,2-3为第二阶段

如果任一资源管理器在第一阶段返回准备失败,那么事务管理器会要求所有资源管理器在第二阶段执行回滚操作。通过事务管理器的两阶段协调,最终所有资源管理器要么全部提交,要么全部回滚,最终状态都是一致的

TCC

基本原理

TCC 将事务提交分为 Try - Confirm - Cancel 3个操作。其和两阶段提交有点类似,Try为第一阶段,Confirm - Cancel为第二阶段,是一种应用层面侵入业务的两阶段提交。

操作方法 含义
Try 预留业务资源/数据效验
Confirm 确认执行业务操作,实际提交数据,不做任何业务检查,try成功,confirm必定成功,需保证幂等
Cancel 取消执行业务操作,实际回滚数据,需保证幂等
其核心在于将业务分为两个操作步骤完成。不依赖 RM 对分布式事务的支持,而是通过对业务逻辑的分解来实现分布式事务。

下面还是以银行转账例子来说明

假设用户user表中有两个字段:可用余额(available_money)、冻结余额(frozen_money)
A扣钱对应服务A(ServiceA)
B加钱对应服务B(ServiceB)
转账订单服务(OrderService)
业务转账方法服务(BusinessService)

ServiceA,ServiceB,OrderService都需分别实现try(),confirm(),cancle()方法,方法对应业务逻辑如下

ServiceA ServiceB OrderService
try() 校验余额(并发控制)
冻结余额+1000
余额-1000 冻结余额+1000 创建转账订单,状态待转账
confirm() 冻结余额-1000 余额+1000
冻结余额-1000 状态变为转账成功
cancle() 冻结余额-1000
余额+1000 冻结余额-1000 状态变为转账失败
其中业务调用方BusinessService中就需要调用
ServiceA.try()
ServiceB.try()
OrderService.try()

1、当所有try()方法均执行成功时,对全局事物进行提交,即由事物管理器调用每个微服务的confirm()方法
2、 当任意一个方法try()失败(预留资源不足,抑或网络异常,代码异常等任何异常),由事物管理器调用每个微服务的cancle()方法对全局事务进行回滚

引用网上一张TCC原理的参考图片

幂等控制

使用TCC时要注意Try - Confirm - Cancel 3个操作的幂等控制,网络原因,或者重试操作都有可能导致这几个操作的重复执行

业务实现过程中需重点关注幂等实现,讲到幂等,以上述TCC转账例子中confirm()方法来说明

在confirm()方法中
余额-1000,冻结余额-1000,这一步是实现幂等性的关键,你会怎么做?

大家在自己系统里操作资金账户时,为了防止并发情况下数据不一致的出现,肯定会避免出现这种代码

1
2
3
4
5
6
7
//根据userId查到账户
Account account = accountMapper.selectById(userId);
//取出当前资金
int availableMoney = account.getAvailableMoney();
account.setAvailableMoney(availableMoney-1000);
//更新剩余资金
accountMapper.update(account);

因为这本质上是一个 读-改-写的过程,不是原子的,在并发情况下会出现数据不一致问题

所以最简单的做法是

1
update account set available_money = available_money-1000 where user_id=#{userId}

这利用了数据库行锁特性解决了并发情况下的数据不一致问题,但是TCC中,单纯使用这个方法适用么?

答案是不行的,该方法能解决并发单次操作下的扣减余额问题,但是不能解决多次操作带来的多次扣减问题,假设我执行了两次,按这种方案,用户账户就少了2000块

那么具体怎么做?上诉转账例子中,可以引入转账订单状态来做判断,若订单状态为已支付,则直接return

1
2
3
if( order!=null && order.getStatus().equals("转账成功")){
return;
}

当然,新建一张去重表,用订单id做唯一建,若插入报错返回也是可以的,不管怎么样,核心就是保证,操作幂等性

空回滚

如下图所示,事务协调器在调用TCC服务的一阶段Try操作时,可能会出现因为丢包而导致的网络超时,此时事务协调器会触发二阶段回滚,调用TCC服务的Cancel操作;

TCC服务在未收到Try请求的情况下收到Cancel请求,这种场景被称为空回滚;TCC服务在实现时应当允许空回滚的执行;

那么具体代码里怎么做呢?
分析下,如果try()方法没执行,那么订单一定没创建,所以cancle方法里可以加一个判断,如果上下文中订单编号orderNo不存在或者订单不存在,直接return

1
2
3
if(orderNo==null || order==null){
return;
}

核心思想就是 回滚请求处理时,如果对应的具体业务数据为空,则返回成功

当然这种问题也可以通过中间件层面来实现,如,在第一阶段try()执行完后,向一张事务表中插入一条数据(包含事务id,分支id),cancle()执行时,判断如果没有事务记录则直接返回,但是现在还不支持

防悬挂

如下图所示,事务协调器在调用TCC服务的一阶段Try操作时,可能会出现因网络拥堵而导致的超时,此时事务协调器会触发二阶段回滚,调用TCC服务的Cancel操作;在此之后,拥堵在网络上的一阶段Try数据包被TCC服务收到,出现了二阶段Cancel请求比一阶段Try请求先执行的情况;

用户在实现TCC服务时,应当允许空回滚,但是要拒绝执行空回滚之后到来的一阶段Try请求;
在这里插入图片描述
这里又怎么做呢?

可以在二阶段执行时插入一条事务控制记录,状态为已回滚,这样当一阶段执行时,先读取该记录,如果记录存在,就认为二阶段回滚操作已经执行,不再执行try方法;

事务消息

事务消息更倾向于达成分布式事务的最终一致性,适用于分布式事务的提交或回滚只取决于事务发起方的业务需求,如A给B打了款并且成功了,那么下游业务B一定需要加钱这种场景,或许下了单,用户积分一定得增加这种场景。RocketMQ4.3中已经开源了事务消息,具体设计思路分析及demo演示,大家有兴趣可以看下我写的这篇文章

https://blog.csdn.net/hosaos/article/details/90050276

Demo上手-AT模式Dubbo集成Seata

Demo的github项目名称是seata-samples,链接如下

https://github.com/seata/seata-samples

要跑demo例子,首先需要下载上述链接官方demo,我下面以IDEA为例子演示demo中dubbo的分布式事务例子,另外还需要一个seata-server,我下的版本是1.4.1,链接如下

https://github.com/seata/seata/releases

先看下seata-example的项目结构

其中dubbo模块就是dubbo的demo

配置修改

由于我本地启动了Zookeeper服务端做dubbo注册中心,所以我修改了4个dubbo配置文件中的注册中心为zk,官方默认的是用的广播,没有Zk用广播或者Redis做注册中心都可以

数据库地址,由于我们是针对Rpc远程服务做分布式事务测试,所以数据库用一个也能达到测试效果,本地启动Mysql服务,并新建名为fescar的数据库

再执行dubbo_biz.sql中的建表语句,创建storage_tbl,order_tbl,account_tbl 3个业务表
再执行undo_log.sql 创建seata所需记录undolog的回滚日志表

并配置数据库连接地址

启动测试
先启动seata-server,启动方式
linux:

sh fescar-server.sh $LISTEN_PORT $PATH_FOR_PERSISTENT_DATA

参数有两个,LISTEN_PORT代表端口号,PATH_FOR_PERSISTENT_DATA表示Seata持久化数据存放路径

将安装包解压后cd到bin目录下,我这里指定端口号为8091,data路径为我自己创建的一个目录

sh fescar-server.sh 8091 /Users/chenyin/fescar/data

windows:
直接点击:bin/seata-server.bat

启动成功效果图如下

回到项目代码中

先启动DubboAccountServiceStarter,其初始化时向account_tbl表中插入一个用户编号为
U100001的用户,初始金额为999

再启动DubboOrderServiceStarter,DubboStorageServiceStarter,DubboStorageServiceStarter中默认初始化一个商品编号为C00321的商品,初始库存100

看下BusinessService业务处理类,里面调用了库存类(StorageService)扣减库存,调用订单类(OrderService)下单,其中手动抛出RuntimeException模拟分布式事务中的异常情况

1
2
3
4
5
6
7
8
@GlobalTransactional(timeoutMills = 300000, name = "dubbo-demo-tx")
public void purchase(String userId, String commodityCode, int orderCount) {
LOGGER.info("purchase begin ... xid: " + RootContext.getXID());
storageService.deduct(commodityCode, orderCount);
orderService.create(userId, commodityCode, orderCount);
throw new RuntimeException("xxx");

}

如果没有throw new RuntimeException(“xxx”); 正确的业务操作结果是用户账户余额减400变成599,库存减2变成98,具体为什么是余额-400,库存-2大家看下demo中具体业务类的代码就知道,不多说。异常的情况,也就是分布式事务回滚的情况,应该是余额还是999,库存还是100

先看下有抛出异常的情况,启动业务类,DubboBusinessTester,执行结果如下

检查下数据库中数据是否正确,有没发生数据未回滚的情况。

数据正确无误,证明数据正确的回滚了。上面介绍过了,Seata是根据undolog中记录来回滚的,但是异常回滚后undolog表却为空?怎么回事,这是因为undolog日志被删除了,想要看到undolog表中记录,我们打断点来看,在异常还没抛出时打断点,看下数据库undolog表中数据情况。

断点处触发后,查看undolog表,可以看到3条记录,3个branch_id对应3个rpc分支事务,也就对应3个业务表的回滚日志,一个xid标识这3个分支事务处于一个全局分布式事务中

其中rollbackinfo字段是bytes类型,看不到具体数据,怎么办?我们把数据导成txt格式

数据内容如下,可以看到是BASE64加密过的,进行解密

最终内容如下,我只贴出第一行数据中的rollback_info,可以看到其中记录了数据操作前后的镜像数据beforeImage,afterImage,如果发生回滚,可以通过xid,branchid定位到undolog中的rollback_info,并将beforeImage中内容反解析成sql来达到回滚目的的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
{
"branchId": 2008522332,
"sqlUndoLogs": [
{
"afterImage": {
"rows": [
{
"fields": [
{
"keyType": "PrimaryKey",
"name": "id",
"type": 4,
"value": 3
},
{
"keyType": "NULL",
"name": "count",
"type": 4,
"value": 98
}
]
}
],
"tableName": "storage_tbl"
},
"beforeImage": {
"rows": [
{
"fields": [
{
"keyType": "PrimaryKey",
"name": "id",
"type": 4,
"value": 3
},
{
"keyType": "NULL",
"name": "count",
"type": 4,
"value": 100
}
]
}
],
"tableName": "storage_tbl"
},
"sqlType": "UPDATE",
"tableName": "storage_tbl"
}
],
"xid": "192.168.202.197:8091:2008522331"
}

Demo上手-TCC模式Dubbo集成Seata

tcc模块下有个transfer-tcc-sample项目,不过数据库不是Mysql的,下面进行部分修改并演示

先看下代码结构

核心是action包下的2个类,都暴露成了dubbo服务,同时使用注解标记为TCC服务,并实现try-commit-cancle方法
FirsetAction-对应扣钱service
SecondAction-对应加钱service

看下FirsetAction源码,@TwoPhaseBusinessAction是TCC服务参与者必须加的注解,指定服务名称,提交方法commitMethod及回滚方法rollbackMethod,SecondAction同理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public interface FirstTccAction {

/**
* 一阶段方法
*
* @param businessActionContext
* @param accountNo
* @param amount
*/
@TwoPhaseBusinessAction(name = "firstTccAction", commitMethod = "commit", rollbackMethod = "rollback")
public boolean prepareMinus(BusinessActionContext businessActionContext,
@BusinessActionContextParameter(paramName = "accountNo") String accountNo,
@BusinessActionContextParameter(paramName = "amount") double amount);

/**
* 二阶段提交
* @param businessActionContext
* @return
*/
public boolean commit(BusinessActionContext businessActionContext);

/**
* 二阶段回滚
* @param businessActionContext
* @return
*/
public boolean rollback(BusinessActionContext businessActionContext);
}

transfer包下TransferService是具体业务实现类(即转账操作类)

看下TransferServiceImpl源码,转账方法上加了 @GlobalTransactional 将方法纳入事务管理器管理范围

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class TransferServiceImpl implements TransferService {

private FirstTccAction firstTccAction;

private SecondTccAction secondTccAction;

/**
* 转账操作
* @param from 扣钱账户
* @param to 加钱账户
* @param amount 转账金额
* @return
*/
@Override
@GlobalTransactional
public boolean transfer(final String from, final String to, final double amount) {
//扣钱参与者,一阶段执行
boolean ret = firstTccAction.prepareMinus(null, from, amount);

if(!ret){
//扣钱参与者,一阶段失败; 回滚本地事务和分布式事务
throw new RuntimeException("账号:["+from+"] 预扣款失败");
}

//加钱参与者,一阶段执行
ret = secondTccAction.prepareAdd(null, to, amount);

if(!ret){
throw new RuntimeException("账号:["+to+"] 预收款失败");
}

System.out.println(String.format("transfer amount[%s] from [%s] to [%s] finish.", String.valueOf(amount), from, to));
return true;
}

public void setFirstTccAction(FirstTccAction firstTccAction) {
this.firstTccAction = firstTccAction;
}

public void setSecondTccAction(SecondTccAction secondTccAction) {
this.secondTccAction = secondTccAction;
}
}

配置修改

由于我本地启动的Mysql服务,而demo是以h2Database为例,所以pom中引入mysql相关包

1
2
3
4
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>

修改from-datasource-bean.xml,from-datasource-bean.xml中数据源配置,我这里转出账户对应xa1数据库,转入账户对应xa2数据库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<bean id="fromAccountDataSource"  class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName">
<value>com.mysql.jdbc.Driver</value>
</property>
<property name="url">
<value>jdbc:mysql://localhost:3306/xa1</value>
</property>
<property name="username">
<value>root</value>
</property>
<property name="password">
<value>123456</value>
</property>
</bean>
<bean id="toAccountDataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName">
<value>com.mysql.jdbc.Driver</value>
</property>
<property name="url">
<value>jdbc:mysql://localhost:3306/xa2</value>
</property>
<property name="username">
<value>root</value>
</property>
<property name="password">
<value>123456</value>
</property>
</bean>

启动测试

1、配置修改完毕,先在本地启动seata-server

2、然后启动 TransferProviderStarter 暴露dubbo服务并初始化数据库,数据库初始化完后,数据如下

xa1库account表

xa2库account表

3、启动TransferApplication,其main方法执行2个子方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 执行转账成功 demo
*
* @param initAmount 初始化余额
* @param transferAmount 转账余额
*/
private static void doTransferSuccess(double initAmount, double transferAmount) throws SQLException {
//执行转账操作
doTransfer("A", "C", transferAmount);

//校验A账户余额:initAmount - transferAmount
checkAmount(fromAccountDAO, "A", initAmount - transferAmount);

//校验C账户余额:initAmount + transferAmount
checkAmount(toAccountDAO, "C", initAmount + transferAmount);
}

/**
* 执行转账 失败 demo, 'B' 向未知用户 'XXX' 转账,转账失败分布式事务回滚
* @param initAmount 初始化余额
* @param transferAmount 转账余额
*/
private static void doTransferFailed(int initAmount, int transferAmount) throws SQLException {
// 'B' 向未知用户 'XXX' 转账,转账失败分布式事务回滚
try{
doTransfer("B", "XXX", transferAmount);
}catch (Throwable t){
System.out.println("从账户B向未知账号XXX转账失败.");
}

//校验A2账户余额:initAmount
checkAmount(fromAccountDAO, "B", initAmount);

//账户XXX 不存在,无需校验
}

执行结果应该是doTransferSuccess()执行成功,A账户变成90,C账户变成110
doTransferFailed()执行失败(secondTccAction的try方法中有对转账接收账户做校验,账户不存在,抛异常),B账户数据还是100

执行下看下结果

xa1库数据如下

xa2库数据如下

说明TCC分布式事务生效,如果不是微服务带来的分布式事务问题,而是本地分库操作带来的事务问题,可以看下local-tcc-sample例子

该demo中并未对3个方法做幂等控制,实际业务实现中需多加注意

最后贴上Seata中AT、TCC模式源码的分析,有兴趣的可以看一下哦
Seata实战-AT模式源码分析
Seata实战-TCC模式源码分析