总览

本文面向 InLong-Agent 插件开发人员,尝试尽可能全面地阐述开发一个 Agent 插件所经过的历程,力求消除开发者的困惑,让插件开发变得简单。

开发之前

InLong Agent 本身作为数据采集框架,采用 Job + Task 架构构建。并将数据源读取和写入抽象成为 Reader/Sink 插件,纳入到整个框架中。

开发人员需要明确 Job 以及 Task 的概念:

  • Job: Job是 Agent 用以描述从一个源头到一个目的端的同步作业,是 Agent 数据同步的最小业务单元。比如:读取一个文件目录下的所有文件
  • Task: Task是把Job拆分得到的最小执行单元。比如:文件夹下有多个文件需要被读取,那么一个 job 会被拆分成为多个 task ,每个 task 读取对应的文件

一个Task包含以下各个组件:

  • Reader:Reader 为数据采集模块,负责采集数据源的数据,将数据发送给 channel。
  • Sink: Sink 为数据写入模块,负责不断向 channel 取数据,并将数据写入到目的端。
  • Channel:Channel 用于连接 reader 和 sink,作为两者的数据传输通道,并起到了数据的写入读取监控作用

作为开发人员,实际上只需要开发特定的 Source、Reader 以及 Sink 即可,数据如果需要持久化到本地磁盘,使用持久化 Channel ,如果否则使用内存 Channel

流程图示

上述介绍的 Job \ Task \ Reader \ Sink \ Channel 概念可以用下图表示: Agent 插件 - 图1

  1. 用户提交 Job(通过 manager 或者通过 curl 方式提交),Job 中定义了需要使用的 Source, Channel, Sink(通过类的全限定名定义)
  2. 框架启动 Job,通过反射机制创建出 Source
  3. 框架启动 Source,并调用 Source 的 Split 接口,生成一个或者多个 Task
  4. 生成一个 Task 时,同时生成 Reader(一种类型的 Source 会生成对应的 reader),用户配置的 Channel 以及用户配置的 Sink
  5. Task 开始执行,Reader 开始读取数据到 Channel,Sink 从 Channel 中取数进行发送
  6. Job 和 Task 执行时所需要的所有信息都封装在 JobProfile 中

参考 Demo

请开发人员通过阅读 Agent 源码中的 Job 类、Task 类、TextFileSource 类、TextFileReader 类、以及 ProxySink 类来弄懂上述流程

开发流程

1、首先开发 Source , 实现 Split 逻辑,返回 Reader 列表 2、开发对应的 Reader ,实现读取数据并写入到 Channel 的逻辑 3、开发对应的 Sink , 实现从 Channel 中取数并写入到指定 Sink 中的逻辑

编程必知接口

下面将介绍开发一款插件需要知道的类与接口,如下:

Reader

  1. private class ReaderImpl implements Reader {
  2. private int count = 0;
  3. @Override
  4. public Message read() {
  5. count += 1;
  6. return new DefaultMessage("".getBytes(StandardCharsets.UTF_8));
  7. }
  8. @Override
  9. public boolean isFinished() {
  10. return count > 99999;
  11. }
  12. @Override
  13. public String getReadSource() {
  14. return null;
  15. }
  16. @Override
  17. public void setReadTimeout(long mill) {
  18. }
  19. }

Reader接口功能如下:

  • read: 被单个 Task 调用,调用后返回读取的一条消息,Agent 内部的消息使用 Message 封装
  • isFinished: 判断是否读取完成,举例:如果是 SQL 任务,则判断是否读取完了 ResultSet 中的所有内容,如果是文件任务,则判断超过用户设置的等待时间后是否还有数据写入
  • getReadSource: 获取采集源,举例:如果是文件任务,则返回当前读取的文件名
  • setReadTimeout: 设置读取超时时间

Sink

  1. public interface Sink extends Stage {
  2. /**
  3. * Write data into data center
  4. *
  5. * @param message - message
  6. */
  7. void write(Message message);
  8. /**
  9. * set source file name where the message is generated
  10. * @param sourceName
  11. */
  12. void setSourceName(String sourceName);
  13. /**
  14. * every sink should include a message filter to filter out stream id
  15. */
  16. MessageFilter initMessageFilter(JobProfile jobConf);
  17. }

Sink接口功能如下:

  • write: 被单个 Task 调用,从 Task 中的 Channel 读取一条消息,并写入到特定的存储介质中,以 PulsarSink 为例,则需要通过 PulsarSender 发送到 Pulsar
  • setSourceName: 设置数据源名称,如果是文件,则是文件名
  • initMessageFilter: 初始化 MessageFilter , 用户可以在Job配置文件中通过设置 agent.message.filter.classname 来创建一个消息过滤器来过滤每一条消息,详情可以参考 MessageFilter 接口

Source

  1. /**
  2. * Source can be split into multiple reader.
  3. */
  4. public interface Source {
  5. /**
  6. * Split source into a list of readers.
  7. *
  8. * @param conf job conf
  9. * @return - list of reader
  10. */
  11. List<Reader> split(JobProfile conf);
  12. }

Source接口功能如下:

  • split: 被单个 Job 调用,产生多个 Reader,举例:一个读取文件任务,匹配文件夹内的多个文件,在 job 启动时,会指定 TextFileSource 作为 Source 入口, 调用 split 函数后,TextFileSource 会检测用户设置的文件夹内有多少符合路径匹配表达式的路径,并生成 TextFileReader 读取

任务配置

代码写好了,有没有想过框架是怎么找到插件的入口类的?框架是如何加载插件的呢?

在提交任务时,会发现任务中定义了插件的相关信息,包括入口类。例如:

  1. {
  2. "job": {
  3. "name": "fileAgentTest",
  4. "source": "org.apache.inlong.agent.plugin.sources.TextFileSource",
  5. "sink": "org.apache.inlong.agent.plugin.sinks.ProxySink",
  6. "channel": "org.apache.inlong.agent.plugin.channel.MemoryChannel"
  7. }
  8. }
  • source: Source 类的全限定名称,框架通过反射插件入口类的实例。
  • sink: Sink 类的全限定名称,框架通过反射插件入口类的实例。
  • channel: 使用的 Channel 类名,框架通过反射插件入口类的实例。

Message

跟一般的生产者-消费者模式一样,Reader插件和Sink插件之间也是通过channel来实现数据的传输的。 channel可以是内存的,也可能是持久化的,插件不必关心。插件通过RecordSenderchannel写入数据,通过RecordReceiverchannel读取数据。

channel中的一条数据为一个Message的对象,Message中包含一个字节数组以及一个Map表示的属性数据

Message有如下方法:

  1. public interface Message {
  2. /**
  3. * Data content of message.
  4. *
  5. * @return bytes body
  6. */
  7. byte[] getBody();
  8. /**
  9. * Data attribute of message
  10. *
  11. * @return map header
  12. */
  13. Map<String, String> getHeader();
  14. }

开发人员可以根据该接口拓展定制化的 Message ,比如 ProxyMessage 中,就包含了 InLongGroupId, InLongStreamId 等属性

Last but not Least

新增插件都必须在InLong官方wiki中有一篇文档,文档需要包括但不限于以下内容:

  1. 快速介绍:介绍插件的使用场景,特点等。
  2. 实现原理:介绍插件实现的底层原理,比如sqlReader通过执行Sql查询来读取数据库中的数据
  3. 配置说明
    • 给出典型场景下的同步任务的json配置文件。
    • 介绍每个参数的含义、是否必选、默认值、取值范围和其他约束。
  4. 约束限制:是否存在其他的使用限制条件。
  5. FAQ:用户经常会遇到的问题。