总览

本文面向 InLong-Manager 插件开发人员, 尝试尽可能全面地阐述开发一个 Manager 插件所经过的历程,力求消除开发者的困惑,让插件开发变得简单。

开发之前

  • Inlong 作为一个流式数据同步框架,采用 Group + Stream 的构建方式。
  • Inlong Group 可包含多个 Inlong Stream, 每个 Inlong Stream 负责一条独立的数据同步链路。
  • Inlong Group 负责任务所需的物理资源的定义及初始化,这些物理资源主要包括数据同步所需的中间件集群及 Sort 函数;该 Group 下所有的 Stream 共享这些资源。
  • Inlong Manager 通过 CreateGroupWorkflowDefinition 这个工作流创建对应的 Inlong Group 并初始化所有的物理资源, 每个工作流包括数个相互独立的 Service Task。当这个工作流被创建并执行时, 麾下的 Service Task 按照预先定义的顺序依次执行。
  • Service Task 采用监听者模式——也被称作发布-订阅模式构建, 每一个独立的 task 会注册一个或多个 Listener (监听器). Listener 接受工作流上下文信息并执行相关的逻辑。
  • 作为开发人员, 你需要开发原生 Listener 以实现自定义的操作逻辑。

流程图示

  • Inlong Manager 插件机制如下图所示:

    Manager 插件 - 图1

  • 如图所示,插件需要被放置在安装路径之下, 当 Inlong Manager 进程启动时,会自动寻找插件编译的 jar 包并加载其中的代码。

Manager 插件 - 图2

  • 作为开发人员,当你看到下图所示的日志时,可以确认插件已经加载成功了:

Manager 插件 - 图3

参考 Demo

  • 为方便开发人员理解. 我们在 Inlong Manager 目录下增加了 manager-pluin-example , 开发人员可参考 EmptyProcessPlugin 进行自己的插件开发;
  1. public class EmptyProcessPlugin implements ProcessPlugin {
  2. @Override
  3. public Map<DataSourceOperateListener, EventSelector> createSourceOperateListeners() {
  4. return new LinkedHashMap<>();
  5. }
  6. @Override
  7. public Map<QueueOperateListener, EventSelector> createQueueOperateListeners() {
  8. return new LinkedHashMap<>();
  9. }
  10. @Override
  11. public Map<SortOperateListener, EventSelector> createSortOperateListeners() {
  12. return ProcessPlugin.super.createSortOperateListeners();
  13. }
  14. @Override
  15. public Map<SinkOperateListener, EventSelector> createSinkOperateListeners() {
  16. return ProcessPlugin.super.createSinkOperateListeners();
  17. }
  18. }
  • DataSourceOperateListener,QueueOperateListener,SortOperateListener,SinkOperateListenerTaskEventListener 的子类, 分别负责源数据端,消息队列,sort 函数,目标数据端的初始化工作。 与 Listener 绑定的EventSelector决定该 Listener 是否在运行时被激活。
  1. public interface EventSelector {
  2. boolean accept(WorkflowContext context);
  3. }
  • 完成插件的开发工作后, 你需要编写对应的Yaml格式的插件定义文件, 将其放置在工程目录 resources/META-INF 下。
  1. name: example
  2. description: example for manager plugin
  3. javaVersion: 1.8
  4. pluginClass: org.apache.inlong.manager.plugin.EmptyProcessPlugin
  • 如果你不确定怎样开发一个可用的 Listener ,请参考org.apache.inlong.manager.service.workflow.ServiceTaskListenerFactory中原生 Listener 的逻辑。

写在最后

我们在 Inlong Manager 中提供插件化机制,希望可以方便开发人员在 Inlong 现有的框架下定制化开发自己的功能。 诚然,当前的插件机制还很不完善,我们会持续致力于功能的改进,也欢迎您的加入。