总览

InLong-Sort是一个ETL系统,当前支持的sink类型包括hive,kafka,clickhouse以及iceberg。

本文介绍如何在InLong-Sort中扩展一个新的sink类型。

扩展flink sink

InLong-Sort是一套基于flink计算引擎的ETL系统,在扩展新的sink到InLong-Sort前,首先需要扩展一个新的flink connector。

如何扩展flink connector请参考flink官方文档DataStream Connectors.

扩展sink协议

扩展完flink sink后,需要在InLong-Sort中扩展对应的sink协议。该协议用来描述该sink所需要的一些必要信息。

扩展协议首先需要实现一个具体的类,继承父类 inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/SinkInfo.java

举例

  1. public class DemoSinkInfo extends SinkInfo {
  2. // Place necessary attributes here
  3. @JsonCreator
  4. public DemoSinkInfo(FieldInfo[] fields) {
  5. super(fields);
  6. }
  7. }

扩展完协议后,需要将该协议标记为SinkInfo的子类型,通过全局唯一的name进行标识。

举例:扩展一个名为Constants.SINK_TYPE_DEMO的协议,该协议类为DemoSinkInfo

  1. /**
  2. * The base class of the data sink in the metadata.
  3. */
  4. @JsonTypeInfo(
  5. use = JsonTypeInfo.Id.NAME,
  6. include = JsonTypeInfo.As.PROPERTY,
  7. property = "type")
  8. @JsonSubTypes({
  9. @Type(value = ClickHouseSinkInfo.class, name = Constants.SINK_TYPE_CLICKHOUSE),
  10. @Type(value = HiveSinkInfo.class, name = Constants.SINK_TYPE_HIVE),
  11. @Type(value = KafkaSinkInfo.class, name = Constants.SINK_TYPE_KAFKA),
  12. @Type(value = IcebergSinkInfo.class, name = Constants.SINK_TYPE_ICEBERG),
  13. // The new sink protocol
  14. @Type(value = DemoSinkInfo.class, name = Constants.SINK_TYPE_DEMO)}
  15. )
  16. public abstract class SinkInfo implements Serializable {
  17. private static final long serialVersionUID = 1485856855405721745L;
  18. @JsonProperty("fields")
  19. private final FieldInfo[] fields;
  20. public SinkInfo(@JsonProperty("fields") FieldInfo[] fields) {
  21. this.fields = checkNotNull(fields);
  22. }
  23. @JsonProperty("fields")
  24. public FieldInfo[] getFields() {
  25. return fields;
  26. }
  27. }

集成新的Sink到InLong-Sort的主流程

扩展完flink sink并实现了对应的sink协议后,我们就可以将该sink集成到InLong-Sort的主流程中了。

InLong-Sort本质是一个flink的job,入口为inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java

举例 :扩展一个名为Constants.SINK_TYPE_DEMO的sink

  1. private static void buildSinkStream(
  2. DataStream<Row> sourceStream,
  3. Configuration config,
  4. SinkInfo sinkInfo,
  5. Map<String, Object> properties,
  6. long dataflowId) throws IOException, ClassNotFoundException {
  7. final String sinkType = checkNotNull(config.getString(Constants.SINK_TYPE));
  8. switch (sinkType) {
  9. case Constants.SINK_TYPE_CLICKHOUSE:
  10. break;
  11. case Constants.SINK_TYPE_HIVE:
  12. break;
  13. case Constants.SINK_TYPE_ICEBERG:
  14. break;
  15. case Constants.SINK_TYPE_KAFKA:
  16. break;
  17. case Constants.SINK_TYPE_DEMO:
  18. // Add the extended sink function here
  19. break;
  20. default:
  21. throw new IllegalArgumentException("Unsupported sink type " + sinkType);
  22. }
  23. }