Seata事务隔离

本文目标:帮助用户明白使用Seata AT模式时,该如何正确实现事务隔离,防止脏读脏写。

希望读者在阅读本文前,已阅读过seata官网中对AT模式的介绍,并且对数据库本地锁有所了解

(例如,两个事务同时在对同一条记录做update时,只有拿到record lock的事务才能更新成功,另一个事务在record lock未释放前只能等待,直到事务超时)

首先请看这样的一段代码,尽管看着“初级”,但持久层框架实际上帮我们做的主要事情也就这样。

  1. @Service
  2. public class StorageService {
  3. @Autowired
  4. private DataSource dataSource;
  5. @GlobalTransactional
  6. public void batchUpdate() throws SQLException {
  7. Connection connection = null;
  8. PreparedStatement preparedStatement = null;
  9. try {
  10. connection = dataSource.getConnection();
  11. connection.setAutoCommit(false);
  12. String sql = "update storage_tbl set count = ?" +
  13. " where id = ? and commodity_code = ?";
  14. preparedStatement = connection.prepareStatement(sql);
  15. preparedStatement.setInt(1, 100);
  16. preparedStatement.setLong(2, 1);
  17. preparedStatement.setString(3, "2001");
  18. preparedStatement.executeUpdate();
  19. connection.commit();
  20. } catch (Exception e) {
  21. throw e;
  22. } finally {
  23. IOutils.close(preparedStatement);
  24. IOutils.close(connection);
  25. }
  26. }
  27. }

从代理数据源说起

使用AT模式,最重要的事情便是代理数据源,那么用DataSourceProxy代理数据源有什么作用呢?

DataSourceProxy能帮助我们获得几个重要的代理对象

  • 通过DataSourceProxy.getConnection()获得ConnectionProxy

  • 通过ConnectionProxy.prepareStatement(...)获得StatementProxy

Seata的如何实现事务隔离,就藏在这2个Proxy中,我先概述下实现逻辑。

StatementProxy.executeXXX()的处理逻辑

  • 当调用io.seata.rm.datasource.StatementProxy.executeXXX()会将sql交给io.seata.rm.datasource.exec.ExecuteTemplate.execute(...)处理。

    • ExecuteTemplate.execute(...)方法中,Seata根据不同dbType和sql语句类型使用不同的Executer,调用io.seata.rm.datasource.exec.Executer类的execute(Object... args)
    • 如果选了DML类型Executer,主要做了以下事情:
      • 查询前镜像(select for update,因此此时获得本地锁)
      • 执行业务sql
      • 查询后镜像
      • 准备undoLog
    • 如果你的sql是select for update则会使用SelectForUpdateExecutor(Seata代理了select for update),代理后处理的逻辑是这样的:
      • 先执行 select for update(获取数据库本地锁)
      • 如果处于@GlobalTransactional or @GlobalLock检查是否有全局锁
      • 如果有全局锁,则未开启本地事务下会rollback本地事务,再重新争抢本地锁和全局锁,以此类推,除非拿到全局锁

ConnectionProxy.commit()的处理逻辑

  • 处于全局事务中(即,数据持久化方法带有@GlobalTransactional
    • 注册分支事务,获取全局锁
    • undoLog数据入库
    • 让数据库commit本次事务
  • 处于@GlobalLock中(即,数据持久化方法带有@GlobalLock
    • 向tc查询是否有全局锁存在,如存在,则抛出异常
    • 让数据库commit本次事务
  • 除了以上情况(else分支)
    • 让数据库commit本次事务

@GlobalTransactional的作用

标识一个全局事务

@GlobalLock + select for update的作用

如果像updateA()方法带有@GlobalLock + select for update,Seata在处理时,会先获取数据库本地锁,然后查询该记录是否有全局锁存在,若有,则抛出LockConflictException。

先举一个脏写的例子,再来看Seata如何防止脏写

假设你的业务代码是这样的:

  • updateAll()用来同时更新A和B表记录,updateA() updateB()则分别更新A、B表记录
  • updateAll()已经加上了@GlobalTransactional
  1. class YourBussinessService {
  2. DbServiceA serviceA;
  3. DbServiceB serviceB;
  4. @GlobalTransactional
  5. public boolean updateAll(DTO dto) {
  6. serviceA.update(dto.getA());
  7. serviceB.update(dto.getB());
  8. }
  9. public boolean updateA(DTO dto) {
  10. serviceA.update(dto.getA());
  11. }
  12. }
  1. class DbServiceA {
  2. @Transactional
  3. public boolean update(A a) {
  4. }
  5. }

dirty-write |

怎么用Seata防止脏写?

办法一:updateA()也加上@GlobalTransactional,此时Seata会如何保证事务隔离?

  1. class DbServiceA {
  2. @GlobalTransactional
  3. @Transactional
  4. public boolean updateA(DTO dto) {
  5. serviceA.update(dto.getA());
  6. }
  7. }
  • updateAll()先被调用(未完成),updateA()后被调用

dirty-write

办法二: @GlobalLock + select for update

  1. class DbServiceA {
  2. @GlobalLock
  3. @Transactional
  4. public boolean updateA(DTO dto) {
  5. serviceA.selectForUpdate(dto.getA());
  6. serviceA.update(dto.getA());
  7. }
  8. }
  • updateAll()先被调用(未完成),updateA()后被调用 dirty-write

  • 那如果是updateA()先被调用(未完成),updateAll()后被调用呢?
    由于2个业务都是要先获得本地锁,因此同样不会发生脏写

  • 一定有人会问,“这里为什么要加上select for update? 只用@GlobalLock能不能防止脏写?” 能。但请再回看下上面的图,select for update能带来这么几个好处:

    • 锁冲突更“温柔”些。如果只有@GlobalLock,检查到全局锁,则立刻抛出异常,也许再“坚持”那么一下,全局锁就释放了,抛出异常岂不可惜了。
    • updateA()中可以通过select for update获得最新的A,接着再做更新。

如何防止脏读?

场景: 某业务先调用updateAll()updateAll()未执行完成,另一业务后调用queryA()

dirty-write


源码展示

  1. @Service
  2. public class StorageService {
  3. @Autowired
  4. private DataSource dataSource;
  5. @GlobalTransactional
  6. public void update() throws SQLException {
  7. Connection connection = null;
  8. PreparedStatement preparedStatement = null;
  9. try {
  10. connection = dataSource.getConnection();
  11. connection.setAutoCommit(false);
  12. String sql = "update storage_tbl set count = ?" +
  13. " where id = ? and commodity_code = ?";
  14. preparedStatement = connection.prepareStatement(sql);
  15. preparedStatement.setInt(1, 100);
  16. preparedStatement.setLong(2, 1);
  17. preparedStatement.setString(3, "2001");
  18. preparedStatement.execute();
  19. connection.commit();
  20. } catch (Exception e) {
  21. throw e;
  22. } finally {
  23. IOutils.close(preparedStatement);
  24. IOutils.close(connection);
  25. }
  26. }
  27. }

这段代码虽然看着很初级,没有使用持久层框架,但如果将框架帮我们做的事情抽象出来,其实也就是上面这段代码。

简单说明接下来源码介绍的脉络(主要关注和事务隔离有关的源码)

  • 代理数据源的用途
    • DataSourceProxy的作用(返回ConnectionProxy
      • 介绍 ConnectionProxy的一个小功能(存放undolog)
    • ConnectionProxy的作用(返回StatementProxy
    • StatementProxy.execute()的处理逻辑
      • io.seata.rm.datasource.exec.UpdateExecutor的执行逻辑(查前镜像、执行sql、查后镜像、准备undoLog)
      • SelectForUpdateExecutor的执行逻辑(争本地锁,查全局锁。有全局锁,回滚,再争…)
    • ConnectionProxy.commit()的处理逻辑(注册分支事务(争全局锁),写入undoLog,数据库提交)
  • 介绍RootContext
  • GlobalTransactionalInterceptor的不同代理逻辑
    • 带有@GlobalTransactional如何处理
    • 带有@GlobalLock如何处理

DataSourceProxy的作用

DataSourceProxy帮助我们获得几个重要的代理对象

  • 通过DataSourceProxy.getConnection()获得ConnectionProxy

    1. package io.seata.rm.datasource;
    2. import java.sql.Connection;
    3. public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {
    4. @Override
    5. public ConnectionProxy getConnection() throws SQLException {
    6. Connection targetConnection = targetDataSource.getConnection();
    7. return new ConnectionProxy(this, targetConnection);
    8. }
    9. }
    • 现在先介绍下ConnectionProxy中的ConnectionContext,它的有一个功能是存放undoLog

      1. package io.seata.rm.datasource;
      2. import io.seata.rm.datasource.undo.SQLUndoLog;
      3. public class ConnectionProxy extends AbstractConnectionProxy {
      4. private ConnectionContext context = new ConnectionContext();
      5. public void appendUndoLog(SQLUndoLog sqlUndoLog) {
      6. context.appendUndoItem(sqlUndoLog);
      7. }
      8. }
      1. package io.seata.rm.datasource;
      2. public class ConnectionContext {
      3. private static final Savepoint DEFAULT_SAVEPOINT = new Savepoint() {
      4. @Override
      5. public int getSavepointId() throws SQLException {
      6. return 0;
      7. }
      8. @Override
      9. public String getSavepointName() throws SQLException {
      10. return "DEFAULT_SEATA_SAVEPOINT";
      11. }
      12. };
      13. private final Map<Savepoint, List<SQLUndoLog>> sqlUndoItemsBuffer = new LinkedHashMap<>();
      14. private Savepoint currentSavepoint = DEFAULT_SAVEPOINT;
      15. void appendUndoItem(SQLUndoLog sqlUndoLog) {
      16. sqlUndoItemsBuffer.computeIfAbsent(currentSavepoint, k -> new ArrayList<>()).add(sqlUndoLog);
      17. }
      18. }

通过ConnectionProxy.prepareStatement(...)获得StatementProxy

  1. package io.seata.rm.datasource;
  2. public class ConnectionProxy extends AbstractConnectionProxy {
  3. public ConnectionProxy(DataSourceProxy dataSourceProxy, Connection targetConnection) {
  4. super(dataSourceProxy, targetConnection);
  5. }
  6. }
  1. package io.seata.rm.datasource;
  2. import java.sql.Connection;
  3. public abstract class AbstractConnectionProxy implements Connection {
  4. protected Connection targetConnection;
  5. public AbstractConnectionProxy(DataSourceProxy dataSourceProxy, Connection targetConnection) {
  6. this.dataSourceProxy = dataSourceProxy;
  7. this.targetConnection = targetConnection;
  8. }
  9. @Override
  10. public PreparedStatement prepareStatement(String sql) throws SQLException {
  11. String dbType = getDbType();
  12. // support oracle 10.2+
  13. PreparedStatement targetPreparedStatement = null;
  14. if (BranchType.AT == RootContext.getBranchType()) { //为什么这里会返回AT?
  15. List<SQLRecognizer> sqlRecognizers = SQLVisitorFactory.get(sql, dbType);
  16. if (sqlRecognizers != null && sqlRecognizers.size() == 1) {
  17. SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
  18. if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) {
  19. TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(),
  20. sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId());
  21. String[] pkNameArray = new String[tableMeta.getPrimaryKeyOnlyName().size()];
  22. tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray);
  23. // 如果是insert语句,这里创建的PreparedStatement需要可以返回自动生成的主键,因此使用这个prepareStatement()
  24. targetPreparedStatement = getTargetConnection().prepareStatement(sql,pkNameArray);
  25. }
  26. }
  27. }
  28. if (targetPreparedStatement == null) {
  29. targetPreparedStatement = getTargetConnection().prepareStatement(sql);
  30. }
  31. return new PreparedStatementProxy(this, targetPreparedStatement, sql);
  32. }
  33. public Connection getTargetConnection() {
  34. return targetConnection;
  35. }
  36. }

先在这打下个疑问,后边解释。
RootContext.getBranchType()的返回值怎么会是AT?

StatementProxy.execute()的处理逻辑

  • 当调用io.seata.rm.datasource.StatementProxy.execute()会将sql交给io.seata.rm.datasource.exec.ExecuteTemplate.execute(...)处理。

    1. package io.seata.rm.datasource;
    2. public class PreparedStatementProxy extends AbstractPreparedStatementProxy
    3. implements PreparedStatement, ParametersHolder {
    4. @Override
    5. public boolean execute() throws SQLException {
    6. return ExecuteTemplate.execute(this, (statement, args) -> statement.execute());
    7. }
    8. }
    • ExecuteTemplate.execute(...)方法中,Seata根据不同dbType和sql语句类型使用不同的Executer,调用io.seata.rm.datasource.exec.Executer类的execute(Object... args)

      ``` package io.seata.rm.datasource.exec;

  1. public class ExecuteTemplate {
  2. public static <T, S extends Statement> T execute(StatementProxy<S> statementProxy,
  3. StatementCallback<T, S> statementCallback,
  4. Object... args) throws SQLException {
  5. return execute(null, statementProxy, statementCallback, args);
  6. }
  7. public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
  8. StatementProxy<S> statementProxy,
  9. StatementCallback<T, S> statementCallback,
  10. Object... args) throws SQLException {
  11. if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
  12. // Just work as original statement
  13. return statementCallback.execute(statementProxy.getTargetStatement(), args);
  14. }
  15. String dbType = statementProxy.getConnectionProxy().getDbType();
  16. if (CollectionUtils.isEmpty(sqlRecognizers)) {
  17. sqlRecognizers = SQLVisitorFactory.get(
  18. statementProxy.getTargetSQL(),
  19. dbType);
  20. }
  21. Executor<T> executor;
  22. if (CollectionUtils.isEmpty(sqlRecognizers)) {
  23. executor = new PlainExecutor<>(statementProxy, statementCallback);
  24. } else {
  25. if (sqlRecognizers.size() == 1) {
  26. SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
  27. switch (sqlRecognizer.getSQLType()) {
  28. case INSERT:
  29. executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
  30. new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
  31. new Object[]{statementProxy, statementCallback, sqlRecognizer});
  32. break;
  33. case UPDATE:
  34. executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
  35. break;
  36. case DELETE:
  37. executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
  38. break;
  39. case SELECT_FOR_UPDATE:
  40. executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
  41. break;
  42. default:
  43. executor = new PlainExecutor<>(statementProxy, statementCallback);
  44. break;
  45. }
  46. } else {
  47. executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
  48. }
  49. }
  50. T rs;
  51. try {
  52. rs = executor.execute(args);
  53. } catch (Throwable ex) {
  54. if (!(ex instanceof SQLException)) {
  55. // Turn other exception into SQLException
  56. ex = new SQLException(ex);
  57. }
  58. throw (SQLException) ex;
  59. }
  60. return rs;
  61. }
  62. }
  63. ```
  64. > 也在这打下个疑问,后边解释。
  65. > **`RootContext.requireGlobalLock()`怎么判断当前是否需要全局锁?**
  66. 先以`io.seata.rm.datasource.exec.UpdateExecutor`举例,`UpdateExecutor` extends `AbstractDMLBaseExecutor` extends `BaseTransactionalExecutor`。 观察`execute()`方法的做了什么
  67. ```
  68. package io.seata.rm.datasource.exec;
  69. public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {
  70. protected StatementProxy<S> statementProxy;
  71. protected StatementCallback<T, S> statementCallback;
  72. protected SQLRecognizer sqlRecognizer;
  73. public BaseTransactionalExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback,
  74. SQLRecognizer sqlRecognizer) {
  75. this.statementProxy = statementProxy;
  76. this.statementCallback = statementCallback;
  77. this.sqlRecognizer = sqlRecognizer;
  78. }
  79. @Override
  80. public T execute(Object... args) throws Throwable {
  81. String xid = RootContext.getXID();
  82. if (xid != null) {
  83. statementProxy.getConnectionProxy().bind(xid);
  84. }
  85. statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
  86. return doExecute(args);
  87. }
  88. }
  89. ```
  90. ```
  91. public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {
  92. public AbstractDMLBaseExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback,
  93. SQLRecognizer sqlRecognizer) {
  94. super(statementProxy, statementCallback, sqlRecognizer);
  95. }
  96. @Override
  97. public T doExecute(Object... args) throws Throwable {
  98. AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
  99. if (connectionProxy.getAutoCommit()) {
  100. return executeAutoCommitTrue(args);
  101. } else {
  102. return executeAutoCommitFalse(args);
  103. }
  104. }
  105. protected T executeAutoCommitTrue(Object[] args) throws Throwable {
  106. ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
  107. try {
  108. connectionProxy.changeAutoCommit(); // 注意,你如果没开启事务,seata帮你开启
  109. return new LockRetryPolicy(connectionProxy).execute(() -> {
  110. T result = executeAutoCommitFalse(args);
  111. connectionProxy.commit(); // 帮你开启事务后,通过connectionProxy来提交
  112. return result;
  113. });
  114. } catch (Exception e) {
  115. // when exception occur in finally,this exception will lost, so just print it here
  116. LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
  117. if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
  118. connectionProxy.getTargetConnection().rollback();
  119. }
  120. throw e;
  121. } finally {
  122. connectionProxy.getContext().reset();
  123. connectionProxy.setAutoCommit(true);
  124. }
  125. }
  126. protected T executeAutoCommitFalse(Object[] args) throws Exception {
  127. if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
  128. throw new NotSupportYetException("multi pk only support mysql!");
  129. }
  130. TableRecords beforeImage = beforeImage();
  131. T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
  132. TableRecords afterImage = afterImage(beforeImage);
  133. prepareUndoLog(beforeImage, afterImage);
  134. return result;
  135. }
  136. }
  137. ```
  138. ```
  139. package io.seata.rm.datasource.exec;
  140. public class UpdateExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> {
  141. public UpdateExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback,
  142. SQLRecognizer sqlRecognizer) {
  143. super(statementProxy, statementCallback, sqlRecognizer);
  144. }
  145. }
  146. ```
  147. - 如果选了DML类型Executer,可以在上面的executeAutoCommitFalse()中看到,主要做了以下事情:
  148. - 查询前镜像(select for update,因此此时获得本地锁)
  149. ```
  150. package io.seata.rm.datasource.exec;
  151. public class UpdateExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> {
  152. private static final boolean ONLY_CARE_UPDATE_COLUMNS = CONFIG.getBoolean(
  153. ConfigurationKeys.TRANSACTION_UNDO_ONLY_CARE_UPDATE_COLUMNS, DefaultValues.DEFAULT_ONLY_CARE_UPDATE_COLUMNS); // 默认为true
  154. @Override
  155. protected TableRecords beforeImage() throws SQLException {
  156. ArrayList<List<Object>> paramAppenderList = new ArrayList<>();
  157. TableMeta tmeta = getTableMeta();
  158. String selectSQL = buildBeforeImageSQL(tmeta, paramAppenderList);
  159. // SELECT id, count FROM storage_tbl WHERE id = ? FOR UPDATE
  160. return buildTableRecords(tmeta, selectSQL, paramAppenderList);
  161. }
  162. private String buildBeforeImageSQL(TableMeta tableMeta, ArrayList<List<Object>> paramAppenderList) {
  163. SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer) sqlRecognizer;
  164. List<String> updateColumns = recognizer.getUpdateColumns();
  165. StringBuilder prefix = new StringBuilder("SELECT ");
  166. StringBuilder suffix = new StringBuilder(" FROM ").append(getFromTableInSQL());
  167. String whereCondition = buildWhereCondition(recognizer, paramAppenderList);
  168. if (StringUtils.isNotBlank(whereCondition)) {
  169. suffix.append(WHERE).append(whereCondition);
  170. }
  171. String orderBy = recognizer.getOrderBy();
  172. if (StringUtils.isNotBlank(orderBy)) {
  173. suffix.append(orderBy);
  174. }
  175. ParametersHolder parametersHolder = statementProxy instanceof ParametersHolder ? (ParametersHolder)statementProxy : null;
  176. String limit = recognizer.getLimit(parametersHolder, paramAppenderList);
  177. if (StringUtils.isNotBlank(limit)) {
  178. suffix.append(limit);
  179. }
  180. suffix.append(" FOR UPDATE");
  181. StringJoiner selectSQLJoin = new StringJoiner(", ", prefix.toString(), suffix.toString());
  182. if (ONLY_CARE_UPDATE_COLUMNS) {
  183. if (!containsPK(updateColumns)) {// 如果本次更新的行不包含主键,那select for update的时候加上主键
  184. selectSQLJoin.add(getColumnNamesInSQL(tableMeta.getEscapePkNameList(getDbType())));
  185. }
  186. for (String columnName : updateColumns) {
  187. selectSQLJoin.add(columnName);
  188. }
  189. } else {
  190. for (String columnName : tableMeta.getAllColumns().keySet()) {
  191. selectSQLJoin.add(ColumnUtils.addEscape(columnName, getDbType()));
  192. }
  193. }
  194. return selectSQLJoin.toString();
  195. }
  196. protected TableRecords buildTableRecords(TableMeta tableMeta, String selectSQL, ArrayList<List<Object>> paramAppenderList) throws SQLException {
  197. ResultSet rs = null;
  198. try (PreparedStatement ps = statementProxy.getConnection().prepareStatement(selectSQL)) { // 执行select for update,然后就拿到了本地锁
  199. if (CollectionUtils.isNotEmpty(paramAppenderList)) {
  200. for (int i = 0, ts = paramAppenderList.size(); i < ts; i++) {
  201. List<Object> paramAppender = paramAppenderList.get(i);
  202. for (int j = 0, ds = paramAppender.size(); j < ds; j++) {
  203. ps.setObject(i * ds + j + 1, paramAppender.get(j));
  204. }
  205. }
  206. }
  207. rs = ps.executeQuery();
  208. return TableRecords.buildRecords(tableMeta, rs);
  209. } finally {
  210. IOUtil.close(rs);
  211. }
  212. }
  213. }
  214. ```
  215. - 执行业务sql
  216. - 查询后镜像
  217. ```
  218. package io.seata.rm.datasource.exec;
  219. public class UpdateExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> {
  220. @Override
  221. protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
  222. TableMeta tmeta = getTableMeta();
  223. if (beforeImage == null || beforeImage.size() == 0) {
  224. return TableRecords.empty(getTableMeta());
  225. }
  226. String selectSQL = buildAfterImageSQL(tmeta, beforeImage);
  227. //SELECT id, count FROM storage_tbl WHERE (id) in ( (?) )
  228. ResultSet rs = null;
  229. try (PreparedStatement pst = statementProxy.getConnection().prepareStatement(selectSQL)) {
  230. SqlGenerateUtils.setParamForPk(beforeImage.pkRows(), getTableMeta().getPrimaryKeyOnlyName(), pst);
  231. rs = pst.executeQuery();
  232. return TableRecords.buildRecords(tmeta, rs);
  233. } finally {
  234. IOUtil.close(rs);
  235. }
  236. }
  237. }
  238. ```
  239. - 准备undoLog
  240. ```
  241. public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {
  242. protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
  243. if (beforeImage.getRows().isEmpty() && afterImage.getRows().isEmpty()) {
  244. return;
  245. }
  246. if (SQLType.UPDATE == sqlRecognizer.getSQLType()) {
  247. if (beforeImage.getRows().size() != afterImage.getRows().size()) {
  248. throw new ShouldNeverHappenException("Before image size is not equaled to after image size, probably because you updated the primary keys.");
  249. }
  250. }
  251. ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
  252. TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
  253. String lockKeys = buildLockKey(lockKeyRecords);
  254. if (null != lockKeys) {
  255. connectionProxy.appendLockKey(lockKeys);
  256. SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
  257. connectionProxy.appendUndoLog(sqlUndoLog); // 把undoLog存到connectionProxy中,具体怎么回事上面有提过
  258. }
  259. }
  260. }
  261. ```
  262. - 如果你的sql是select for update则会使用`SelectForUpdateExecutor`(Seata代理了select for update),代理后处理的逻辑是这样的:
  263. - 先执行 select for update(获取数据库本地锁)
  264. - 如果处于`@GlobalTransactional` or `@GlobalLock`,**检查**是否有全局锁
  265. - 如果有全局锁,则未开启本地事务下会rollback本地事务,再重新争抢本地锁和查询全局锁,直到全局锁释放
  266. ```
  267. package io.seata.rm.datasource.exec;
  268. public class SelectForUpdateExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {
  269. @Override
  270. public T doExecute(Object... args) throws Throwable {
  271. Connection conn = statementProxy.getConnection();
  272. DatabaseMetaData dbmd = conn.getMetaData();
  273. T rs;
  274. Savepoint sp = null;
  275. boolean originalAutoCommit = conn.getAutoCommit();
  276. try {
  277. if (originalAutoCommit) {
  278. /*
  279. * In order to hold the local db lock during global lock checking
  280. * set auto commit value to false first if original auto commit was true
  281. */
  282. conn.setAutoCommit(false);
  283. } else if (dbmd.supportsSavepoints()) {
  284. /*
  285. * In order to release the local db lock when global lock conflict
  286. * create a save point if original auto commit was false, then use the save point here to release db
  287. * lock during global lock checking if necessary
  288. */
  289. sp = conn.setSavepoint();
  290. } else {
  291. throw new SQLException("not support savepoint. please check your db version");
  292. }
  293. LockRetryController lockRetryController = new LockRetryController();
  294. ArrayList<List<Object>> paramAppenderList = new ArrayList<>();
  295. String selectPKSQL = buildSelectSQL(paramAppenderList);
  296. while (true) {
  297. try {
  298. // #870
  299. // execute return Boolean
  300. // executeQuery return ResultSet
  301. rs = statementCallback.execute(statementProxy.getTargetStatement(), args); //执行 select for update(获取数据库本地锁)
  302. // Try to get global lock of those rows selected
  303. TableRecords selectPKRows = buildTableRecords(getTableMeta(), selectPKSQL, paramAppenderList);
  304. String lockKeys = buildLockKey(selectPKRows);
  305. if (StringUtils.isNullOrEmpty(lockKeys)) {
  306. break;
  307. }
  308. if (RootContext.inGlobalTransaction() || RootContext.requireGlobalLock()) {
  309. // Do the same thing under either @GlobalTransactional or @GlobalLock,
  310. // that only check the global lock here.
  311. statementProxy.getConnectionProxy().checkLock(lockKeys);
  312. } else {
  313. throw new RuntimeException("Unknown situation!");
  314. }
  315. break;
  316. } catch (LockConflictException lce) {
  317. if (sp != null) {
  318. conn.rollback(sp);
  319. } else {
  320. conn.rollback();// 回滚,释放本地锁
  321. }
  322. // trigger retry
  323. lockRetryController.sleep(lce);
  324. }
  325. }
  326. } finally {
  327. if (sp != null) {
  328. try {
  329. if (!JdbcConstants.ORACLE.equalsIgnoreCase(getDbType())) {
  330. conn.releaseSavepoint(sp);
  331. }
  332. } catch (SQLException e) {
  333. LOGGER.error("{} release save point error.", getDbType(), e);
  334. }
  335. }
  336. if (originalAutoCommit) {
  337. conn.setAutoCommit(true);
  338. }
  339. }
  340. return rs;
  341. }
  342. }
  343. ```

ConnectionProxy.commit()的处理逻辑

  1. public class ConnectionProxy extends AbstractConnectionProxy {
  2. private final static LockRetryPolicy LOCK_RETRY_POLICY = new LockRetryPolicy();
  3. private ConnectionContext context = new ConnectionContext();
  4. @Override
  5. public void commit() throws SQLException {
  6. try {
  7. LOCK_RETRY_POLICY.execute(() -> {
  8. doCommit();
  9. return null;
  10. });
  11. } catch (SQLException e) {
  12. if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
  13. rollback();
  14. }
  15. throw e;
  16. } catch (Exception e) {
  17. throw new SQLException(e);
  18. }
  19. }
  20. private void doCommit() throws SQLException {
  21. if (context.inGlobalTransaction()) {
  22. processGlobalTransactionCommit();
  23. } else if (context.isGlobalLockRequire()) {
  24. processLocalCommitWithGlobalLocks();
  25. } else {
  26. targetConnection.commit();
  27. }
  28. }
  29. }

也在这打下个疑问,后边解释。
ConnectionProxy里的ConnectionContext是如何判断inGlobalTransaction() or isGlobalLockRequire()的呢?

  • 处于全局事务中(即,数据持久化方法带有@GlobalTransactional

    • 注册分支事务,获取全局锁
    • undoLog数据入库
    • 让数据库commit本次事务

    ```

    1. public class ConnectionProxy extends AbstractConnectionProxy {
    2. private final static LockRetryPolicy LOCK_RETRY_POLICY = new LockRetryPolicy();
    3. private ConnectionContext context = new ConnectionContext();
    4. private void processGlobalTransactionCommit() throws SQLException {
    5. try {
    6. register(); // 注册分支,争全局锁
    7. } catch (TransactionException e) {
    8. recognizeLockKeyConflictException(e, context.buildLockKeys());
    9. }
    10. try {
    11. UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this); // undolog入库
    12. targetConnection.commit(); // 分支事务提交
    13. } catch (Throwable ex) {
    14. LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
    15. report(false);
    16. throw new SQLException(ex);
    17. }
    18. if (IS_REPORT_SUCCESS_ENABLE) {
    19. report(true);
    20. }
    21. context.reset();
    22. }
    23. private void register() throws TransactionException {
    24. if (!context.hasUndoLog() || !context.hasLockKey()) {
    25. return;
    26. }
    27. Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
    28. null, context.getXid(), null, context.buildLockKeys());
    29. context.setBranchId(branchId);
    30. }
  1. }
  2. ```
  • 处于@GlobalLock中(即,数据持久化方法带有@GlobalLock

    • 向tc查询是否有全局锁存在
    • 让数据库commit本次事务

    ``` public class ConnectionProxy extends AbstractConnectionProxy {

    1. private final static LockRetryPolicy LOCK_RETRY_POLICY = new LockRetryPolicy();
    2. private ConnectionContext context = new ConnectionContext();
    3. private void processLocalCommitWithGlobalLocks() throws SQLException {
    4. checkLock(context.buildLockKeys());
    5. try {
    6. targetConnection.commit();
    7. } catch (Throwable ex) {
    8. throw new SQLException(ex);
    9. }
    10. context.reset();
    11. }
    12. public void checkLock(String lockKeys) throws SQLException {
    13. if (StringUtils.isBlank(lockKeys)) {
    14. return;
    15. }
    16. // Just check lock without requiring lock by now.
    17. try {
    18. boolean lockable = DefaultResourceManager.get().lockQuery(BranchType.AT,
    19. getDataSourceProxy().getResourceId(), context.getXid(), lockKeys);
    20. if (!lockable) {
    21. throw new LockConflictException();
    22. }
    23. } catch (TransactionException e) {
    24. recognizeLockKeyConflictException(e, lockKeys);
    25. }
    26. }
  1. }
  2. ```
  • 除了以上情况(else分支)

    • 让数据库commit本次事务

介绍RootContext

我们在上面留下了3个“扣儿”,现在到了结合RootContext源码来解答的时候。

  1. RootContext.getBranchType()的返回值怎么会是AT?
    该方法的判断逻辑是:只要判断出当前处于全局事务中(即,只要有地方调用过RootContext.bind(xid)), 就会返回默认BranchType.AT

    1. public class RootContext {
    2. public static final String KEY_XID = "TX_XID";
    3. private static ContextCore CONTEXT_HOLDER = ContextCoreLoader.load();
    4. private static BranchType DEFAULT_BRANCH_TYPE;
    5. @Nullable
    6. public static BranchType getBranchType() {
    7. if (inGlobalTransaction()) {
    8. BranchType branchType = (BranchType) CONTEXT_HOLDER.get(KEY_BRANCH_TYPE);
    9. if (branchType != null) {
    10. return branchType;
    11. }
    12. //Returns the default branch type.
    13. return DEFAULT_BRANCH_TYPE != null ? DEFAULT_BRANCH_TYPE : BranchType.AT;
    14. }
    15. return null;
    16. }
    17. public static boolean inGlobalTransaction() {
    18. return CONTEXT_HOLDER.get(KEY_XID) != null;
    19. }
    20. public static void bind(@Nonnull String xid) {
    21. if (StringUtils.isBlank(xid)) {
    22. if (LOGGER.isDebugEnabled()) {
    23. LOGGER.debug("xid is blank, switch to unbind operation!");
    24. }
    25. unbind();
    26. } else {
    27. MDC.put(MDC_KEY_XID, xid);
    28. if (LOGGER.isDebugEnabled()) {
    29. LOGGER.debug("bind {}", xid);
    30. }
    31. CONTEXT_HOLDER.put(KEY_XID, xid);
    32. }
    33. }
    34. }
  2. RootContext.requireGlobalLock()怎么判断当前是否需要全局锁?
    需要有地方调用RootContext.bindGlobalLockFlag()

    1. public class RootContext {
    2. public static final String KEY_GLOBAL_LOCK_FLAG = "TX_LOCK";
    3. public static final Boolean VALUE_GLOBAL_LOCK_FLAG = true;
    4. private static ContextCore CONTEXT_HOLDER = ContextCoreLoader.load();
    5. public static boolean requireGlobalLock() {
    6. return CONTEXT_HOLDER.get(KEY_GLOBAL_LOCK_FLAG) != null;
    7. }
    8. public static void bindGlobalLockFlag() {
    9. if (LOGGER.isDebugEnabled()) {
    10. LOGGER.debug("Local Transaction Global Lock support enabled");
    11. }
    12. //just put something not null
    13. CONTEXT_HOLDER.put(KEY_GLOBAL_LOCK_FLAG, VALUE_GLOBAL_LOCK_FLAG);
    14. }
    15. }
  3. ConnectionProxy.commit()会根据context的不同状态区分处理,那ConnectionContext是如何判断inGlobalTransaction() or isGlobalLockRequire()的呢?

    1. public class ConnectionProxy extends AbstractConnectionProxy {
    2. private ConnectionContext context = new ConnectionContext();
    3. private void doCommit() throws SQLException {
    4. if (context.inGlobalTransaction()) {
    5. processGlobalTransactionCommit();
    6. } else if (context.isGlobalLockRequire()) {
    7. processLocalCommitWithGlobalLocks();
    8. } else {
    9. targetConnection.commit();
    10. }
    11. }
    12. }
    • 如何判断inGlobalTransaction()?(注意下,这里和上面提到的RootContext不是一个东西)

      1. public class ConnectionContext {
      2. private String xid;
      3. void setXid(String xid) {
      4. this.xid = xid;
      5. }
      6. public boolean inGlobalTransaction() {
      7. return xid != null;
      8. }
      9. void bind(String xid) {
      10. if (xid == null) {
      11. throw new IllegalArgumentException("xid should not be null");
      12. }
      13. if (!inGlobalTransaction()) {
      14. setXid(xid);
      15. } else {
      16. if (!this.xid.equals(xid)) {
      17. throw new ShouldNeverHappenException();
      18. }
      19. }
      20. }
      21. }

      哪里调用的ConnectionContext.bind(xid)?

      1. package io.seata.rm.datasource.exec;
      2. public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {
      3. @Override
      4. public T execute(Object... args) throws Throwable {
      5. // 那么,这里的XID哪来的呢?往后看就知道,是来自开启全局事务的时候获得的,和@GlobalTransactional有关
      6. String xid = RootContext.getXID();
      7. if (xid != null) {
      8. statementProxy.getConnectionProxy().bind(xid);
      9. }
      10. // 这里就是设置 isGlobalLockRequire的位置,和 @GlobalLock有关
      11. statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
      12. return doExecute(args);
      13. }
      14. }
      1. public class ConnectionProxy extends AbstractConnectionProxy {
      2. private ConnectionContext context = new ConnectionContext();
      3. public void bind(String xid) {
      4. context.bind(xid);
      5. }
      6. public void setGlobalLockRequire(boolean isLock) {
      7. context.setGlobalLockRequire(isLock);
      8. }
      9. }
    • 如何判断isGlobalLockRequire()

      1. public class ConnectionContext {
      2. private boolean isGlobalLockRequire;
      3. boolean isGlobalLockRequire() {
      4. return isGlobalLockRequire;
      5. }
      6. void setGlobalLockRequire(boolean isGlobalLockRequire) {
      7. this.isGlobalLockRequire = isGlobalLockRequire;
      8. }
      9. }

    在看过代码后,我们知道,只要有地方在RootContext中设置了xid,或bindGlobalLockFlag(),就会识别成不同的状态。 那么哪儿调用的呢?答案就在下方的GlobalTransactionalInterceptor中。

GlobalTransactionalInterceptor处理带有@GlobalTransactional@GlobalLock的方法

带有@GlobalTransactional@GlobalLock的方法会被代理,交给GlobalTransactionalInterceptor处理

  1. public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor {
  2. @Override
  3. public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
  4. Class<?> targetClass =
  5. methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
  6. Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
  7. if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
  8. final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
  9. final GlobalTransactional globalTransactionalAnnotation =
  10. getAnnotation(method, targetClass, GlobalTransactional.class);
  11. final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
  12. boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
  13. if (!localDisable) {
  14. if (globalTransactionalAnnotation != null) {
  15. return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);// 处理 @GlobalTransactional
  16. } else if (globalLockAnnotation != null) {
  17. return handleGlobalLock(methodInvocation, globalLockAnnotation); // 处理 @GlobalLock
  18. }
  19. }
  20. }
  21. return methodInvocation.proceed();
  22. }
  23. }

先看处理@GlobalTransactional

  1. public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor {
  2. private final TransactionalTemplate transactionalTemplate = new TransactionalTemplate();
  3. Object handleGlobalTransaction(final MethodInvocation methodInvocation,
  4. final GlobalTransactional globalTrxAnno) throws Throwable {
  5. //...
  6. try {
  7. return transactionalTemplate.execute(...);
  8. } catch (TransactionalExecutor.ExecutionException e) {
  9. // ...
  10. } finally {
  11. //...
  12. }
  13. }
  14. }

来到了经典的seata事务模板方法,我们要关注开启事务的部分

  1. public class TransactionalTemplate {
  2. public Object execute(TransactionalExecutor business) throws Throwable {
  3. // 1. Get transactionInfo
  4. //...
  5. // 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
  6. GlobalTransaction tx = GlobalTransactionContext.getCurrent();
  7. // 1.2 Handle the transaction propagation.
  8. // ...
  9. // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
  10. if (tx == null) {
  11. tx = GlobalTransactionContext.createNew();
  12. }
  13. //...
  14. try {
  15. // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
  16. // else do nothing. Of course, the hooks will still be triggered.
  17. beginTransaction(txInfo, tx);
  18. Object rs;
  19. try {
  20. // Do Your Business
  21. rs = business.execute();
  22. } catch (Throwable ex) {
  23. // 3. The needed business exception to rollback.
  24. completeTransactionAfterThrowing(txInfo, tx, ex);
  25. throw ex;
  26. }
  27. // 4. everything is fine, commit.
  28. commitTransaction(tx);
  29. return rs;
  30. } finally {
  31. //5. clear
  32. //...
  33. }
  34. } finally {
  35. // If the transaction is suspended, resume it.
  36. // ...
  37. }
  38. }
  39. private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
  40. try {
  41. triggerBeforeBegin();
  42. tx.begin(txInfo.getTimeOut(), txInfo.getName());
  43. triggerAfterBegin();
  44. } catch (TransactionException txe) {
  45. throw new TransactionalExecutor.ExecutionException(tx, txe,
  46. TransactionalExecutor.Code.BeginFailure);
  47. }
  48. }
  49. }
  1. public class DefaultGlobalTransaction implements GlobalTransaction {
  2. @Override
  3. public void begin(int timeout, String name) throws TransactionException {
  4. if (role != GlobalTransactionRole.Launcher) {
  5. assertXIDNotNull();
  6. if (LOGGER.isDebugEnabled()) {
  7. LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
  8. }
  9. return;
  10. }
  11. assertXIDNull();
  12. String currentXid = RootContext.getXID();
  13. if (currentXid != null) {
  14. throw new IllegalStateException("Global transaction already exists," +
  15. " can't begin a new global transaction, currentXid = " + currentXid);
  16. }
  17. xid = transactionManager.begin(null, null, name, timeout);
  18. status = GlobalStatus.Begin;
  19. RootContext.bind(xid); // 绑定xid
  20. if (LOGGER.isInfoEnabled()) {
  21. LOGGER.info("Begin new global transaction [{}]", xid);
  22. }
  23. }
  24. }

看到了吗?RootContext.bind(xid);

接着看处理@GlobalLock

  1. public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor {
  2. private final GlobalLockTemplate globalLockTemplate = new GlobalLockTemplate();
  3. Object handleGlobalLock(final MethodInvocation methodInvocation,
  4. final GlobalLock globalLockAnno) throws Throwable {
  5. return globalLockTemplate.execute(new GlobalLockExecutor() {...});
  6. }
  7. }

也使用了模板方法来处理GlobalLock

  1. public class GlobalLockTemplate {
  2. public Object execute(GlobalLockExecutor executor) throws Throwable {
  3. boolean alreadyInGlobalLock = RootContext.requireGlobalLock();
  4. if (!alreadyInGlobalLock) {
  5. RootContext.bindGlobalLockFlag();
  6. }
  7. // set my config to config holder so that it can be access in further execution
  8. // for example, LockRetryController can access it with config holder
  9. GlobalLockConfig myConfig = executor.getGlobalLockConfig();
  10. GlobalLockConfig previousConfig = GlobalLockConfigHolder.setAndReturnPrevious(myConfig);
  11. try {
  12. return executor.execute();
  13. } finally {
  14. // only unbind when this is the root caller.
  15. // otherwise, the outer caller would lose global lock flag
  16. if (!alreadyInGlobalLock) {
  17. RootContext.unbindGlobalLockFlag();
  18. }
  19. // if previous config is not null, we need to set it back
  20. // so that the outer logic can still use their config
  21. if (previousConfig != null) {
  22. GlobalLockConfigHolder.setAndReturnPrevious(previousConfig);
  23. } else {
  24. GlobalLockConfigHolder.remove();
  25. }
  26. }
  27. }
  28. }

看到吗,一进模板方法就RootContext.bindGlobalLockFlag();