微服务框架支持

事务上下文

Seata 的事务上下文由 RootContext 来管理。

应用开启一个全局事务后,RootContext 会自动绑定该事务的 XID,事务结束(提交或回滚完成),RootContext 会自动解绑 XID。

  1. // 绑定 XID
  2. RootContext.bind(xid);
  3. // 解绑 XID
  4. String xid = RootContext.unbind();

应用可以通过 RootContext 的 API 接口来获取当前运行时的全局事务 XID。

  1. // 获取 XID
  2. String xid = RootContext.getXID();

应用是否运行在一个全局事务的上下文中,就是通过 RootContext 是否绑定 XID 来判定的。

  1. public static boolean inGlobalTransaction() {
  2. return CONTEXT_HOLDER.get(KEY_XID) != null;
  3. }

事务传播

Seata 全局事务的传播机制就是指事务上下文的传播,根本上,就是 XID 的应用运行时的传播方式。

1. 服务内部的事务传播

默认的,RootContext 的实现是基于 ThreadLocal 的,即 XID 绑定在当前线程上下文中。

  1. public class ThreadLocalContextCore implements ContextCore {
  2. private ThreadLocal<Map<String, String>> threadLocal = new ThreadLocal<Map<String, String>>() {
  3. @Override
  4. protected Map<String, String> initialValue() {
  5. return new HashMap<String, String>();
  6. }
  7. };
  8. @Override
  9. public String put(String key, String value) {
  10. return threadLocal.get().put(key, value);
  11. }
  12. @Override
  13. public String get(String key) {
  14. return threadLocal.get().get(key);
  15. }
  16. @Override
  17. public String remove(String key) {
  18. return threadLocal.get().remove(key);
  19. }
  20. }

所以服务内部的 XID 传播通常是天然的通过同一个线程的调用链路串连起来的。默认不做任何处理,事务的上下文就是传播下去的。

如果希望挂起事务上下文,则需要通过 RootContext 提供的 API 来实现:

  1. // 挂起(暂停)
  2. String xid = RootContext.unbind();
  3. // TODO: 运行在全局事务外的业务逻辑
  4. // 恢复全局事务上下文
  5. RootContext.bind(xid);

2. 跨服务调用的事务传播

通过上述基本原理,我们可以很容易理解:

跨服务调用场景下的事务传播,本质上就是要把 XID 通过服务调用传递到服务提供方,并绑定到 RootContext 中去。

只要能做到这点,理论上 Seata 可以支持任意的微服务框架。

对 Dubbo 支持的解读

下面,我们通过内置的对 Dubbo RPC 支持机制的解读,来说明 Seata 在实现对一个特定微服务框架支持的机制。

对 Dubbo 的支持,我们利用了 Dubbo 框架的 org.apache.dubbo.rpc.Filter 机制。

  1. /**
  2. * The type Transaction propagation filter.
  3. */
  4. @Activate(group = { Constants.PROVIDER, Constants.CONSUMER }, order = 100)
  5. public class TransactionPropagationFilter implements Filter {
  6. private static final Logger LOGGER = LoggerFactory.getLogger(TransactionPropagationFilter.class);
  7. @Override
  8. public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
  9. String xid = RootContext.getXID(); // 获取当前事务 XID
  10. String rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID); // 获取 RPC 调用传递过来的 XID
  11. if (LOGGER.isDebugEnabled()) {
  12. LOGGER.debug("xid in RootContext[" + xid + "] xid in RpcContext[" + rpcXid + "]");
  13. }
  14. boolean bind = false;
  15. if (xid != null) { // Consumer:把 XID 置入 RPC 的 attachment 中
  16. RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
  17. } else {
  18. if (rpcXid != null) { // Provider:把 RPC 调用传递来的 XID 绑定到当前运行时
  19. RootContext.bind(rpcXid);
  20. bind = true;
  21. if (LOGGER.isDebugEnabled()) {
  22. LOGGER.debug("bind[" + rpcXid + "] to RootContext");
  23. }
  24. }
  25. }
  26. try {
  27. return invoker.invoke(invocation); // 业务方法的调用
  28. } finally {
  29. if (bind) { // Provider:调用完成后,对 XID 的清理
  30. String unbindXid = RootContext.unbind();
  31. if (LOGGER.isDebugEnabled()) {
  32. LOGGER.debug("unbind[" + unbindXid + "] from RootContext");
  33. }
  34. if (!rpcXid.equalsIgnoreCase(unbindXid)) {
  35. LOGGER.warn("xid in change during RPC from " + rpcXid + " to " + unbindXid);
  36. if (unbindXid != null) { // 调用过程有新的事务上下文开启,则不能清除
  37. RootContext.bind(unbindXid);
  38. LOGGER.warn("bind [" + unbindXid + "] back to RootContext");
  39. }
  40. }
  41. }
  42. }
  43. }
  44. }