前置条件

在开始使用Saga之前,要确保服务的choerodon-starters-asgard依赖在0.6.3.RELEASE版本及之上, 推荐最新版0.12.0.RELEASE。同时需要对Saga 有一定的了解。可以参考Choerodon猪齿鱼平台中的微服务数据一致性解决方案

介绍

Saga 是分布式系统中数据一致性的一种解决方案。本章介绍了如何使用Choerodon 的Saga。并包含如下的内容:

  • 添加依赖
  • 注解
  • 开启Saga
  • 消费者
  • Asgard 服务

添加依赖

在本地服务的pom.xml 中添加如下的依赖。

  1. <dependency>
  2. <groupId>io.choerodon</groupId>
  3. <artifactId>choerodon-starter-asgard</artifactId>
  4. <version>${choerodon.starters.version}</version>
  5. </dependency>

消费者端配置

  1. choerodon:
  2. saga:
  3. consumer:
  4. enabled: false # 启动消费端
  5. thread-num: 2 # saga消息消费线程池大小
  6. max-poll-size: 200 # 每次拉取消息最大数量
  7. poll-interval-ms: 1000 # 拉取间隔,默认1000毫秒

注解

choerodon-starter-asgard 中,包含有两个注解。

  • @Saga:定义一个类似于Kafkatopic。任务会被添加@SagaTask注解订阅。
  • @SagaTask:订阅对应的@Saga 的任务。首先,请确保注解所在类可以被spring扫描到

@Saga

在方法或者类上添加@Saga 注解。

  1. @Saga(code = "asgard-create-user", description = "创建用户", inputSchemaClass = AsgardUser.class)
字段作用
code任务通过@SagaTask订阅,对应@SagaTasksagaCode
description描述信息
inputSchemaSaga输入的参数示例。例如{"name":"string", "age":0}。会覆盖inputSchemaClass自动生成
inputSchemaClass指定class自动生成。比如指定User将自动生成{"id":0,"username":"string","password":"string"}

@SagaTask

在方法上添加@SagaTask注解,@SagaTask本身封装了事务,无需再使用@Transacional声明事务

  1. @SagaTask(code = "devopsCreateUser",
  2. description = "devops创建用户",
  3. sagaCode = "asgard-create-user",
  4. concurrentLimitNum = 2,
  5. concurrentLimitPolicy = SagaDefinition.ConcurrentLimitPolicy.NONE,
  6. seq = 2)
字段作用
codetaskcode,同一个sagaCode下的taskCode需要唯一
sagaCode对应@Sagacode,表示订阅该Saga
seq执行顺序,同一个Saga下的task将按照seq顺序一次消费,值越小消费顺序越高
enabledDbRecord是否在数据库中记录消息消费,默认
description描述
maxRetryCount最大自动重试次数,默认次数为1
concurrentLimitPolicy并发策略,默认为NONE
concurrentLimitNum并发数,当concurrentLimitPolicy不为NONE时生效
timeoutSeconds超时时间
timeoutPolicy超时策略,默认为重试
outputSchemaClass默认将@SagaTask的返回类型生成输出,也可通过此属性指定
outputSchema通过json字符串手动指定输出参数。比如{“name”:“wang”,“age”:23}
transactionIsolation事务的隔离级别
transactionManager使用的事务管理器

在一个Saga 定义中。上一个SagaTask的输出是下一个的输入,当seq相同时,则并行执行,并行的任务输出的结果json进行一个合并,作为下一个次序的输入。

并发策略,默认为NONETYPE根据sagaClient.startSaga时的refType设置并发,TYPE_AND_ID根据refTyperef_id设置并发,并发数为concurrentLimitNum。一个服务将@SagaTask注解删除,asgard服务也会同步删除该SagaTask

开启Saga

Saga 被定义好之后,可以通过服务自身,启动一个Saga 实例。

启动Saga实例

通过Feign启动一个Saga(过时)

  • 注入SagaClient,通过feign调用saga
  • 将业务代码和sagaClient.startSaga()放在一个事务中。
  • 当不需要消费端消费该Saga实例时,添加choerodon.saga.consumer.enabled: false配置,这样不会创建消费端拉取消息和消息消费的bean和线程。
  • feign字段
    • sagaCode: 要启用的sagacode字段,对应@Saga里的code
    • StartInstanceDTO: DTO
      • input: 输入的json数据
      • userId: 方便追踪用户。DetailsHelper.getUserDetails().getUserId()传入
      • refType: 关联业务类型,比如projectuser这些。非必须,该字段用于并发策略
      • refId: 关联业务类型,比如projectIduserId这些。非必须,该字段用于并发策略
      • level: 层级。取值”site”、”organization”、”project”、”user”。
      • sourceId: 资源id。比如项目层就为项目id,设置了level和sourceId之后,就可以对应的层级下看到事务实例信息,否则默认全局层,只能在全局层页面看到该事务运行实例信息。请确保@EnableFeignClients包含io.choerodon.asgard.saga,否则扫描不到该feignClient。例如:@EnableFeignClients("io.choerodon")

例如创建一个用户时,启动一个Saga:

  1. @Transactional
  2. public AsgardUser createUser(@Valid @RequestBody AsgardUser user) {
  3. // 业务代码
  4. sagaClient.startSaga("asgard-create-user", new StartInstanceDTO(input, "", ""));
  5. }

通过TransactionalProducer启动一个Saga

  • 注入TransactionalProducer,通过StartSagaBuilder启动一个Saga
  • 将业务代码和producer.applyAndReturn()放在一个事务中。
  • 当不需要消费端消费该Saga实例时,添加choerodon.saga.consumer.enabled: false配置,这样不会创建消费端拉取消息和消息消费的bean和线程。
  • 方法
    • withSagaCode(String sagaCode): 传入要启用的sagacode字段,对应@Saga里的code
    • withLevel(ResourceLevel level): 传入层级。取值”site”、”organization”、”project”、”user”
    • withRefType(String refType): 传入关联业务类型,该字段用于并发策略
    • withRefId(String refId): 传入关联业务Id。非必须,该字段用于并发策略
    • withPayloadAndSerialize(Object payload): 传入输入对象例如创建一个用户时,启动一个Saga
  1. producer.applyAndReturn(
  2. StartSagaBuilder
  3. .newBuilder()
  4. .withLevel(ResourceLevel.ORGANIZATION)
  5. .withRefType("organization")
  6. .withSagaCode("asgard-create-user"),
  7. builder -> {
  8. asgardService.createuser(user);
  9. builder
  10. .withPayloadAndSerialize(sagaPayload)
  11. .withRefId(String.valueOf(orgId))
  12. .withSourceId(orgId);
  13. return sagaPayload;
  14. });

同时在代码中添加如下处理逻辑:

  1. @SagaTask(code = "devopsCreateUser",
  2. description = "devops创建用户",
  3. sagaCode = "asgard-create-user",
  4. concurrentLimitNum = 2,
  5. concurrentLimitPolicy = SagaDefinition.ConcurrentLimitPolicy.NONE,
  6. seq = 2)
  7. public DevopsUser devopsCreateUser(String data) throws IOException {
  8. AsgardUser asgardUser = objectMapper.readValue(data, AsgardUser.class);
  9. LOGGER.info("===== asgardUser {}", asgardUser);
  10. DevopsUser devopsUser = new DevopsUser();
  11. devopsUser.setId(asgardUser.getId());
  12. devopsUser.setGroup("test");
  13. LOGGER.info("===== devopsCreateUser {}", devopsUser);
  14. return devopsUser;
  15. }

方法返回值为该任务的输出,本次sagaTask的输出是下一个sagaTask的输入。

里面执行封装了事务,不需要再加事务,如果需要加外部事务,可通过@SagaTasktransactionDefinition设置事务传播行为。

输出合并

同一个Saga下的多个SagaTaskseq相同,则并行执行。这多个SagaTask的输出进行合并后,成为下个SagaTask的输入。

合并操作如下:Saga1codecode1Saga2codecode2,如果输出结果完全相同,则合并结果为1或者2的输出。

Saga1输出Saga2输出合并结果
{"name":"23"}null{"name":"23"}
{"name":"23"}{"name":"23333"}{"name":"23333"}结果被最后一个覆盖
{"name":"23"}{"age":23}{"name":"23333","age":23}
[{"id":1},{"id":2}]{"age":23}{"code1":[{"id":1},{"id":2}],"age":23}
falsenull{“code1”:false}
"test"23{"code1":"test","code2":23}
"test""23"{"code1":"test","code2":"23"}

如下:如果这次的输出和输入一样,直接将接收数据返回即可。

  1. @SagaTask(code = "test", sagaCode = "iam-create-project", seq = 1)
  2. public String iamCreateUser(String data) {
  3. return data;
  4. }

这样默认根据方法返回值即String生成的outputChema是错误的,最好手动指定,即:

  1. @SagaTask(code = "test", sagaCode = "iam-create-project", seq = 1, outputSchemaClass = AsgardUser.class)
  2. public String iamCreateUser(String data) {
  3. return data;
  4. }

或者指定正确的返回值

  1. @SagaTask(code = "test", sagaCode = "iam-create-project", seq = 1)
  2. public AsgardUser iamCreateUser(String data) {
  3. AsgardUser asgardUser = objectMapper.readValue(data, AsgardUser.class);
  4. return asgardUser;
  5. }

消费端模型

一个定时任务线程定时拉取消息,拉取的消息放到一个线程安全的set里,再由消息消费线程池异步消费,每消费完成(无论成功还是失败)set从中删除,直到set为空再进行下一次拉取消费。

消费端事务

  • @SagaTask注解的方法封装了事务,有如下事务属性可配置。
    字段作用
    transactionIsolation事务的隔离级别
    transactionManager使用的事务管理器

`

  • 如果@SagaTask方法里面自己又添加了事务,则形成嵌套事务,自己添加的事务设置合适的事务传播行为即可。

  • @SagaTask的方法执行遇到任何异常都会回滚事务,如果无需回滚,则手动捕获该异常即可,如下:

  1. @SagaTask(code = "book-tour-hotel",
  2. description = "预定酒店",
  3. sagaCode = "book-tour-package",
  4. concurrentLimitNum = 2,
  5. seq = 5)
  6. public TourDTO bookHotel(String data) throws IOException {
  7. TourDTO tour = mapper.readValue(data, TourDTO.class);
  8. TourHotel hotel = new TourHotel();
  9. hotel.setUserId(tour.getUserId());
  10. hotel.setTourId(tour.getTourId());
  11. if (tourHotelMapper.insert(hotel) != 1) {
  12. throw new CommonException("error.tour.bookHotel");
  13. }
  14. tour.setHotelId(hotel.getId());
  15. //比如该feign做一些清理,成功与否无关紧要,则可以手动捕获该异常。
  16. try {
  17. XXXFeign.cleanup(tour.getUserId());
  18. } catch (Exception e) {
  19. LOGGER
  20. }
  21. return tour;
  22. }
  • @SagaTask的方法里含有feign调用, 最好能保证feign调用的”幂等性”

Asgard 服务

在北欧神话中,阿斯加德(古诺斯语:Ásgarðr,英语:Asgard)是神的领域,亦可称作阿萨神域。在Choerodon 中,我们用Asgard。来管理choerodon 中所有的分布式事务。

asgard-service 启动后,会主动拉取@Saga@SagaTask的注解配置。

  • 不存在则插入

  • 存在则更新

  • 原本存在后来删除注解,SagaTask会删除,Saga不做处理。

为了防止消费端多实例拉取出现消费,对每条消息设置一个实例锁,锁为·sagaCode + taskCode`。

  • 当消息实例锁为空时,消费端拉取该条消息并更新实例锁,更新成功,则拉取可以成功

  • 当消息实例锁不为空时,查询消息实例是否为拉取的消费端实例,是则允许拉取不是则不允许拉取该条消息。

更多有关的信息可以从asgard-service获取。