Seata Transaction Isolation

This article aims to help users understand how to correctly implement transaction isolation when using Seata AT mode to prevent dirty reads and writes.

It is expected that readers have already read the introduction to the AT mode on the Seata official website and have an understanding of local database locks.

(For example, when two transactions are simultaneously updating the same record, only the transaction that holds the record lock can update successfully, while the other transaction must wait until the record lock is released, or until the transaction times out)

First, take a look at this piece of code. It may look “basic,” but it is essentially what a persistence layer framework does for us under the hood.

  @Service
  public class StorageService {

      @Autowired
      private DataSource dataSource;

      @GlobalTransactional
      public void batchUpdate() throws SQLException {
          Connection connection = null;
          PreparedStatement preparedStatement = null;
          try {
              connection = dataSource.getConnection();
              // Disable auto-commit so the update runs in an explicit local transaction
              connection.setAutoCommit(false);
              String sql = "update storage_tbl set count = ?" +
                  " where id = ? and commodity_code = ?";
              preparedStatement = connection.prepareStatement(sql);
              preparedStatement.setInt(1, 100);
              preparedStatement.setLong(2, 1);
              preparedStatement.setString(3, "2001");
              preparedStatement.executeUpdate();
              connection.commit();
          } catch (Exception e) {
              throw e;
          } finally {
              // Release JDBC resources in reverse order of acquisition
              IOUtil.close(preparedStatement);
              IOUtil.close(connection);
          }
      }
  }

Starting with the Proxy Data Source

When using the AT mode, the most important thing is the proxy data source. So what is the purpose of using DataSourceProxy to proxy the data source?

DataSourceProxy gives us access to several important proxy objects:

  • Obtain ConnectionProxy through DataSourceProxy.getConnection()

  • Obtain StatementProxy through ConnectionProxy.prepareStatement(...)

Seata’s transaction isolation logic lives in these two proxies. Let me outline the implementation logic first.

Processing logic of StatementProxy.executeXXX()

  • When io.seata.rm.datasource.StatementProxy.executeXXX() is called, the SQL is handed to io.seata.rm.datasource.exec.ExecuteTemplate.execute(...) for processing.

    • In the ExecuteTemplate.execute(...) method, Seata selects an Executor based on the dbType and the SQL statement type, then calls the execute(Object... args) method of that io.seata.rm.datasource.exec.Executor.
    • If a DML Executor is chosen, the following main actions are performed:
      • Query the before image (select for update, so the local lock is acquired at this point)
      • Execute the business SQL
      • Query the after image
      • Prepare the undoLog
    • If your SQL is select for update, SelectForUpdateExecutor is used (Seata proxies select for update), and the proxied logic is as follows:
      • First, execute select for update (acquiring the database’s local lock)
      • If in @GlobalTransactional or @GlobalLock, check whether a global lock is held on the selected rows
      • If a global lock is held, roll back the local transaction (Seata opens one itself if the connection was in auto-commit mode) to release the local lock, then re-acquire the local lock and check the global lock again, repeating until the check passes.
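
The DML steps above can be sketched with an in-memory map standing in for the table. This is a minimal illustration under stated assumptions, not Seata's implementation: `DmlFlowSketch`, `UndoRecord`, `update` and `undo` are hypothetical names, and the "before image" here is a plain read, whereas Seata issues a select for update against the database.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for the DML executor flow; not Seata's real classes. */
class DmlFlowSketch {

    /** One undo record: the row's value before and after the business SQL. */
    static final class UndoRecord {
        final long id;
        final Integer before;
        final Integer after;

        UndoRecord(long id, Integer before, Integer after) {
            this.id = id;
            this.before = before;
            this.after = after;
        }
    }

    /** The "table": primary key -> count. */
    final Map<Long, Integer> table = new HashMap<>();

    /** Runs the four steps for a single-row update and returns the undo record. */
    UndoRecord update(long id, int newCount) {
        Integer before = table.get(id);           // 1. query the before image
        table.put(id, newCount);                  // 2. execute the business SQL
        Integer after = table.get(id);            // 3. query the after image
        return new UndoRecord(id, before, after); // 4. prepare the undo log entry
    }

    /** Restores the before image, which is what a rollback would do with the record. */
    void undo(UndoRecord record) {
        if (record.before == null) {
            table.remove(record.id);
        } else {
            table.put(record.id, record.before);
        }
    }
}
```

Calling `update(1L, 100)` on a table where row 1 holds 50 yields a record with `before = 50` and `after = 100`; passing that record to `undo` puts 50 back.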

Handling logic of ConnectionProxy.commit()

  • In a global transaction (i.e., the data persistence method has @GlobalTransactional)
    • Register branch transaction, obtain global lock
    • UndoLog data persistence
    • Let the database commit the current transaction
  • In @GlobalLock (i.e., the data persistence method has @GlobalLock)
    • Query the TC for the existence of a global lock, and if it exists, throw an exception
    • Let the database commit the current transaction
  • For other cases (the else branch)
    • Let the database commit the current transaction
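
The three commit branches can be summarized in a small simulation. This is only a sketch: `CommitSketch`, `Mode` and the lock key string `"storage_tbl:1"` used in the usage note are hypothetical, and a plain map stands in for the lock table that the TC maintains.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical simulation of the three commit branches; a map stands in for the TC lock table. */
class CommitSketch {

    enum Mode { GLOBAL_TRANSACTION, GLOBAL_LOCK, PLAIN }

    /** Lock key -> XID of the global transaction that owns it. */
    final Map<String, String> globalLocks = new HashMap<>();

    /** Returns a description of what happened; throws if a global lock conflict is detected. */
    String commit(Mode mode, String xid, String lockKey) {
        switch (mode) {
            case GLOBAL_TRANSACTION: {
                // Registering the branch also acquires the global lock.
                String owner = globalLocks.putIfAbsent(lockKey, xid);
                if (owner != null && !owner.equals(xid)) {
                    throw new IllegalStateException("global lock held by " + owner);
                }
                return "registered branch, flushed undo log, committed";
            }
            case GLOBAL_LOCK: {
                // Only checks the lock; nothing is acquired.
                String owner = globalLocks.get(lockKey);
                if (owner != null) {
                    throw new IllegalStateException("global lock held by " + owner);
                }
                return "checked global lock, committed";
            }
            default:
                // No Seata involvement: just a local commit.
                return "committed";
        }
    }
}
```

If one caller commits `"storage_tbl:1"` in `GLOBAL_TRANSACTION` mode under `"xid-1"`, a later `GLOBAL_LOCK` commit on the same key throws, while a `PLAIN` commit goes straight through, which is exactly why an unannotated method can cause a dirty write.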

Purpose of @GlobalTransactional

Identifies a global transaction

Purpose of @GlobalLock + select for update

If a method such as updateA() uses @GlobalLock + select for update, Seata first acquires the database local lock, then checks whether a global lock is held on that record; if one is, a LockConflictException is thrown.

Let’s first give an example of dirty write, and then see how Seata prevents dirty write

Let’s assume your business code is like this:

  • updateAll() is used to update records in both table A and B, updateA() and updateB() are used to update records in table A and B respectively
  • updateAll() has already been annotated with @GlobalTransactional
  class YourBusinessService {
      DbServiceA serviceA;
      DbServiceB serviceB;

      @GlobalTransactional
      public boolean updateAll(DTO dto) {
          serviceA.update(dto.getA());
          serviceB.update(dto.getB());
          return true;
      }

      // Not annotated: runs as a plain local transaction outside Seata's control
      public boolean updateA(DTO dto) {
          serviceA.update(dto.getA());
          return true;
      }
  }

  class DbServiceA {
      @Transactional
      public boolean update(A a) {
          // Persist the change to table A (body omitted)
          return true;
      }
  }

[Figure: dirty-write]
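
The timeline behind this dirty write can be replayed with plain arithmetic. The sketch below is a simplification under stated assumptions: the values 100, 90 and 80 are made up for illustration, `DirtyWriteSketch` is a hypothetical name, and the rollback is modeled as simply restoring the before image.

```java
/** Hypothetical replay of the dirty-write timeline; the values are illustrative only. */
class DirtyWriteSketch {

    /** Returns the final value of record A after the interleaving described above. */
    static int runTimeline() {
        int a = 100;              // committed value of record A
        int beforeImage = a;      // updateAll(): before image taken in phase one
        a = 90;                   // updateAll(): branch commits locally, global transaction still open
        a = a - 10;               // updateA(): no global lock check, writes on top (80) and commits
        a = beforeImage;          // updateAll() fails later and the before image is restored
        return a;                 // updateA()'s committed change has been overwritten
    }
}
```

The method returns 100: the change made by updateA() was committed from its caller's point of view, yet it no longer exists in the data.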

How to prevent dirty write using Seata?

Method 1: Add @GlobalTransactional to updateA() as well. How does Seata ensure transaction isolation in this case?

  class YourBusinessService {
      DbServiceA serviceA;

      @GlobalTransactional
      @Transactional
      public boolean updateA(DTO dto) {
          serviceA.update(dto.getA());
          return true;
      }
  }

  • updateAll() is called first (and has not completed); updateA() is called afterwards

[Figure: dirty-write]

Method 2: @GlobalLock + select for update

  class YourBusinessService {
      DbServiceA serviceA;

      @GlobalLock
      @Transactional
      public boolean updateA(DTO dto) {
          // Acquire the local lock and let Seata check the global lock before writing
          serviceA.selectForUpdate(dto.getA());
          serviceA.update(dto.getA());
          return true;
      }
  }

  • updateAll() is called first (and has not completed); updateA() is called afterwards

[Figure: dirty-write]

  • What if updateA() is called first (and has not completed) and updateAll() is called afterwards? Since both transactions must acquire the local lock first, a dirty write cannot occur.
  • Someone may ask, “Why do we need to add select for update here? Can’t we prevent dirty writes with just @GlobalLock?” Yes, we can. But as the diagram above shows, select for update brings a few advantages:
    • Lock conflicts are handled more gracefully. With @GlobalLock alone, an exception is thrown as soon as a global lock is detected. That is a pity when the global lock would have been released had we “persisted” a little longer; with select for update, Seata retries instead.
    • In updateA(), select for update lets us read the latest state of A before performing the update.
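
The difference between failing fast and “persisting” can be sketched as two small helpers. This is an illustration only: `LockRetrySketch`, `checkOnce` and `checkWithRetry` are hypothetical names, the `BooleanSupplier` stands in for a global lock query to the TC, and the retry count and sleep interval are arbitrary rather than Seata's configured defaults.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical comparison of a single lock check versus a bounded retry loop. */
class LockRetrySketch {

    /** Fail fast: a single check that throws as soon as a conflict is seen. */
    static void checkOnce(BooleanSupplier lockable) {
        if (!lockable.getAsBoolean()) {
            throw new IllegalStateException("global lock conflict");
        }
    }

    /** Retry: re-check up to maxRetries times and return the attempt that succeeded. */
    static int checkWithRetry(BooleanSupplier lockable, int maxRetries, long sleepMillis) {
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            if (lockable.getAsBoolean()) {
                return attempt;
            }
            // A real implementation would roll back here to release the local lock first.
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for the global lock", e);
            }
        }
        throw new IllegalStateException("global lock wait timeout");
    }
}
```

With a lock that becomes free on the third query, `checkOnce` throws immediately, while `checkWithRetry(lock, 5, 10L)` succeeds on attempt 3.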

How to prevent dirty reads?

Scenario: One business calls updateAll() first, updateAll() is not completed, and then another business calls queryA()

[Figure: dirty-write]
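
The scenario can be simulated to show why a read needs the same global lock check that @GlobalLock + select for update performs. This is a sketch under assumptions: `DirtyReadSketch` and its methods are hypothetical, two maps stand in for the database table and the TC lock table, and the checked read throws on conflict instead of retrying.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical simulation of a plain read versus a read that checks the global lock. */
class DirtyReadSketch {

    /** The "table": primary key -> value. */
    final Map<Long, Integer> table = new HashMap<>();

    /** Stand-in for the TC lock table: primary key -> XID of the owning global transaction. */
    final Map<Long, String> globalLocks = new HashMap<>();

    /** Phase one of a global transaction: the branch commits locally and holds the global lock. */
    void branchCommit(String xid, long id, int value) {
        globalLocks.put(id, xid);
        table.put(id, value);
    }

    /** Plain read: returns whatever is locally committed, even if it may still be rolled back. */
    int plainRead(long id) {
        return table.get(id);
    }

    /** Checked read: refuses rows that are still globally locked. */
    int checkedRead(long id) {
        if (globalLocks.containsKey(id)) {
            throw new IllegalStateException("row " + id + " is globally locked");
        }
        return table.get(id);
    }

    /** Phase two: the global transaction finishes and its locks are released. */
    void globalCommit(String xid) {
        globalLocks.values().removeIf(xid::equals);
    }
}
```

After `branchCommit("xid-1", 1L, 90)`, `plainRead(1L)` already returns 90 even though the global transaction could still roll back, whereas `checkedRead(1L)` only succeeds once `globalCommit("xid-1")` has released the lock.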


Source Code Display

  @Service
  public class StorageService {

      @Autowired
      private DataSource dataSource;

      @GlobalTransactional
      public void update() throws SQLException {
          Connection connection = null;
          PreparedStatement preparedStatement = null;
          try {
              connection = dataSource.getConnection();
              // Disable auto-commit so the update runs in an explicit local transaction
              connection.setAutoCommit(false);
              String sql = "update storage_tbl set count = ?" +
                  " where id = ? and commodity_code = ?";
              preparedStatement = connection.prepareStatement(sql);
              preparedStatement.setInt(1, 100);
              preparedStatement.setLong(2, 1);
              preparedStatement.setString(3, "2001");
              preparedStatement.execute();
              connection.commit();
          } catch (Exception e) {
              throw e;
          } finally {
              // Release JDBC resources in reverse order of acquisition
              IOUtil.close(preparedStatement);
              IOUtil.close(connection);
          }
      }
  }

Although this code looks very basic and does not use a persistence layer framework, it is essentially what such a framework does for us once its abstractions are stripped away.

Outline of the source code walkthrough below (focusing on the code related to transaction isolation)

  • Purpose of the proxy data source
    • The role of DataSourceProxy (returns ConnectionProxy)
      • A small feature of ConnectionProxy (storing the undoLog)
    • The role of ConnectionProxy (returns StatementProxy)
    • Processing logic of StatementProxy.execute()
      • Execution logic of io.seata.rm.datasource.exec.UpdateExecutor (query before image, execute SQL, query after image, prepare undoLog)
      • Execution logic of SelectForUpdateExecutor (contend for the local lock, check the global lock; if a global lock is held, roll back and contend again…)
    • Processing logic of ConnectionProxy.commit() (register the branch transaction (contend for the global lock), write the undoLog, database commit)
  • Introducing RootContext
  • Different proxy logic in GlobalTransactionalInterceptor
    • How @GlobalTransactional is handled
    • How @GlobalLock is handled

The role of DataSourceProxy

DataSourceProxy gives us access to several important proxy objects:

  • Obtain ConnectionProxy through DataSourceProxy.getConnection()

    package io.seata.rm.datasource;

    import java.sql.Connection;

    public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {
        @Override
        public ConnectionProxy getConnection() throws SQLException {
            Connection targetConnection = targetDataSource.getConnection();
            return new ConnectionProxy(this, targetConnection);
        }
    }

    • First, let’s introduce the ConnectionContext held by ConnectionProxy. One of its functions is to store the undoLog.

      package io.seata.rm.datasource;

      import io.seata.rm.datasource.undo.SQLUndoLog;

      public class ConnectionProxy extends AbstractConnectionProxy {
          private ConnectionContext context = new ConnectionContext();

          public void appendUndoLog(SQLUndoLog sqlUndoLog) {
              context.appendUndoItem(sqlUndoLog);
          }
      }

      package io.seata.rm.datasource;

      public class ConnectionContext {
          private static final Savepoint DEFAULT_SAVEPOINT = new Savepoint() {
              @Override
              public int getSavepointId() throws SQLException {
                  return 0;
              }

              @Override
              public String getSavepointName() throws SQLException {
                  return "DEFAULT_SEATA_SAVEPOINT";
              }
          };

          // Undo items are buffered per savepoint, in insertion order
          private final Map<Savepoint, List<SQLUndoLog>> sqlUndoItemsBuffer = new LinkedHashMap<>();
          private Savepoint currentSavepoint = DEFAULT_SAVEPOINT;

          void appendUndoItem(SQLUndoLog sqlUndoLog) {
              sqlUndoItemsBuffer.computeIfAbsent(currentSavepoint, k -> new ArrayList<>()).add(sqlUndoLog);
          }
      }

Get StatementProxy through ConnectionProxy.prepareStatement(...)

  package io.seata.rm.datasource;

  public class ConnectionProxy extends AbstractConnectionProxy {
      public ConnectionProxy(DataSourceProxy dataSourceProxy, Connection targetConnection) {
          super(dataSourceProxy, targetConnection);
      }
  }

  package io.seata.rm.datasource;

  import java.sql.Connection;

  public abstract class AbstractConnectionProxy implements Connection {
      protected DataSourceProxy dataSourceProxy;
      protected Connection targetConnection;

      public AbstractConnectionProxy(DataSourceProxy dataSourceProxy, Connection targetConnection) {
          this.dataSourceProxy = dataSourceProxy;
          this.targetConnection = targetConnection;
      }

      @Override
      public PreparedStatement prepareStatement(String sql) throws SQLException {
          String dbType = getDbType();
          // support oracle 10.2+
          PreparedStatement targetPreparedStatement = null;
          if (BranchType.AT == RootContext.getBranchType()) { // Why would this return AT?
              List<SQLRecognizer> sqlRecognizers = SQLVisitorFactory.get(sql, dbType);
              if (sqlRecognizers != null && sqlRecognizers.size() == 1) {
                  SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
                  if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) {
                      TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(),
                          sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId());
                      String[] pkNameArray = new String[tableMeta.getPrimaryKeyOnlyName().size()];
                      tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray);
                      // For an insert statement, the PreparedStatement created here must be able to
                      // return the automatically generated primary key, so use this prepareStatement() overload
                      targetPreparedStatement = getTargetConnection().prepareStatement(sql, pkNameArray);
                  }
              }
          }
          if (targetPreparedStatement == null) {
              targetPreparedStatement = getTargetConnection().prepareStatement(sql);
          }
          return new PreparedStatementProxy(this, targetPreparedStatement, sql);
      }

      public Connection getTargetConnection() {
          return targetConnection;
      }
  }

First, let’s raise a question here, and explain it later.
How could RootContext.getBranchType() return AT?

Processing logic for StatementProxy.execute()

  • When calling io.seata.rm.datasource.StatementProxy.execute(), the SQL will be handed over to io.seata.rm.datasource.exec.ExecuteTemplate.execute(...) for processing.

    package io.seata.rm.datasource;

    public class PreparedStatementProxy extends AbstractPreparedStatementProxy
        implements PreparedStatement, ParametersHolder {
        @Override
        public boolean execute() throws SQLException {
            return ExecuteTemplate.execute(this, (statement, args) -> statement.execute());
        }
    }

    • In the ExecuteTemplate.execute(...) method, Seata selects an Executor based on the dbType and the type of SQL statement, then calls the execute(Object... args) method of that io.seata.rm.datasource.exec.Executor.

      ```
      package io.seata.rm.datasource.exec;

      public class ExecuteTemplate {
          public static <T, S extends Statement> T execute(StatementProxy<S> statementProxy,
                                                           StatementCallback<T, S> statementCallback,
                                                           Object... args) throws SQLException {
              return execute(null, statementProxy, statementCallback, args);
          }

          public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
                                                           StatementProxy<S> statementProxy,
                                                           StatementCallback<T, S> statementCallback,
                                                           Object... args) throws SQLException {
              if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
                  // Just work as original statement
                  return statementCallback.execute(statementProxy.getTargetStatement(), args);
              }
              String dbType = statementProxy.getConnectionProxy().getDbType();
              if (CollectionUtils.isEmpty(sqlRecognizers)) {
                  sqlRecognizers = SQLVisitorFactory.get(
                      statementProxy.getTargetSQL(),
                      dbType);
              }
              Executor<T> executor;
              if (CollectionUtils.isEmpty(sqlRecognizers)) {
                  executor = new PlainExecutor<>(statementProxy, statementCallback);
              } else {
                  if (sqlRecognizers.size() == 1) {
                      SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
                      switch (sqlRecognizer.getSQLType()) {
                          case INSERT:
                              executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
                                  new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
                                  new Object[]{statementProxy, statementCallback, sqlRecognizer});
                              break;
                          case UPDATE:
                              executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                              break;
                          case DELETE:
                              executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                              break;
                          case SELECT_FOR_UPDATE:
                              executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                              break;
                          default:
                              executor = new PlainExecutor<>(statementProxy, statementCallback);
                              break;
                      }
                  } else {
                      executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
                  }
              }
              T rs;
              try {
                  rs = executor.execute(args);
              } catch (Throwable ex) {
                  if (!(ex instanceof SQLException)) {
                      // Turn other exception into SQLException
                      ex = new SQLException(ex);
                  }
                  throw (SQLException) ex;
              }
              return rs;
          }
      }
      ```

      > Another question is raised here, to be explained later. **How does `RootContext.requireGlobalLock()` determine whether the global lock is needed?**

      Take `io.seata.rm.datasource.exec.UpdateExecutor` as an example: `UpdateExecutor` extends `AbstractDMLBaseExecutor`, which extends `BaseTransactionalExecutor`. Let’s look at what the `execute()` method does.

      ```
      package io.seata.rm.datasource.exec;

      public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {
          protected StatementProxy<S> statementProxy;
          protected StatementCallback<T, S> statementCallback;
          protected SQLRecognizer sqlRecognizer;

          public BaseTransactionalExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback,
                                           SQLRecognizer sqlRecognizer) {
              this.statementProxy = statementProxy;
              this.statementCallback = statementCallback;
              this.sqlRecognizer = sqlRecognizer;
          }

          @Override
          public T execute(Object... args) throws Throwable {
              String xid = RootContext.getXID();
              if (xid != null) {
                  statementProxy.getConnectionProxy().bind(xid);
              }
              statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
              return doExecute(args);
          }
      }
      ```

      ```
      public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {
          public AbstractDMLBaseExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback,
                                         SQLRecognizer sqlRecognizer) {
              super(statementProxy, statementCallback, sqlRecognizer);
          }

          @Override
          public T doExecute(Object... args) throws Throwable {
              AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
              if (connectionProxy.getAutoCommit()) {
                  return executeAutoCommitTrue(args);
              } else {
                  return executeAutoCommitFalse(args);
              }
          }

          protected T executeAutoCommitTrue(Object[] args) throws Throwable {
              ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
              try {
                  connectionProxy.changeAutoCommit(); // Note: if you have not started a transaction, Seata starts one for you
                  return new LockRetryPolicy(connectionProxy).execute(() -> {
                      T result = executeAutoCommitFalse(args);
                      connectionProxy.commit(); // Having started the transaction for you, Seata commits it through connectionProxy
                      return result;
                  });
              } catch (Exception e) {
                  // when exception occur in finally,this exception will lost, so just print it here
                  LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
                  if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
                      connectionProxy.getTargetConnection().rollback();
                  }
                  throw e;
              } finally {
                  connectionProxy.getContext().reset();
                  connectionProxy.setAutoCommit(true);
              }
          }

          protected T executeAutoCommitFalse(Object[] args) throws Exception {
              if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
                  throw new NotSupportYetException("multi pk only support mysql!");
              }
              TableRecords beforeImage = beforeImage();
              T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
              TableRecords afterImage = afterImage(beforeImage);
              prepareUndoLog(beforeImage, afterImage);
              return result;
          }
      }
      ```

      ```
      package io.seata.rm.datasource.exec;

      public class UpdateExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> {
          public UpdateExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback,
                                SQLRecognizer sqlRecognizer) {
              super(statementProxy, statementCallback, sqlRecognizer);
          }
      }
      ```

    • If a DML Executor is chosen, the executeAutoCommitFalse() method above shows that it mainly does the following:
      • Query the before image (select for update, so the local lock is acquired at this point)

      ```
      package io.seata.rm.datasource.exec;

      public class UpdateExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> {
          private static final boolean ONLY_CARE_UPDATE_COLUMNS = CONFIG.getBoolean(
              ConfigurationKeys.TRANSACTION_UNDO_ONLY_CARE_UPDATE_COLUMNS, DefaultValues.DEFAULT_ONLY_CARE_UPDATE_COLUMNS); // defaults to true

          @Override
          protected TableRecords beforeImage() throws SQLException {
              ArrayList<List<Object>> paramAppenderList = new ArrayList<>();
              TableMeta tmeta = getTableMeta();
              String selectSQL = buildBeforeImageSQL(tmeta, paramAppenderList);
              // SELECT id, count FROM storage_tbl WHERE id = ? FOR UPDATE
              return buildTableRecords(tmeta, selectSQL, paramAppenderList);
          }

          private String buildBeforeImageSQL(TableMeta tableMeta, ArrayList<List<Object>> paramAppenderList) {
              SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer) sqlRecognizer;
              List<String> updateColumns = recognizer.getUpdateColumns();
              StringBuilder prefix = new StringBuilder("SELECT ");
              StringBuilder suffix = new StringBuilder(" FROM ").append(getFromTableInSQL());
              String whereCondition = buildWhereCondition(recognizer, paramAppenderList);
              if (StringUtils.isNotBlank(whereCondition)) {
                  suffix.append(WHERE).append(whereCondition);
              }
              String orderBy = recognizer.getOrderBy();
              if (StringUtils.isNotBlank(orderBy)) {
                  suffix.append(orderBy);
              }
              ParametersHolder parametersHolder = statementProxy instanceof ParametersHolder ? (ParametersHolder)statementProxy : null;
              String limit = recognizer.getLimit(parametersHolder, paramAppenderList);
              if (StringUtils.isNotBlank(limit)) {
                  suffix.append(limit);
              }
              suffix.append(" FOR UPDATE");
              StringJoiner selectSQLJoin = new StringJoiner(", ", prefix.toString(), suffix.toString());
              if (ONLY_CARE_UPDATE_COLUMNS) {
                  if (!containsPK(updateColumns)) { // If the updated columns do not include the primary key, add it to the select for update
                      selectSQLJoin.add(getColumnNamesInSQL(tableMeta.getEscapePkNameList(getDbType())));
                  }
                  for (String columnName : updateColumns) {
                      selectSQLJoin.add(columnName);
                  }
              } else {
                  for (String columnName : tableMeta.getAllColumns().keySet()) {
                      selectSQLJoin.add(ColumnUtils.addEscape(columnName, getDbType()));
                  }
              }
              return selectSQLJoin.toString();
          }

          protected TableRecords buildTableRecords(TableMeta tableMeta, String selectSQL, ArrayList<List<Object>> paramAppenderList) throws SQLException {
              ResultSet rs = null;
              try (PreparedStatement ps = statementProxy.getConnection().prepareStatement(selectSQL)) { // Execute select for update, which acquires the local lock
                  if (CollectionUtils.isNotEmpty(paramAppenderList)) {
                      for (int i = 0, ts = paramAppenderList.size(); i < ts; i++) {
                          List<Object> paramAppender = paramAppenderList.get(i);
                          for (int j = 0, ds = paramAppender.size(); j < ds; j++) {
                              ps.setObject(i * ds + j + 1, paramAppender.get(j));
                          }
                      }
                  }
                  rs = ps.executeQuery();
                  return TableRecords.buildRecords(tableMeta, rs);
              } finally {
                  IOUtil.close(rs);
              }
          }
      }
      ```

      • Execute the business SQL
      • Query the after image

      ```
      package io.seata.rm.datasource.exec;

      public class UpdateExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> {
          @Override
          protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
              TableMeta tmeta = getTableMeta();
              if (beforeImage == null || beforeImage.size() == 0) {
                  return TableRecords.empty(getTableMeta());
              }
              String selectSQL = buildAfterImageSQL(tmeta, beforeImage);
              // SELECT id, count FROM storage_tbl WHERE (id) in ( (?) )
              ResultSet rs = null;
              try (PreparedStatement pst = statementProxy.getConnection().prepareStatement(selectSQL)) {
                  SqlGenerateUtils.setParamForPk(beforeImage.pkRows(), getTableMeta().getPrimaryKeyOnlyName(), pst);
                  rs = pst.executeQuery();
                  return TableRecords.buildRecords(tmeta, rs);
              } finally {
                  IOUtil.close(rs);
              }
          }
      }
      ```

      • Prepare the undoLog

      ```
      public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {
          protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
              if (beforeImage.getRows().isEmpty() && afterImage.getRows().isEmpty()) {
                  return;
              }
              if (SQLType.UPDATE == sqlRecognizer.getSQLType()) {
                  if (beforeImage.getRows().size() != afterImage.getRows().size()) {
                      throw new ShouldNeverHappenException("Before image size is not equaled to after image size, probably because you updated the primary keys.");
                  }
              }
              ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
              TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
              String lockKeys = buildLockKey(lockKeyRecords);
              if (null != lockKeys) {
                  connectionProxy.appendLockKey(lockKeys);
                  SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
                  connectionProxy.appendUndoLog(sqlUndoLog); // Store the undoLog in connectionProxy, as described earlier
              }
          }
      }
      ```
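
To make the before image query built by buildBeforeImageSQL() more concrete, here is a reduced sketch of the same `StringJoiner` technique. It rests on assumptions: `BeforeImageSqlSketch` and `build` are hypothetical, the table name, where clause and column lists are passed in directly instead of being derived from a SQL recognizer, and order by, limit and identifier escaping are left out.

```java
import java.util.List;
import java.util.StringJoiner;

/** Hypothetical, reduced version of the before image SQL construction. */
class BeforeImageSqlSketch {

    /** Builds "SELECT <pk and updated columns> FROM <table> WHERE <where> FOR UPDATE". */
    static String build(String table, String where, List<String> pkColumns, List<String> updateColumns) {
        String prefix = "SELECT ";
        String suffix = " FROM " + table + " WHERE " + where + " FOR UPDATE";
        // The joiner places the column list between the fixed prefix and suffix.
        StringJoiner select = new StringJoiner(", ", prefix, suffix);
        for (String pk : pkColumns) {
            // The primary key is needed to locate the row later, so add it if it is not updated.
            if (!updateColumns.contains(pk)) {
                select.add(pk);
            }
        }
        for (String column : updateColumns) {
            select.add(column);
        }
        return select.toString();
    }
}
```

For the update in this article, `build("storage_tbl", "id = ?", Arrays.asList("id"), Arrays.asList("count"))` produces `SELECT id, count FROM storage_tbl WHERE id = ? FOR UPDATE`, matching the comment in beforeImage().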

    • If your SQL is select for update, `SelectForUpdateExecutor` is used (Seata proxies select for update), and the proxied logic is as follows:
      • First, execute select for update (acquiring the database local lock)
      • If in `@GlobalTransactional` or `@GlobalLock`, **check** whether a global lock is held
      • If a global lock is held, roll back the local transaction (or roll back to the savepoint when a local transaction was already open) to release the local lock, then re-acquire the local lock and check the global lock again until the global lock is released

      ```
      package io.seata.rm.datasource.exec;

      public class SelectForUpdateExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {
          @Override
          public T doExecute(Object... args) throws Throwable {
              Connection conn = statementProxy.getConnection();
              DatabaseMetaData dbmd = conn.getMetaData();
              T rs;
              Savepoint sp = null;
              boolean originalAutoCommit = conn.getAutoCommit();
              try {
                  if (originalAutoCommit) {
                      /*
                       * In order to hold the local db lock during global lock checking
                       * set auto commit value to false first if original auto commit was true
                       */
                      conn.setAutoCommit(false);
                  } else if (dbmd.supportsSavepoints()) {
                      /*
                       * In order to release the local db lock when global lock conflict
                       * create a save point if original auto commit was false, then use the save point here to release db
                       * lock during global lock checking if necessary
                       */
                      sp = conn.setSavepoint();
                  } else {
                      throw new SQLException("not support savepoint. please check your db version");
                  }
                  LockRetryController lockRetryController = new LockRetryController();
                  ArrayList<List<Object>> paramAppenderList = new ArrayList<>();
                  String selectPKSQL = buildSelectSQL(paramAppenderList);
                  while (true) {
                      try {
                          // #870
                          // execute return Boolean
                          // executeQuery return ResultSet
                          rs = statementCallback.execute(statementProxy.getTargetStatement(), args); // execute select for update (get database local lock)
                          // Try to get global lock of those rows selected
                          TableRecords selectPKRows = buildTableRecords(getTableMeta(), selectPKSQL, paramAppenderList);
                          String lockKeys = buildLockKey(selectPKRows);
                          if (StringUtils.isNullOrEmpty(lockKeys)) {
                              break;
                          }
                          if (RootContext.inGlobalTransaction() || RootContext.requireGlobalLock()) {
                              // Do the same thing under either @GlobalTransactional or @GlobalLock,
                              // that only check the global lock here.
                              statementProxy.getConnectionProxy().checkLock(lockKeys);
                          } else {
                              throw new RuntimeException("Unknown situation!");
                          }
                          break;
                      } catch (LockConflictException lce) {
                          if (sp != null) {
                              conn.rollback(sp);
                          } else {
                              conn.rollback(); // Roll back and release local lock
                          }
                          // trigger retry
                          lockRetryController.sleep(lce);
                      }
                  }
              } finally {
                  if (sp != null) {
                      try {
                          if (!JdbcConstants.ORACLE.equalsIgnoreCase(getDbType())) {
                              conn.releaseSavepoint(sp);
                          }
                      } catch (SQLException e) {
                          LOGGER.error("{} release save point error.", getDbType(), e);
                      }
                  }
                  if (originalAutoCommit) {
                      conn.setAutoCommit(true);
                  }
              }
              return rs;
          }
      }
      ```

Processing Logic of ConnectionProxy.commit()

  public class ConnectionProxy extends AbstractConnectionProxy {
      private final static LockRetryPolicy LOCK_RETRY_POLICY = new LockRetryPolicy();
      private ConnectionContext context = new ConnectionContext();

      @Override
      public void commit() throws SQLException {
          try {
              LOCK_RETRY_POLICY.execute(() -> {
                  doCommit();
                  return null;
              });
          } catch (SQLException e) {
              if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
                  rollback();
              }
              throw e;
          } catch (Exception e) {
              throw new SQLException(e);
          }
      }

      private void doCommit() throws SQLException {
          if (context.inGlobalTransaction()) {
              processGlobalTransactionCommit();
          } else if (context.isGlobalLockRequire()) {
              processLocalCommitWithGlobalLocks();
          } else {
              targetConnection.commit();
          }
      }
  }

Also, a question has been raised here, which will be explained later. How does ConnectionContext in ConnectionProxy determine inGlobalTransaction() or isGlobalLockRequire()?

  • In a global transaction (i.e., data persistence method with @GlobalTransactional)

    • Register branch transaction, acquire global lock
    • Store undo log data
    • Commit the transaction in the database

    ```
    public class ConnectionProxy extends AbstractConnectionProxy {
        private final static LockRetryPolicy LOCK_RETRY_POLICY = new LockRetryPolicy();
        private ConnectionContext context = new ConnectionContext();

        private void processGlobalTransactionCommit() throws SQLException {
            try {
                register(); // Register branch and contend for global lock
            } catch (TransactionException e) {
                recognizeLockKeyConflictException(e, context.buildLockKeys());
            }
            try {
                UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this); // Store undolog
                targetConnection.commit(); // Commit branch transaction
            } catch (Throwable ex) {
                LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
                report(false);
                throw new SQLException(ex);
            }
            if (IS_REPORT_SUCCESS_ENABLE) {
                report(true);
            }
            context.reset();
        }

        private void register() throws TransactionException {
            if (!context.hasUndoLog() || !context.hasLockKey()) {
                return;
            }
            Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
                null, context.getXid(), null, context.buildLockKeys());
            context.setBranchId(branchId);
        }
    }
    ```

  • Under @GlobalLock (i.e., a persistence method annotated with @GlobalLock):

    • Query the TC to check whether a global lock is held on the affected records
    • Commit the transaction in the database

    ```java
    public class ConnectionProxy extends AbstractConnectionProxy {
        private final static LockRetryPolicy LOCK_RETRY_POLICY = new LockRetryPolicy();
        private ConnectionContext context = new ConnectionContext();

        private void processLocalCommitWithGlobalLocks() throws SQLException {
            checkLock(context.buildLockKeys());
            try {
                targetConnection.commit();
            } catch (Throwable ex) {
                throw new SQLException(ex);
            }
            context.reset();
        }

        public void checkLock(String lockKeys) throws SQLException {
            if (StringUtils.isBlank(lockKeys)) {
                return;
            }
            // Just check lock without requiring lock by now.
            try {
                boolean lockable = DefaultResourceManager.get().lockQuery(BranchType.AT,
                    getDataSourceProxy().getResourceId(), context.getXid(), lockKeys);
                if (!lockable) {
                    throw new LockConflictException();
                }
            } catch (TransactionException e) {
                recognizeLockKeyConflictException(e, lockKeys);
            }
        }
    }
    ```
  • Other than the above cases (the else branch)

    • Let the database commit the current transaction.
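
The difference between the first two branches can be sketched with a toy, in-memory stand-in for the TC lock table. Everything in this sketch (ToyLockTable, acquire, isLockable, release, the "storage_tbl:1" key format) is hypothetical and is not Seata's implementation. It only mimics the behavior described above: a branch in a global transaction takes the lock for its rows when it registers, while a @GlobalLock commit merely asks whether the rows are lockable and never takes the lock itself. Releasing all locks when the global transaction finishes is an assumption of the model.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory model of the TC lock table; not Seata's implementation.
class ToyLockTable {
    // lock key (for example "storage_tbl:1") -> xid of the global transaction holding it
    private final Map<String, String> holders = new ConcurrentHashMap<>();

    // Branch registration inside a global transaction: take the lock or fail.
    boolean acquire(String xid, String lockKey) {
        String holder = holders.putIfAbsent(lockKey, xid);
        return holder == null || holder.equals(xid);
    }

    // @GlobalLock style check: only ask whether the key is free, never lock it.
    boolean isLockable(String xid, String lockKey) {
        String holder = holders.get(lockKey);
        return holder == null || holder.equals(xid);
    }

    // Assumed for this model: all locks are released when the global transaction finishes.
    void release(String xid) {
        holders.values().removeIf(xid::equals);
    }
}

class ToyLockDemo {
    public static void main(String[] args) {
        ToyLockTable tc = new ToyLockTable();
        String key = "storage_tbl:1";

        System.out.println(tc.acquire("xid-1", key));  // global transaction 1 registers its branch
        System.out.println(tc.isLockable(null, key));  // a @GlobalLock commit would be rejected here
        System.out.println(tc.acquire("xid-2", key));  // another global transaction has to retry
        tc.release("xid-1");                           // global transaction 1 finishes
        System.out.println(tc.isLockable(null, key));  // the local commit can now proceed
    }
}
```

In this model the @GlobalLock path never appears in the table as a holder, which is why it protects against dirty reads and writes relative to global transactions without turning the local transaction into a branch.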

Introduction to RootContext

We left three “clues” above; now it’s time to resolve them with the help of the RootContext source code.

  1. How can the return value of RootContext.getBranchType() be AT?
    The logic of this method is: as long as the current thread is in a global transaction (i.e., RootContext.bind(xid) has been called somewhere earlier) and no branch type has been bound explicitly, it returns the default branch type, which is BranchType.AT unless another default has been configured.

    public class RootContext {
        public static final String KEY_XID = "TX_XID";
        private static ContextCore CONTEXT_HOLDER = ContextCoreLoader.load();
        private static BranchType DEFAULT_BRANCH_TYPE;

        @Nullable
        public static BranchType getBranchType() {
            if (inGlobalTransaction()) {
                BranchType branchType = (BranchType) CONTEXT_HOLDER.get(KEY_BRANCH_TYPE);
                if (branchType != null) {
                    return branchType;
                }
                // Returns the default branch type.
                return DEFAULT_BRANCH_TYPE != null ? DEFAULT_BRANCH_TYPE : BranchType.AT;
            }
            return null;
        }

        public static boolean inGlobalTransaction() {
            return CONTEXT_HOLDER.get(KEY_XID) != null;
        }

        public static void bind(@Nonnull String xid) {
            if (StringUtils.isBlank(xid)) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("xid is blank, switch to unbind operation!");
                }
                unbind();
            } else {
                MDC.put(MDC_KEY_XID, xid);
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("bind {}", xid);
                }
                CONTEXT_HOLDER.put(KEY_XID, xid);
            }
        }
    }
  2. How does RootContext.requireGlobalLock() decide that a global lock is required? It only checks whether a flag is present, so something must have called RootContext.bindGlobalLockFlag() beforehand.

    public class RootContext {
        public static final String KEY_GLOBAL_LOCK_FLAG = "TX_LOCK";
        public static final Boolean VALUE_GLOBAL_LOCK_FLAG = true;
        private static ContextCore CONTEXT_HOLDER = ContextCoreLoader.load();

        public static boolean requireGlobalLock() {
            return CONTEXT_HOLDER.get(KEY_GLOBAL_LOCK_FLAG) != null;
        }

        public static void bindGlobalLockFlag() {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Local Transaction Global Lock support enabled");
            }
            // just put something not null
            CONTEXT_HOLDER.put(KEY_GLOBAL_LOCK_FLAG, VALUE_GLOBAL_LOCK_FLAG);
        }
    }
  3. How does ConnectionProxy.commit() distinguish between different states based on the context, and how does ConnectionContext determine inGlobalTransaction() or isGlobalLockRequire()?

    public class ConnectionProxy extends AbstractConnectionProxy {
        private ConnectionContext context = new ConnectionContext();

        private void doCommit() throws SQLException {
            if (context.inGlobalTransaction()) {
                processGlobalTransactionCommit();
            } else if (context.isGlobalLockRequire()) {
                processLocalCommitWithGlobalLocks();
            } else {
                targetConnection.commit();
            }
        }
    }
    • How is inGlobalTransaction() determined? (Note that this is ConnectionContext's own method, not the RootContext.inGlobalTransaction() shown above.)

      public class ConnectionContext {
          private String xid;

          void setXid(String xid) {
              this.xid = xid;
          }

          public boolean inGlobalTransaction() {
              return xid != null;
          }

          void bind(String xid) {
              if (xid == null) {
                  throw new IllegalArgumentException("xid should not be null");
              }
              if (!inGlobalTransaction()) {
                  setXid(xid);
              } else {
                  if (!this.xid.equals(xid)) {
                      throw new ShouldNeverHappenException();
                  }
              }
          }
      }

      Where is ConnectionContext.bind(xid) called?

      package io.seata.rm.datasource.exec;

      public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {
          @Override
          public T execute(Object... args) throws Throwable {
              // Where does this XID come from? As shown later, it is bound when the
              // global transaction begins, which is related to @GlobalTransactional.
              String xid = RootContext.getXID();
              if (xid != null) {
                  statementProxy.getConnectionProxy().bind(xid);
              }
              // isGlobalLockRequire is set here, which is related to @GlobalLock.
              statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
              return doExecute(args);
          }
      }

      public class ConnectionProxy extends AbstractConnectionProxy {
          private ConnectionContext context = new ConnectionContext();

          public void bind(String xid) {
              context.bind(xid);
          }

          public void setGlobalLockRequire(boolean isLock) {
              context.setGlobalLockRequire(isLock);
          }
      }
    • How is isGlobalLockRequire() determined?

      public class ConnectionContext {
          private boolean isGlobalLockRequire;

          boolean isGlobalLockRequire() {
              return isGlobalLockRequire;
          }

          void setGlobalLockRequire(boolean isGlobalLockRequire) {
              this.isGlobalLockRequire = isGlobalLockRequire;
          }
      }

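    From the code above, we know that the state is decided by what has been set in RootContext: once an XID has been bound with RootContext.bind(xid), or the flag has been set with RootContext.bindGlobalLockFlag(), the commit takes the corresponding branch. So where are these methods called? The answer is in GlobalTransactionalInterceptor, described below.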
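
The flow of state from RootContext into the connection-level context can be sketched as follows. This is a minimal model, not Seata's code: ToyRootContext, ToyConnectionContext, syncFromThread and commitPath are hypothetical names, and the sketch assumes that the context holder is backed by a thread-local map. It shows that the commit path is chosen purely from what was bound on the current thread before the SQL was executed.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for RootContext, assuming a thread-local holder.
class ToyRootContext {
    private static final ThreadLocal<Map<String, Object>> HOLDER =
            ThreadLocal.withInitial(HashMap::new);

    static void bind(String xid) { HOLDER.get().put("TX_XID", xid); }
    static void unbind() { HOLDER.get().remove("TX_XID"); }
    static String getXID() { return (String) HOLDER.get().get("TX_XID"); }

    static void bindGlobalLockFlag() { HOLDER.get().put("TX_LOCK", Boolean.TRUE); }
    static void unbindGlobalLockFlag() { HOLDER.get().remove("TX_LOCK"); }
    static boolean requireGlobalLock() { return HOLDER.get().get("TX_LOCK") != null; }
}

// Hypothetical stand-in for the per-connection context.
class ToyConnectionContext {
    private String xid;
    private boolean globalLockRequire;

    // Mirrors what the executor does before running SQL: copy the thread state in.
    void syncFromThread() {
        String current = ToyRootContext.getXID();
        if (current != null) {
            xid = current;
        }
        globalLockRequire = ToyRootContext.requireGlobalLock();
    }

    // Same three-way decision as doCommit(), returned as a label.
    String commitPath() {
        if (xid != null) {
            return "global-transaction";
        } else if (globalLockRequire) {
            return "global-lock";
        }
        return "plain-local";
    }
}

class ToyContextDemo {
    public static void main(String[] args) {
        ToyConnectionContext plain = new ToyConnectionContext();
        plain.syncFromThread();
        System.out.println(plain.commitPath());

        ToyRootContext.bindGlobalLockFlag();
        ToyConnectionContext locked = new ToyConnectionContext();
        locked.syncFromThread();
        System.out.println(locked.commitPath());
        ToyRootContext.unbindGlobalLockFlag();

        ToyRootContext.bind("xid-1");
        ToyConnectionContext global = new ToyConnectionContext();
        global.syncFromThread();
        System.out.println(global.commitPath());
        ToyRootContext.unbind();
    }
}
```

Because the XID check comes first, a thread that has both an XID and the lock flag bound still commits through the global transaction path in this model, matching the order of the branches in doCommit().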

GlobalTransactionalInterceptor handles methods with @GlobalTransactional or @GlobalLock

Methods annotated with @GlobalTransactional or @GlobalLock are proxied and handled by GlobalTransactionalInterceptor.

  public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor {
      @Override
      public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
          Class<?> targetClass =
              methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
          Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
          if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
              final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
              final GlobalTransactional globalTransactionalAnnotation =
                  getAnnotation(method, targetClass, GlobalTransactional.class);
              final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
              boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
              if (!localDisable) {
                  if (globalTransactionalAnnotation != null) {
                      return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation); // Handle @GlobalTransactional
                  } else if (globalLockAnnotation != null) {
                      return handleGlobalLock(methodInvocation, globalLockAnnotation); // Handle @GlobalLock
                  }
              }
          }
          return methodInvocation.proceed();
      }
  }
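
The dispatch-by-annotation idea in invoke() can be illustrated with a JDK dynamic proxy. This is only a sketch under assumptions: the annotations @ToyGlobalTransactional and @ToyGlobalLock, the ToyStorageService interface and the ToyInterceptor class are all hypothetical, the annotations are read from the interface method rather than from the most specific method of the target class, and the "handling" is reduced to prefixing the result with a label.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical annotations standing in for @GlobalTransactional and @GlobalLock.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface ToyGlobalTransactional {}

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface ToyGlobalLock {}

interface ToyStorageService {
    @ToyGlobalTransactional
    String deduct();

    @ToyGlobalLock
    String query();

    String ping();
}

class ToyStorageServiceImpl implements ToyStorageService {
    public String deduct() { return "deduct"; }
    public String query() { return "query"; }
    public String ping() { return "ping"; }
}

// Chooses a handling path by annotation, in the same order as the interceptor:
// the transactional annotation is checked first, then the lock annotation.
class ToyInterceptor implements InvocationHandler {
    private final Object target;

    ToyInterceptor(Object target) {
        this.target = target;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Object result = method.invoke(target, args);
        if (method.isAnnotationPresent(ToyGlobalTransactional.class)) {
            return "[global-tx] " + result;
        } else if (method.isAnnotationPresent(ToyGlobalLock.class)) {
            return "[global-lock] " + result;
        }
        return result; // no annotation: plain pass-through
    }

    static ToyStorageService wrap(ToyStorageService target) {
        return (ToyStorageService) Proxy.newProxyInstance(
                ToyStorageService.class.getClassLoader(),
                new Class<?>[] {ToyStorageService.class},
                new ToyInterceptor(target));
    }
}

class ToyInterceptorDemo {
    public static void main(String[] args) {
        ToyStorageService service = ToyInterceptor.wrap(new ToyStorageServiceImpl());
        System.out.println(service.deduct());
        System.out.println(service.query());
        System.out.println(service.ping());
    }
}
```

As in the interceptor above, a method carrying neither annotation simply proceeds to the target without any extra handling.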

First, the handling of @GlobalTransactional:

  public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor {
      private final TransactionalTemplate transactionalTemplate = new TransactionalTemplate();

      Object handleGlobalTransaction(final MethodInvocation methodInvocation,
                                     final GlobalTransactional globalTrxAnno) throws Throwable {
          //...
          try {
              return transactionalTemplate.execute(...);
          } catch (TransactionalExecutor.ExecutionException e) {
              // ...
          } finally {
              //...
          }
      }
  }

This brings us to Seata's classic transaction template method. The part to focus on is where the transaction is begun.

  public class TransactionalTemplate {
      public Object execute(TransactionalExecutor business) throws Throwable {
          // 1. Get transactionInfo
          //...
          // 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
          GlobalTransaction tx = GlobalTransactionContext.getCurrent();
          // 1.2 Handle the transaction propagation.
          try {
              // ...
              // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
              if (tx == null) {
                  tx = GlobalTransactionContext.createNew();
              }
              //...
              try {
                  // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
                  //    else do nothing. Of course, the hooks will still be triggered.
                  beginTransaction(txInfo, tx);
                  Object rs;
                  try {
                      // Do Your Business
                      rs = business.execute();
                  } catch (Throwable ex) {
                      // 3. The needed business exception to rollback.
                      completeTransactionAfterThrowing(txInfo, tx, ex);
                      throw ex;
                  }
                  // 4. everything is fine, commit.
                  commitTransaction(tx);
                  return rs;
              } finally {
                  //5. clear
                  //...
              }
          } finally {
              // If the transaction is suspended, resume it.
              // ...
          }
      }

      private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
          try {
              triggerBeforeBegin();
              tx.begin(txInfo.getTimeOut(), txInfo.getName());
              triggerAfterBegin();
          } catch (TransactionException txe) {
              throw new TransactionalExecutor.ExecutionException(tx, txe,
                  TransactionalExecutor.Code.BeginFailure);
          }
      }
  }

  public class DefaultGlobalTransaction implements GlobalTransaction {
      @Override
      public void begin(int timeout, String name) throws TransactionException {
          if (role != GlobalTransactionRole.Launcher) {
              assertXIDNotNull();
              if (LOGGER.isDebugEnabled()) {
                  LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
              }
              return;
          }
          assertXIDNull();
          String currentXid = RootContext.getXID();
          if (currentXid != null) {
              throw new IllegalStateException("Global transaction already exists," +
                  " can't begin a new global transaction, currentXid = " + currentXid);
          }
          xid = transactionManager.begin(null, null, name, timeout);
          status = GlobalStatus.Begin;
          RootContext.bind(xid); // Bind xid
          if (LOGGER.isInfoEnabled()) {
              LOGGER.info("Begin new global transaction [{}]", xid);
          }
      }
  }

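Note the call to RootContext.bind(xid): this is where the XID of a newly begun global transaction is bound to the context. It is only reached when the role is Launcher; a Participant returns early and keeps using the XID that is already bound.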
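
The Launcher and Participant roles can be modeled in a few lines. The sketch below is hypothetical throughout (ToyGlobalTransaction, ToyRole, currentOrNew, finish and the "toy-xid-" format are invented for illustration), it generates the XID locally instead of asking the TC, and it assumes that only the launcher clears the binding when it finishes. It is meant to show why a nested @GlobalTransactional method joins the outer transaction instead of starting a new one.

```java
import java.util.concurrent.atomic.AtomicInteger;

enum ToyRole { LAUNCHER, PARTICIPANT }

// Hypothetical, simplified model of a global transaction handle.
class ToyGlobalTransaction {
    // Stand-in for the XID that RootContext keeps for the current thread.
    static final ThreadLocal<String> BOUND_XID = new ThreadLocal<>();
    private static final AtomicInteger SEQUENCE = new AtomicInteger();

    final ToyRole role;
    String xid;

    private ToyGlobalTransaction(ToyRole role, String xid) {
        this.role = role;
        this.xid = xid;
    }

    // An XID already bound to the thread means this caller only participates.
    static ToyGlobalTransaction currentOrNew() {
        String existing = BOUND_XID.get();
        return existing != null
                ? new ToyGlobalTransaction(ToyRole.PARTICIPANT, existing)
                : new ToyGlobalTransaction(ToyRole.LAUNCHER, null);
    }

    void begin() {
        if (role != ToyRole.LAUNCHER) {
            return; // participant: reuse the XID that is already bound
        }
        xid = "toy-xid-" + SEQUENCE.incrementAndGet(); // a real TM would obtain this from the TC
        BOUND_XID.set(xid);                             // counterpart of RootContext.bind(xid)
    }

    void finish() {
        if (role == ToyRole.LAUNCHER) {
            BOUND_XID.remove(); // assumption of this model: only the launcher unbinds
        }
    }
}

class ToyBeginDemo {
    public static void main(String[] args) {
        ToyGlobalTransaction outer = ToyGlobalTransaction.currentOrNew();
        outer.begin();
        ToyGlobalTransaction inner = ToyGlobalTransaction.currentOrNew();
        inner.begin();
        System.out.println(outer.role + " " + inner.role + " " + outer.xid.equals(inner.xid));
        inner.finish();
        outer.finish();
    }
}
```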

Next, the handling of @GlobalLock:

  public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor {
      private final GlobalLockTemplate globalLockTemplate = new GlobalLockTemplate();

      Object handleGlobalLock(final MethodInvocation methodInvocation,
                              final GlobalLock globalLockAnno) throws Throwable {
          return globalLockTemplate.execute(new GlobalLockExecutor() {...});
      }
  }

@GlobalLock is likewise handled through a template method:

  public class GlobalLockTemplate {
      public Object execute(GlobalLockExecutor executor) throws Throwable {
          boolean alreadyInGlobalLock = RootContext.requireGlobalLock();
          if (!alreadyInGlobalLock) {
              RootContext.bindGlobalLockFlag();
          }
          // set my config to config holder so that it can be access in further execution
          // for example, LockRetryController can access it with config holder
          GlobalLockConfig myConfig = executor.getGlobalLockConfig();
          GlobalLockConfig previousConfig = GlobalLockConfigHolder.setAndReturnPrevious(myConfig);
          try {
              return executor.execute();
          } finally {
              // only unbind when this is the root caller.
              // otherwise, the outer caller would lose global lock flag
              if (!alreadyInGlobalLock) {
                  RootContext.unbindGlobalLockFlag();
              }
              // if previous config is not null, we need to set it back
              // so that the outer logic can still use their config
              if (previousConfig != null) {
                  GlobalLockConfigHolder.setAndReturnPrevious(previousConfig);
              } else {
                  GlobalLockConfigHolder.remove();
              }
          }
      }
  }

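As you can see, RootContext.bindGlobalLockFlag() is called as soon as execution enters the template method (unless an outer caller has already bound the flag), and the flag is unbound again only by the outermost caller.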
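
The "only the root caller unbinds" rule is easy to get wrong, so here is a minimal sketch of it in isolation. ToyLockFlag and ToyGlobalLockTemplate are hypothetical names, the flag is assumed to live in a thread-local, and the lock configuration handling of the real template is left out.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for the global lock flag kept in RootContext.
class ToyLockFlag {
    private static final ThreadLocal<Boolean> FLAG = new ThreadLocal<>();

    static boolean isBound() { return FLAG.get() != null; }
    static void bind() { FLAG.set(Boolean.TRUE); }
    static void unbind() { FLAG.remove(); }
}

class ToyGlobalLockTemplate {
    static <T> T execute(Supplier<T> business) {
        boolean alreadyInGlobalLock = ToyLockFlag.isBound();
        if (!alreadyInGlobalLock) {
            ToyLockFlag.bind();
        }
        try {
            return business.get();
        } finally {
            // Only the outermost caller unbinds; a nested call must leave
            // the flag in place for the code that still runs in the outer scope.
            if (!alreadyInGlobalLock) {
                ToyLockFlag.unbind();
            }
        }
    }
}

class ToyGlobalLockDemo {
    public static void main(String[] args) {
        String result = ToyGlobalLockTemplate.execute(() -> {
            ToyGlobalLockTemplate.execute(() -> "inner");
            // Still inside the outer scope: the flag must survive the nested call.
            return "outer sees flag: " + ToyLockFlag.isBound();
        });
        System.out.println(result);
        System.out.println("after outer: " + ToyLockFlag.isBound());
    }
}
```

If the nested call unbound the flag unconditionally, any SQL executed by the outer method after the nested call returned would be committed without the global lock check.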