当前位置:网站首页>seata源码解析:事务状态及全局锁的存储
seata源码解析:事务状态及全局锁的存储
2022-08-05 09:01:00 【51CTO】

事务状态的存储
在seata中,无论你使用哪种事务模式,都会将全局事务状态和分支事务状态存储下来。有三种存储模式可供你选择,db,redis,file。
本文只会详细分析db这一种存储模式,其他的类似。
当使用db这一种模式时,seata server都会将全局事务的状态存在global_table表中,将分支事务的状态存在branch_table表中,如下所示
-- the table to store GlobalSession data
CREATE
TABLE IF
NOT EXISTS `global_table`
(
`xid`
VARCHAR
(
128
)
NOT
NULL
,
`transaction_id`
BIGINT
,
`status`
TINYINT
NOT
NULL
,
`application_id`
VARCHAR
(
32
)
,
`transaction_service_group`
VARCHAR
(
32
)
,
`transaction_name`
VARCHAR
(
128
)
,
`timeout`
INT
,
`begin_time`
BIGINT
,
`application_data`
VARCHAR
(
2000
)
,
`gmt_create`
DATETIME
,
`gmt_modified`
DATETIME
,
PRIMARY KEY
(`xid`
)
,
KEY `idx_gmt_modified_status`
(`gmt_modified`
, `status`
)
,
KEY `idx_transaction_id`
(`transaction_id`
)
) ENGINE
= InnoDB
DEFAULT CHARSET
= utf8
;
-- the table to store BranchSession data
CREATE
TABLE IF
NOT EXISTS `branch_table`
(
`branch_id`
BIGINT
NOT
NULL
,
`xid`
VARCHAR
(
128
)
NOT
NULL
,
`transaction_id`
BIGINT
,
`resource_group_id`
VARCHAR
(
32
)
,
`resource_id`
VARCHAR
(
256
)
,
`branch_type`
VARCHAR
(
8
)
,
`status`
TINYINT
,
`client_id`
VARCHAR
(
64
)
,
`application_data`
VARCHAR
(
2000
)
,
`gmt_create`
DATETIME
(
6
)
,
`gmt_modified`
DATETIME
(
6
)
,
PRIMARY KEY
(`branch_id`
)
,
KEY `idx_xid`
(`xid`
)
) ENGINE
= InnoDB
DEFAULT CHARSET
= utf8
;
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
global_table表中status表示的含义为全局事务的状态,对应的枚举类为GlobalStatus
branch_table表中status表示的含义为分支事务的状态,对应的枚举类为BranchStatus
当事务发生问题时,我们就可以通过global_table和branch_table的status字段判断事务是哪一步执行失败了。
全局事务的状态表

分支事务的状态表

状态解释来自官网:http://seata.io/zh-cn/docs/user/appendix/global-transaction-status.html
源码分析
前面的文章我们说过当执行如下命令时,会调用Server.java中的main方法
public
class
Server {
public
static
void
main(
String[]
args)
throws
IOException {
// 省略部分代码...
// SessionHolder负责事务日志的持久化存储
// 设置存储模式,有三种可选类型,file,db,redis
SessionHolder.
init(
parameterParser.
getStoreMode());
// 创建事务协调器
DefaultCoordinator
coordinator
=
new
DefaultCoordinator(
nettyRemotingServer);
// 初始化5个定时任务
coordinator.
init();
// 省略部分代码...
System.
exit(
0);
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
SessionHolder#init根据传入的存储模式初始化4个不同的SessionManager,SessionManager是用来存储全局事务和分支事务状态的。其中全局事务用GlobalSession来表示,分支事务用BranchSession来表示,一个GlobalSession包含多个BranchSession
// 保存了所有的GlobalSession
private
static
SessionManager
ROOT_SESSION_MANAGER;
// 需要异步commit的GlobalSession
private
static
SessionManager
ASYNC_COMMITTING_SESSION_MANAGER;
// 需要重试commit的GlobalSession
private
static
SessionManager
RETRY_COMMITTING_SESSION_MANAGER;
// 需要重试roollback的GlobalSession
private
static
SessionManager
RETRY_ROLLBACKING_SESSION_MANAGER;
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
在init方法中通过SPI的方式实例化对应的类
// SessionHolder
public
static
void
init(
String
mode) {
if (
StringUtils.
isBlank(
mode)) {
mode
=
CONFIG.
getConfig(
ConfigurationKeys.
STORE_MODE);
}
StoreMode
storeMode
=
StoreMode.
get(
mode);
if (
StoreMode.
DB.
equals(
storeMode)) {
// 通过spi加载SessionManager
ROOT_SESSION_MANAGER
=
EnhancedServiceLoader.
load(
SessionManager.
class,
StoreMode.
DB.
getName());
ASYNC_COMMITTING_SESSION_MANAGER
=
EnhancedServiceLoader.
load(
SessionManager.
class,
StoreMode.
DB.
getName(),
new
Object[] {
ASYNC_COMMITTING_SESSION_MANAGER_NAME});
RETRY_COMMITTING_SESSION_MANAGER
=
EnhancedServiceLoader.
load(
SessionManager.
class,
StoreMode.
DB.
getName(),
new
Object[] {
RETRY_COMMITTING_SESSION_MANAGER_NAME});
RETRY_ROLLBACKING_SESSION_MANAGER
=
EnhancedServiceLoader.
load(
SessionManager.
class,
StoreMode.
DB.
getName(),
new
Object[] {
RETRY_ROLLBACKING_SESSION_MANAGER_NAME});
}
else
if (
StoreMode.
FILE.
equals(
storeMode)) {
// 省略其他存储方式的加载逻辑
}
// 删除已经完成的GlobalSession
reload(
storeMode);
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
4种类型的SessionManager都是同一个实例,只是调用的构造方法不同而已。以DataBaseSessionManager为例,ROOT_SESSION_MANAGER调用了无参数构造函数,而其他SessionManager传入了taskName属性
为什么要搞4个SessionManager?
其实就是用不同的SessionManager管理不同状态的任务,这样逻辑比较清晰。
当不同的SessionManager调用allSessions方法时,返回的就是对应状态的GlobalSession,逻辑比较清晰
DataBaseSessionManager#allSessions

我们来看一下SessionManager的继承关系

SessionLifecycleListener看接口名字就是基于观察者模式设计的,当GlobalSession状态发生改变的时候,会发布通知给监听者,然后监听者做相应动作。目前SessionLifecycleListener接口的实现类只有各种SessionManager,当收到状态改变的通知时,将其状态存储下来
AbstractSessionManager则是一个抽象类,当SessionLifecycleListener接口方法被回调时,调用SessionManager定义的动作方法
public
abstract
class
AbstractSessionManager
implements
SessionManager,
SessionLifecycleListener {
protected
TransactionStoreManager
transactionStoreManager;
// 省略部分代码
// 重写了SessionManager接口方法
@Override
public
void
addGlobalSession(
GlobalSession
session)
throws
TransactionException {
if (
LOGGER.
isDebugEnabled()) {
LOGGER.
debug(
"MANAGER["
+
name
+
"] SESSION["
+
session
+
"] "
+
LogOperation.
GLOBAL_ADD);
}
writeSession(
LogOperation.
GLOBAL_ADD,
session);
}
// 重写了SessionManager接口方法
@Override
public
void
updateGlobalSessionStatus(
GlobalSession
session,
GlobalStatus
status)
throws
TransactionException {
if (
LOGGER.
isDebugEnabled()) {
LOGGER.
debug(
"MANAGER["
+
name
+
"] SESSION["
+
session
+
"] "
+
LogOperation.
GLOBAL_UPDATE);
}
writeSession(
LogOperation.
GLOBAL_UPDATE,
session);
}
// 重写了SessionLifecycleListener接口方法
@Override
public
void
onBegin(
GlobalSession
globalSession)
throws
TransactionException {
addGlobalSession(
globalSession);
}
// 重写了SessionLifecycleListener接口方法
@Override
public
void
onStatusChange(
GlobalSession
globalSession,
GlobalStatus
status)
throws
TransactionException {
updateGlobalSessionStatus(
globalSession,
status);
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
可以看到AbstractSessionManager并没有实现SessionManager接口的方法,而是直接抛出异常,说明具体的存储逻辑交给子类来实现了
AbstractSessionManager有3个实现类,说明seata支持3种存储模式。而最终存储的工作是交给TransactionStoreManager来实现的

这些增加事务和事务状态变化的持久化操作非常简单,就是执行插入sql和更新sql
说回我们的启动流程,在DefaultCoordinator#init方法中,初始化5个定时任务
- retryRollbacking:分支事务回滚失败时,不断重试
- retryCommitting:分支事务提交失败时,不断重试
- asyncCommitting:执行异步commit,用在at模式,因为at模式的commit操作其实就是删除undolog,可以异步执行
- timeoutCheck:当事务处于开始状态,将状态设置为超时回滚,将其放入重试回滚管理器,让其回滚全局事务
- undoLogDelete:向rm端发送请求,删除7天(默认)之前的undolog

这些重试操作执行的逻辑和全局事务的提交/回滚逻辑一致,就不介绍了
// DefaultCoordinator
public
void
init() {
// 重试rollback定时任务
retryRollbacking.
scheduleAtFixedRate(()
-> {
boolean
lock
=
SessionHolder.
retryRollbackingLock();
if (
lock) {
try {
handleRetryRollbacking();
}
catch (
Exception
e) {
LOGGER.
info(
"Exception retry rollbacking ... ",
e);
}
finally {
SessionHolder.
unRetryRollbackingLock();
}
}
},
0,
ROLLBACKING_RETRY_PERIOD,
TimeUnit.
MILLISECONDS);
// 重试commit定时任务
retryCommitting.
scheduleAtFixedRate(()
-> {
boolean
lock
=
SessionHolder.
retryCommittingLock();
if (
lock) {
try {
handleRetryCommitting();
}
catch (
Exception
e) {
LOGGER.
info(
"Exception retry committing ... ",
e);
}
finally {
SessionHolder.
unRetryCommittingLock();
}
}
},
0,
COMMITTING_RETRY_PERIOD,
TimeUnit.
MILLISECONDS);
// 省略部分代码
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
全局锁
在前面的文章中我们说过,在Seata AT模式中,我们用全局锁来避免脏写,同时也可以用全局锁将默认的隔离级别从读未提交升高为读已提交。
全局锁是存在TC端的,所以在TC端要提供相应的接口,来进行加锁,解锁,相应的资源是否被加锁等操作!
当分支事务提交的时候,需要把修改的资源锁定。当全局事务提交后才会把相应的资源解锁。
各种事务的加锁状态会存在lock_table表中
-- the table to store lock data
CREATE
TABLE IF
NOT EXISTS `lock_table`
(
`row_key`
VARCHAR
(
128
)
NOT
NULL
,
-- resource_id+table_name+pk的组合
`xid`
VARCHAR
(
128
)
,
`transaction_id`
BIGINT
,
`branch_id`
BIGINT
NOT
NULL
,
`resource_id`
VARCHAR
(
256
)
,
`table_name`
VARCHAR
(
32
)
,
`pk`
VARCHAR
(
36
)
,
`gmt_create`
DATETIME
,
`gmt_modified`
DATETIME
,
PRIMARY KEY
(`row_key`
)
,
KEY `idx_branch_id`
(`branch_id`
)
) ENGINE
= InnoDB
DEFAULT CHARSET
= utf8
;
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
用个例子演示一下全局锁的工作流程,
假如account_info表中有如下2个账户数据,其中id是主键
id | user_id |
1 | 1001 |
2 | 1002 |
假如说执行如下sql
当进行commit的时候,需要加全局锁,此时根据update语句构建出select语句,找出影响的主键,即1和2
通过主键构建出lockKey=account_info:1,2
lockKey的构建规则如下
如果主键是一列(id列),则形式如下
如果主键是多列(id列+user_id列),则形式如下
当然有可能一个事务中,有可能有多个表,多个表的构建规则如下(中间通过;分隔即可)
加锁
当加锁是会往lock_table中假如如下2个记录(省略无关的列)
row_key | xid |
jdbc:mysql://myhost:3306/db_account_1^^^account_info^^^1 | 127.21.0.14:18091:6449339005964652705 |
jdbc:mysql://myhost:3306/db_account_2^^^account_info^^^1 | 127.21.0.14:18091:6449339005964652705 |
row_key的构建规则如下
tcc模式下resourceId为@TwoPhaseBusinessAction注解的name属性
而在at和xa模式中resourceId都为数据库连接url
解锁
根据xid和row_key删除记录
查询是否能加锁
根据row_key从lock_table中查询记录,如果记录中的xid和查询传递过来的xid不一致则加锁失败,如果没有记录或者记录的xid和传递过来的xid一致,则加锁成功

源码分析
TC端定义的锁操作接口
public
interface
LockManager {
// 对分支事务的资源加锁
boolean
acquireLock(
BranchSession
branchSession)
throws
TransactionException;
// 对分支事务的资源解锁
boolean
releaseLock(
BranchSession
branchSession)
throws
TransactionException;
// 对全局事务中的所有分支事务的资源解锁
boolean
releaseGlobalSessionLock(
GlobalSession
globalSession)
throws
TransactionException;
// 根据xid resourceId lockKey 查询是否已经加锁
boolean
isLockable(
String
xid,
String
resourceId,
String
lockKey)
throws
TransactionException;
// 清除所有锁
void
cleanAllLocks()
throws
TransactionException;
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
老规矩,我们还是只分析db这种存储模式,最终的加解锁操作会交给Locker

加锁
// AbstractLockManager
public
boolean
acquireLock(
BranchSession
branchSession)
throws
TransactionException {
if (
branchSession
==
null) {
throw
new
IllegalArgumentException(
"branchSession can't be null for memory/file locker.");
}
String
lockKey
=
branchSession.
getLockKey();
if (
StringUtils.
isNullOrEmpty(
lockKey)) {
// no lock
return
true;
}
// get locks of branch
// 创建 RowLock 集合,一条加锁记录对应一个RowLock对象
List
<
RowLock
>
locks
=
collectRowLocks(
branchSession);
if (
CollectionUtils.
isEmpty(
locks)) {
// no lock
return
true;
}
return
getLocker(
branchSession).
acquireLock(
locks);
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
把需要加锁的资源转换成RowLock集合
protected
List
<
RowLock
>
collectRowLocks(
BranchSession
branchSession) {
List
<
RowLock
>
locks
=
new
ArrayList
<>();
if (
branchSession
==
null
||
StringUtils.
isBlank(
branchSession.
getLockKey())) {
return
locks;
}
String
xid
=
branchSession.
getXid();
// 得到资源id,也就是数据库连接url
String
resourceId
=
branchSession.
getResourceId();
long
transactionId
=
branchSession.
getTransactionId();
String
lockKey
=
branchSession.
getLockKey();
return
collectRowLocks(
lockKey,
resourceId,
xid,
transactionId,
branchSession.
getBranchId());
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
转换RowLock的过程,基本就是对lockKey的解析过程,
protected
List
<
RowLock
>
collectRowLocks(
String
lockKey,
String
resourceId,
String
xid,
Long
transactionId,
Long
branchID) {
List
<
RowLock
>
locks
=
new
ArrayList
<
RowLock
>();
// 对多个记录加锁,中间使用;分隔
String[]
tableGroupedLockKeys
=
lockKey.
split(
";");
for (
String
tableGroupedLockKey :
tableGroupedLockKeys) {
// 表名和记录主键值之间用:分隔
int
idx
=
tableGroupedLockKey.
indexOf(
":");
if (
idx
<
0) {
return
locks;
}
// 要加锁的表名
String
tableName
=
tableGroupedLockKey.
substring(
0,
idx);
// 加锁的记录主键值,如果需要一次加锁多条记录,记录之间用,分隔
String
mergedPKs
=
tableGroupedLockKey.
substring(
idx
+
1);
if (
StringUtils.
isBlank(
mergedPKs)) {
return
locks;
}
String[]
pks
=
mergedPKs.
split(
",");
if (
pks
==
null
||
pks.
length
==
0) {
return
locks;
}
// 一个主键创建一个RowLock对象
for (
String
pk :
pks) {
if (
StringUtils.
isNotBlank(
pk)) {
RowLock
rowLock
=
new
RowLock();
rowLock.
setXid(
xid);
rowLock.
setTransactionId(
transactionId);
rowLock.
setBranchId(
branchID);
rowLock.
setTableName(
tableName);
rowLock.
setPk(
pk);
rowLock.
setResourceId(
resourceId);
locks.
add(
rowLock);
}
}
}
return
locks;
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
- 39.
构造好RowLock集合,接下来就是调用DataBaseLocker执行真正的加锁操作
首先将RowLock转为LockDO
protected
List
<
LockDO
>
convertToLockDO(
List
<
RowLock
>
locks) {
List
<
LockDO
>
lockDOs
=
new
ArrayList
<>();
if (
CollectionUtils.
isEmpty(
locks)) {
return
lockDOs;
}
for (
RowLock
rowLock :
locks) {
LockDO
lockDO
=
new
LockDO();
lockDO.
setBranchId(
rowLock.
getBranchId());
lockDO.
setPk(
rowLock.
getPk());
lockDO.
setResourceId(
rowLock.
getResourceId());
// rowKey = resourceId + "^^^" + tableName + "^^^" + pk
lockDO.
setRowKey(
getRowKey(
rowLock.
getResourceId(),
rowLock.
getTableName(),
rowLock.
getPk()));
lockDO.
setXid(
rowLock.
getXid());
lockDO.
setTransactionId(
rowLock.
getTransactionId());
lockDO.
setTableName(
rowLock.
getTableName());
lockDOs.
add(
lockDO);
}
return
lockDOs;
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
// LockStoreDataBaseDAO
public
boolean
acquireLock(
List
<
LockDO
>
lockDOs) {
Connection
conn
=
null;
PreparedStatement
ps
=
null;
ResultSet
rs
=
null;
Set
<
String
>
dbExistedRowKeys
=
new
HashSet
<>();
boolean
originalAutoCommit
=
true;
if (
lockDOs.
size()
>
1) {
// 过滤掉重复的加锁记录
lockDOs
=
lockDOs.
stream().
filter(
LambdaUtils.
distinctByKey(
LockDO::
getRowKey)).
collect(
Collectors.
toList());
}
try {
conn
=
lockStoreDataSource.
getConnection();
if (
originalAutoCommit
=
conn.
getAutoCommit()) {
conn.
setAutoCommit(
false);
}
//check lock
StringJoiner
sj
=
new
StringJoiner(
",");
for (
int
i
=
0;
i
<
lockDOs.
size();
i
++) {
sj.
add(
"?");
}
boolean
canLock
=
true;
//query
// select xid, transaction_id, branch_id, resource_id, table_name, pk, row_key, gmt_create, gmt_modified
// from lock_table where row_key in ('?', '?', '?')
String
checkLockSQL
=
LockStoreSqlFactory.
getLogStoreSql(
dbType).
getCheckLockableSql(
lockTable,
sj.
toString());
ps
=
conn.
prepareStatement(
checkLockSQL);
for (
int
i
=
0;
i
<
lockDOs.
size();
i
++) {
ps.
setString(
i
+
1,
lockDOs.
get(
i).
getRowKey());
}
rs
=
ps.
executeQuery();
String
currentXID
=
lockDOs.
get(
0).
getXid();
while (
rs.
next()) {
String
dbXID
=
rs.
getString(
ServerTableColumnsName.
LOCK_TABLE_XID);
// 查出来的记录的row_key和当前的不一样,则说明被别的事务加锁了
if (
!
StringUtils.
equals(
dbXID,
currentXID)) {
if (
LOGGER.
isInfoEnabled()) {
String
dbPk
=
rs.
getString(
ServerTableColumnsName.
LOCK_TABLE_PK);
String
dbTableName
=
rs.
getString(
ServerTableColumnsName.
LOCK_TABLE_TABLE_NAME);
Long
dbBranchId
=
rs.
getLong(
ServerTableColumnsName.
LOCK_TABLE_BRANCH_ID);
LOGGER.
info(
"Global lock on [{}:{}] is holding by xid {} branchId {}",
dbTableName,
dbPk,
dbXID,
dbBranchId);
}
canLock
&=
false;
break;
}
// 已经被自己加锁的记录
dbExistedRowKeys.
add(
rs.
getString(
ServerTableColumnsName.
LOCK_TABLE_ROW_KEY));
}
// 有记录已经加过锁,回滚退出
if (
!
canLock) {
conn.
rollback();
return
false;
}
// 此次需要加锁的记录
List
<
LockDO
>
unrepeatedLockDOs
=
null;
if (
CollectionUtils.
isNotEmpty(
dbExistedRowKeys)) {
unrepeatedLockDOs
=
lockDOs.
stream().
filter(
lockDO
->
!
dbExistedRowKeys.
contains(
lockDO.
getRowKey()))
.
collect(
Collectors.
toList());
}
else {
unrepeatedLockDOs
=
lockDOs;
}
if (
CollectionUtils.
isEmpty(
unrepeatedLockDOs)) {
conn.
rollback();
return
true;
}
//lock
// 执行加锁操作
if (
unrepeatedLockDOs.
size()
==
1) {
LockDO
lockDO
=
unrepeatedLockDOs.
get(
0);
if (
!
doAcquireLock(
conn,
lockDO)) {
if (
LOGGER.
isInfoEnabled()) {
LOGGER.
info(
"Global lock acquire failed, xid {} branchId {} pk {}",
lockDO.
getXid(),
lockDO.
getBranchId(),
lockDO.
getPk());
}
conn.
rollback();
return
false;
}
}
else {
if (
!
doAcquireLocks(
conn,
unrepeatedLockDOs)) {
if (
LOGGER.
isInfoEnabled()) {
LOGGER.
info(
"Global lock batch acquire failed, xid {} branchId {} pks {}",
unrepeatedLockDOs.
get(
0).
getXid(),
unrepeatedLockDOs.
get(
0).
getBranchId(),
unrepeatedLockDOs.
stream().
map(
lockDO
->
lockDO.
getPk()).
collect(
Collectors.
toList()));
}
conn.
rollback();
return
false;
}
}
conn.
commit();
return
true;
}
catch (
SQLException
e) {
throw
new
StoreException(
e);
}
finally {
IOUtil.
close(
rs,
ps);
if (
conn
!=
null) {
try {
if (
originalAutoCommit) {
conn.
setAutoCommit(
true);
}
conn.
close();
}
catch (
SQLException
e) {
}
}
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
- 39.
- 40.
- 41.
- 42.
- 43.
- 44.
- 45.
- 46.
- 47.
- 48.
- 49.
- 50.
- 51.
- 52.
- 53.
- 54.
- 55.
- 56.
- 57.
- 58.
- 59.
- 60.
- 61.
- 62.
- 63.
- 64.
- 65.
- 66.
- 67.
- 68.
- 69.
- 70.
- 71.
- 72.
- 73.
- 74.
- 75.
- 76.
- 77.
- 78.
- 79.
- 80.
- 81.
- 82.
- 83.
- 84.
- 85.
- 86.
- 87.
- 88.
- 89.
- 90.
- 91.
- 92.
- 93.
- 94.
- 95.
- 96.
- 97.
- 98.
- 99.
- 100.
- 101.
- 102.
- 103.
- 104.
- 105.
解锁
解锁的逻辑和加锁的逻辑差不多,直接分析最终执行的部分了
public
boolean
unLock(
List
<
LockDO
>
lockDOs) {
Connection
conn
=
null;
PreparedStatement
ps
=
null;
try {
conn
=
lockStoreDataSource.
getConnection();
conn.
setAutoCommit(
true);
StringJoiner
sj
=
new
StringJoiner(
",");
for (
int
i
=
0;
i
<
lockDOs.
size();
i
++) {
sj.
add(
"?");
}
//batch release lock
// delete from lock_table where xid = ? and row_key in (?, ?, ?)
String
batchDeleteSQL
=
LockStoreSqlFactory.
getLogStoreSql(
dbType).
getBatchDeleteLockSql(
lockTable,
sj.
toString());
ps
=
conn.
prepareStatement(
batchDeleteSQL);
ps.
setString(
1,
lockDOs.
get(
0).
getXid());
for (
int
i
=
0;
i
<
lockDOs.
size();
i
++) {
ps.
setString(
i
+
2,
lockDOs.
get(
i).
getRowKey());
}
ps.
executeUpdate();
}
catch (
SQLException
e) {
throw
new
StoreException(
e);
}
finally {
IOUtil.
close(
ps,
conn);
}
return
true;
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
参考博客
事务状态逻辑
[1]http://seata.io/zh-cn/docs/user/appendix/global-transaction-status.html
边栏推荐
- Neuron Newsletter 2022-07|新增非 A11 驱动、即将支持 OPC DA
- sphinx matches the specified field
- 六年团队Leader实战秘诀|程序员最重要的八种软技能 - 脸皮薄容易耽误事 - 自我营销
- XCODE12 在使用模拟器(SIMULATOR)时编译错误的解决方法
- 周报2022-8-4
- Pagoda measurement - building small and medium-sized homestay hotel management source code
- 接口全周期的生产力利器Apifox
- Creo 9.0 基准特征:基准轴
- JS syntax usage
- 按钮上显示值的轮流切换
猜你喜欢

基于 Kubernetes 的微服务项目整体设计与实现

Embedded practice ---- based on RT1170 transplant memtester to do SDRAM test (25)

DTcloud 装饰器

Spark cluster deployment (third bullet)

链表中的数字相加----链表专题

egg框架中解决跨域的三种方案

【LeetCode】623. Add a row to the binary tree

使用稀疏 4D 卷积对 3D LiDAR 数据中的运动对象进行后退分割(IROS 2022)

Chapter 12 贝叶斯网络

Chapter 12 Bayesian Networks
随机推荐
微信小程序请求封装
DNS 查询原理详解
16种香饭做法全攻略
树状数组模版+例题
How to make a puzzle in PS, self-study PS software photoshop2022, PS make a puzzle effect
十一道家常小菜详细攻略[图文并茂]
阿里云存储的数据库是怎么自动加快加载速度的呢www.cxsdkt.cn怎么设置案例?
Creo 9.0 基准特征:基准平面
Redis cache and existing problems--cache penetration, cache avalanche, cache breakdown and solutions
CROS and JSONP configuration
Walk 100 trick society
flink cdc支持从oracle dg库同步吗
原型&原型链
egg framework
七夕看什么电影好?爬取电影评分并存入csv文件
egg框架中解决跨域的三种方案
mySQL数据库初始化失败,有谁可以指导一下吗
Thinking after writing a code with a very high CPU usage
The Secrets of the Six-Year Team Leader | The Eight Most Important Soft Skills of Programmers
Luogu P4588: [TJOI2018]数学计算