作业配置

ElasticJob-Lite 采用构建器模式创建作业配置对象。 代码示例如下:

  1. JobConfiguration jobConfig = JobConfiguration.newBuilder("myJob", 3).cron("0/5 * * * * ?").shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou").build();

作业启动

ElasticJob-Lite 调度器分为定时调度和一次性调度两种类型。 每种调度器启动时均需要注册中心配置、作业对象(或作业类型)以及作业配置这 3 个参数。

定时调度

  1. public class JobDemo {
  2. public static void main(String[] args) {
  3. // 调度基于 class 类型的作业
  4. new ScheduleJobBootstrap(createRegistryCenter(), new MyJob(), createJobConfiguration()).schedule();
  5. // 调度基于 type 类型的作业
  6. new ScheduleJobBootstrap(createRegistryCenter(), "MY_TYPE", createJobConfiguration()).schedule();
  7. }
  8. private static CoordinatorRegistryCenter createRegistryCenter() {
  9. CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("zk_host:2181", "elastic-job-demo"));
  10. regCenter.init();
  11. return regCenter;
  12. }
  13. private static JobConfiguration createJobConfiguration() {
  14. // 创建作业配置
  15. ...
  16. }
  17. }

一次性调度

  1. public class JobDemo {
  2. public static void main(String[] args) {
  3. OneOffJobBootstrap jobBootstrap = new OneOffJobBootstrap(createRegistryCenter(), new MyJob(), createJobConfiguration());
  4. // 可多次调用一次性调度
  5. jobBootstrap.execute();
  6. jobBootstrap.execute();
  7. jobBootstrap.execute();
  8. }
  9. private static CoordinatorRegistryCenter createRegistryCenter() {
  10. CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("zk_host:2181", "elastic-job-demo"));
  11. regCenter.init();
  12. return regCenter;
  13. }
  14. private static JobConfiguration createJobConfiguration() {
  15. // 创建作业配置
  16. ...
  17. }
  18. }

配置作业导出端口

使用 ElasticJob-Lite 过程中可能会碰到一些分布式问题,导致作业运行不稳定。

由于无法在生产环境调试,通过 dump 命令可以把作业内部相关信息导出,方便开发者调试分析;

导出命令的使用请参见运维指南

以下示例用于展示如何通过 SnapshotService 开启用于导出命令的监听端口。

  1. public class JobMain {
  2. public static void main(final String[] args) {
  3. SnapshotService snapshotService = new SnapshotService(regCenter, 9888).listen();
  4. }
  5. private static CoordinatorRegistryCenter createRegistryCenter() {
  6. // 创建注册中心
  7. }
  8. }

配置错误处理策略

使用 ElasticJob-Lite 过程中当作业发生异常后,可采用以下错误处理策略。

错误处理策略名称说明是否内置是否默认是否需要额外配置
记录日志策略记录作业异常日志,但不中断作业执行
抛出异常策略抛出系统异常并中断作业执行
忽略异常策略忽略系统异常且不中断作业执行
邮件通知策略发送邮件消息通知,但不中断作业执行
企业微信通知策略发送企业微信消息通知,但不中断作业执行
钉钉通知策略发送钉钉消息通知,但不中断作业执行

记录日志策略

  1. public class JobDemo {
  2. public static void main(String[] args) {
  3. // 定时调度作业
  4. new ScheduleJobBootstrap(createRegistryCenter(), new MyJob(), createScheduleJobConfiguration()).schedule();
  5. // 一次性调度作业
  6. new OneOffJobBootstrap(createRegistryCenter(), new MyJob(), createOneOffJobConfiguration()).execute();
  7. }
  8. private static JobConfiguration createScheduleJobConfiguration() {
  9. // 创建定时作业配置, 并且使用记录日志策略
  10. return JobConfiguration.newBuilder("myScheduleJob", 3).cron("0/5 * * * * ?").jobErrorHandlerType("LOG").build();
  11. }
  12. private static JobConfiguration createOneOffJobConfiguration() {
  13. // 创建一次性作业配置, 并且使用记录日志策略
  14. return JobConfiguration.newBuilder("myOneOffJob", 3).jobErrorHandlerType("LOG").build();
  15. }
  16. private static CoordinatorRegistryCenter createRegistryCenter() {
  17. // 配置注册中心
  18. ...
  19. }
  20. }

抛出异常策略

  1. public class JobDemo {
  2. public static void main(String[] args) {
  3. // 定时调度作业
  4. new ScheduleJobBootstrap(createRegistryCenter(), new MyJob(), createScheduleJobConfiguration()).schedule();
  5. // 一次性调度作业
  6. new OneOffJobBootstrap(createRegistryCenter(), new MyJob(), createOneOffJobConfiguration()).execute();
  7. }
  8. private static JobConfiguration createScheduleJobConfiguration() {
  9. // 创建定时作业配置, 并且使用抛出异常策略
  10. return JobConfiguration.newBuilder("myScheduleJob", 3).cron("0/5 * * * * ?").jobErrorHandlerType("THROW").build();
  11. }
  12. private static JobConfiguration createOneOffJobConfiguration() {
  13. // 创建一次性作业配置, 并且使用抛出异常策略
  14. return JobConfiguration.newBuilder("myOneOffJob", 3).jobErrorHandlerType("THROW").build();
  15. }
  16. private static CoordinatorRegistryCenter createRegistryCenter() {
  17. // 配置注册中心
  18. ...
  19. }
  20. }

忽略异常策略

  1. public class JobDemo {
  2. public static void main(String[] args) {
  3. // 定时调度作业
  4. new ScheduleJobBootstrap(createRegistryCenter(), new MyJob(), createScheduleJobConfiguration()).schedule();
  5. // 一次性调度作业
  6. new OneOffJobBootstrap(createRegistryCenter(), new MyJob(), createOneOffJobConfiguration()).execute();
  7. }
  8. private static JobConfiguration createScheduleJobConfiguration() {
  9. // 创建定时作业配置, 并且使用忽略异常策略
  10. return JobConfiguration.newBuilder("myScheduleJob", 3).cron("0/5 * * * * ?").jobErrorHandlerType("IGNORE").build();
  11. }
  12. private static JobConfiguration createOneOffJobConfiguration() {
  13. // 创建一次性作业配置, 并且使用忽略异常策略
  14. return JobConfiguration.newBuilder("myOneOffJob", 3).jobErrorHandlerType("IGNORE").build();
  15. }
  16. private static CoordinatorRegistryCenter createRegistryCenter() {
  17. // 配置注册中心
  18. ...
  19. }
  20. }

邮件通知策略

请参考 这里 了解更多。

Maven POM:

  1. <dependency>
  2. <groupId>org.apache.shardingsphere.elasticjob</groupId>
  3. <artifactId>elasticjob-error-handler-email</artifactId>
  4. <version>${latest.release.version}</version>
  5. </dependency>
  1. public class JobDemo {
  2. public static void main(String[] args) {
  3. // 定时调度作业
  4. new ScheduleJobBootstrap(createRegistryCenter(), new MyJob(), createScheduleJobConfiguration()).schedule();
  5. // 一次性调度作业
  6. new OneOffJobBootstrap(createRegistryCenter(), new MyJob(), createOneOffJobConfiguration()).execute();
  7. }
  8. private static JobConfiguration createScheduleJobConfiguration() {
  9. // 创建定时作业配置, 并且使用邮件通知策略
  10. JobConfiguration jobConfig = JobConfiguration.newBuilder("myScheduleJob", 3).cron("0/5 * * * * ?").jobErrorHandlerType("EMAIL").build();
  11. setEmailProperties(jobConfig);
  12. return jobConfig;
  13. }
  14. private static JobConfiguration createOneOffJobConfiguration() {
  15. // 创建一次性作业配置, 并且使用邮件通知策略
  16. JobConfiguration jobConfig = JobConfiguration.newBuilder("myOneOffJob", 3).jobErrorHandlerType("EMAIL").build();
  17. setEmailProperties(jobConfig);
  18. return jobConfig;
  19. }
  20. private static void setEmailProperties(final JobConfiguration jobConfig) {
  21. // 设置邮件的配置
  22. jobConfig.getProps().setProperty(EmailPropertiesConstants.HOST, "host");
  23. jobConfig.getProps().setProperty(EmailPropertiesConstants.PORT, "465");
  24. jobConfig.getProps().setProperty(EmailPropertiesConstants.USERNAME, "username");
  25. jobConfig.getProps().setProperty(EmailPropertiesConstants.PASSWORD, "password");
  26. jobConfig.getProps().setProperty(EmailPropertiesConstants.FROM, "from@xxx.xx");
  27. jobConfig.getProps().setProperty(EmailPropertiesConstants.TO, "to1@xxx.xx,to1@xxx.xx");
  28. }
  29. private static CoordinatorRegistryCenter createRegistryCenter() {
  30. // 配置注册中心
  31. ...
  32. }
  33. }

企业微信通知策略

请参考 这里 了解更多。

Maven POM:

  1. <dependency>
  2. <groupId>org.apache.shardingsphere.elasticjob</groupId>
  3. <artifactId>elasticjob-error-handler-wechat</artifactId>
  4. <version>${latest.release.version}</version>
  5. </dependency>
  1. public class JobDemo {
  2. public static void main(String[] args) {
  3. // 定时调度作业
  4. new ScheduleJobBootstrap(createRegistryCenter(), new MyJob(), createScheduleJobConfiguration()).schedule();
  5. // 一次性调度作业
  6. new OneOffJobBootstrap(createRegistryCenter(), new MyJob(), createOneOffJobConfiguration()).execute();
  7. }
  8. private static JobConfiguration createScheduleJobConfiguration() {
  9. // 创建定时作业配置, 并且使用企业微信通知策略
  10. JobConfiguration jobConfig = JobConfiguration.newBuilder("myScheduleJob", 3).cron("0/5 * * * * ?").jobErrorHandlerType("WECHAT").build();
  11. setWechatProperties(jobConfig);
  12. return jobConfig;
  13. }
  14. private static JobConfiguration createOneOffJobConfiguration() {
  15. // 创建一次性作业配置, 并且使用企业微信通知策略
  16. JobConfiguration jobConfig = JobConfiguration.newBuilder("myOneOffJob", 3).jobErrorHandlerType("WECHAT").build();
  17. setWechatProperties(jobConfig);
  18. return jobConfig;
  19. }
  20. private static void setWechatProperties(final JobConfiguration jobConfig) {
  21. // 设置企业微信的配置
  22. jobConfig.getProps().setProperty(WechatPropertiesConstants.WEBHOOK, "you_webhook");
  23. }
  24. private static CoordinatorRegistryCenter createRegistryCenter() {
  25. // 配置注册中心
  26. ...
  27. }
  28. }

钉钉通知策略

请参考 这里 了解更多。

Maven POM:

  1. <dependency>
  2. <groupId>org.apache.shardingsphere.elasticjob</groupId>
  3. <artifactId>elasticjob-error-handler-dingtalk</artifactId>
  4. <version>${latest.release.version}</version>
  5. </dependency>
  1. public class JobDemo {
  2. public static void main(String[] args) {
  3. // 定时调度作业
  4. new ScheduleJobBootstrap(createRegistryCenter(), new MyJob(), createScheduleJobConfiguration()).schedule();
  5. // 一次性调度作业
  6. new OneOffJobBootstrap(createRegistryCenter(), new MyJob(), createOneOffJobConfiguration()).execute();
  7. }
  8. private static JobConfiguration createScheduleJobConfiguration() {
  9. // 创建定时作业配置, 并且使用企业微信通知策略
  10. JobConfiguration jobConfig = JobConfiguration.newBuilder("myScheduleJob", 3).cron("0/5 * * * * ?").jobErrorHandlerType("DINGTALK").build();
  11. setDingtalkProperties(jobConfig);
  12. return jobConfig;
  13. }
  14. private static JobConfiguration createOneOffJobConfiguration() {
  15. // 创建一次性作业配置, 并且使用钉钉通知策略
  16. JobConfiguration jobConfig = JobConfiguration.newBuilder("myOneOffJob", 3).jobErrorHandlerType("DINGTALK").build();
  17. setDingtalkProperties(jobConfig);
  18. return jobConfig;
  19. }
  20. private static void setDingtalkProperties(final JobConfiguration jobConfig) {
  21. // 设置钉钉的配置
  22. jobConfig.getProps().setProperty(DingtalkPropertiesConstants.WEBHOOK, "you_webhook");
  23. jobConfig.getProps().setProperty(DingtalkPropertiesConstants.KEYWORD, "you_keyword");
  24. jobConfig.getProps().setProperty(DingtalkPropertiesConstants.SECRET, "you_secret");
  25. }
  26. private static CoordinatorRegistryCenter createRegistryCenter() {
  27. // 配置注册中心
  28. ...
  29. }
  30. }