任务插件开发

提醒:目前任务插件开发暂不支持热部署

基于SHELL的任务

基于YARN的计算(参见MapReduceTask)

  • 需要在 cn.dolphinscheduler.server.worker.task 下的 TaskManager 类中创建自定义任务(也需在TaskType注册对应的任务类型)
  • 需要继承cn.dolphinscheduler.server.worker.task 下的 AbstractYarnTask
  • 构造方法调度 AbstractYarnTask 构造方法
  • 继承 AbstractParameters 自定义任务参数实体
  • 重写 AbstractTaskinit 方法中解析自定义任务参数
  • 重写 buildCommand 封装command

基于非YARN的计算(参见ShellTask)

  • 需要在 cn.dolphinscheduler.server.worker.task 下的 TaskManager 中创建自定义任务

  • 需要继承cn.dolphinscheduler.server.worker.task 下的 AbstractTask

  • 构造方法中实例化 ShellCommandExecutor

    1. public ShellTask(TaskProps props, Logger logger) {
    2. super(props, logger);
    3. this.taskDir = props.getTaskDir();
    4. this.processTask = new ShellCommandExecutor(this::logHandle,
    5. props.getTaskDir(), props.getTaskAppId(),
    6. props.getTenantCode(), props.getEnvFile(), props.getTaskStartTime(),
    7. props.getTaskTimeout(), logger);
    8. this.processDao = DaoFactory.getDaoInstance(ProcessDao.class);
    9. }

    传入自定义任务的 TaskProps和自定义Logger,TaskProps 封装了任务的信息,Logger分装了自定义日志信息

  • 继承 AbstractParameters 自定义任务参数实体

  • 重写 AbstractTaskinit 方法中解析自定义任务参数实体

  • 重写 handle 方法,调用 ShellCommandExecutorrun 方法,第一个参数传入自己的command,第二个参数传入 ProcessDao,设置相应的 exitStatusCode

基于非SHELL的任务(参见SqlTask)

  • 需要在 cn.dolphinscheduler.server.worker.task 下的 TaskManager 中创建自定义任务
  • 需要继承cn.dolphinscheduler.server.worker.task 下的 AbstractTask
  • 继承 AbstractParameters 自定义任务参数实体
  • 构造方法或者重写 AbstractTaskinit 方法中,解析自定义任务参数实体
  • 重写 handle 方法实现业务逻辑并设置相应的exitStatusCode