ElasticJob-Lite 提供自定义的 Spring Boot Starter,可以与 Spring Boot 配合使用。 基于 ElasticJob Spring Boot Starter 使用 ElasticJob ,用户无需手动创建 CoordinatorRegistryCenter、JobBootstrap 等实例, 只需实现核心作业逻辑并辅以少量配置,即可利用轻量、无中心化的 ElasticJob 解决分布式调度问题。

作业配置

实现作业逻辑

作业逻辑实现与 ElasticJob 的其他使用方式并没有较大的区别,只需将当前作业注册为 Spring 容器中的 bean。

线程安全问题

Bean 默认是单例的,如果该作业实现会在同一个进程内被创建出多个 JobBootstrap 的实例, 可以考虑设置 Scope 为 prototype

  1. @Component
  2. public class SpringBootDataflowJob implements DataflowJob<Foo> {
  3. @Override
  4. public List<Foo> fetchData(final ShardingContext shardingContext) {
  5. // 获取数据
  6. }
  7. @Override
  8. public void processData(final ShardingContext shardingContext, final List<Foo> data) {
  9. // 处理数据
  10. }
  11. }

配置协调服务与作业

在配置文件中指定 ElasticJob 所使用的 Zookeeper。配置前缀为 elasticjob.reg-center

elasticjob.jobs 是一个 Map,key 为作业名称,value 为作业类型与配置。 Starter 会根据该配置自动创建 OneOffJobBootstrapScheduleJobBootstrap 的实例并注册到 Spring 容器中。

配置参考:

  1. elasticjob:
  2. regCenter:
  3. serverLists: localhost:6181
  4. namespace: elasticjob-lite-springboot
  5. jobs:
  6. dataflowJob:
  7. elasticJobClass: org.apache.shardingsphere.elasticjob.dataflow.job.DataflowJob
  8. cron: 0/5 * * * * ?
  9. shardingTotalCount: 3
  10. shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
  11. scriptJob:
  12. elasticJobType: SCRIPT
  13. cron: 0/10 * * * * ?
  14. shardingTotalCount: 3
  15. props:
  16. script.command.line: "echo SCRIPT Job: "

作业启动

定时调度

定时调度作业在 Spring Boot 应用程序启动完成后会自动启动,无需其他额外操作。

一次性调度

一次性调度的作业的执行权在开发者手中,开发者可以在需要调用作业的位置注入 OneOffJobBootstrap, 通过 execute() 方法执行作业。

OneOffJobBootstrap bean 的名称通过属性 jobBootstrapBeanName 配置,注入时需要指定依赖的 bean 名称。 具体配置请参考配置文档

  1. elasticjob:
  2. jobs:
  3. myOneOffJob:
  4. jobBootstrapBeanName: myOneOffJobBean
  5. ....
  1. @RestController
  2. public class OneOffJobController {
  3. // 通过 "@Resource" 注入
  4. @Resource(name = "myOneOffJobBean")
  5. private OneOffJobBootstrap myOneOffJob;
  6. @GetMapping("/execute")
  7. public String executeOneOffJob() {
  8. myOneOffJob.execute();
  9. return "{\"msg\":\"OK\"}";
  10. }
  11. // 通过 "@Autowired" 注入
  12. @Autowired
  13. @Qualifier(name = "myOneOffJobBean")
  14. private OneOffJobBootstrap myOneOffJob2;
  15. @GetMapping("/execute2")
  16. public String executeOneOffJob2() {
  17. myOneOffJob2.execute();
  18. return "{\"msg\":\"OK\"}";
  19. }
  20. }

配置错误处理策略

使用 ElasticJob-Lite 过程中当作业发生异常后,可采用以下错误处理策略。

错误处理策略名称说明是否内置是否默认是否需要额外配置
记录日志策略记录作业异常日志,但不中断作业执行
抛出异常策略抛出系统异常并中断作业执行
忽略异常策略忽略系统异常且不中断作业执行
邮件通知策略发送邮件消息通知,但不中断作业执行
企业微信通知策略发送企业微信消息通知,但不中断作业执行
钉钉通知策略发送钉钉消息通知,但不中断作业执行

记录日志策略

  1. elasticjob:
  2. regCenter:
  3. ...
  4. jobs:
  5. ...
  6. jobErrorHandlerType: LOG

抛出异常策略

  1. elasticjob:
  2. regCenter:
  3. ...
  4. jobs:
  5. ...
  6. jobErrorHandlerType: THROW

忽略异常策略

  1. elasticjob:
  2. regCenter:
  3. ...
  4. jobs:
  5. ...
  6. jobErrorHandlerType: IGNORE

邮件通知策略

请参考 这里 了解更多。

Maven POM:

  1. <dependency>
  2. <groupId>org.apache.shardingsphere.elasticjob</groupId>
  3. <artifactId>elasticjob-error-handler-email</artifactId>
  4. <version>${latest.release.version}</version>
  5. </dependency>
  1. elasticjob:
  2. regCenter:
  3. ...
  4. jobs:
  5. ...
  6. jobErrorHandlerType: EMAIL
  7. props:
  8. email:
  9. host: host
  10. port: 465
  11. username: username
  12. password: password
  13. useSsl: true
  14. subject: ElasticJob error message
  15. from: from@xxx.xx
  16. to: to1@xxx.xx,to2@xxx.xx
  17. cc: cc@xxx.xx
  18. bcc: bcc@xxx.xx
  19. debug: false

企业微信通知策略

请参考 这里 了解更多。

Maven POM:

  1. <dependency>
  2. <groupId>org.apache.shardingsphere.elasticjob</groupId>
  3. <artifactId>elasticjob-error-handler-wechat</artifactId>
  4. <version>${latest.release.version}</version>
  5. </dependency>
  1. elasticjob:
  2. regCenter:
  3. ...
  4. jobs:
  5. ...
  6. jobErrorHandlerType: WECHAT
  7. props:
  8. wechat:
  9. webhook: you_webhook
  10. connectTimeout: 3000
  11. readTimeout: 5000

钉钉通知策略

请参考 这里 了解更多。

Maven POM:

  1. <dependency>
  2. <groupId>org.apache.shardingsphere.elasticjob</groupId>
  3. <artifactId>elasticjob-error-handler-dingtalk</artifactId>
  4. <version>${latest.release.version}</version>
  5. </dependency>
  1. elasticjob:
  2. regCenter:
  3. ...
  4. jobs:
  5. ...
  6. jobErrorHandlerType: DINGTALK
  7. props:
  8. dingtalk:
  9. webhook: you_webhook
  10. keyword: you_keyword
  11. secret: you_secret
  12. connectTimeout: 3000
  13. readTimeout: 5000