Overview

InLong-Sort is a real-time ETL system. It currently supports Hive, Kafka, ClickHouse and Iceberg sinks. This article describes how to add a new type of sink to InLong-Sort.

Extend a new sink function

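InLong-Sort is built on Flink, so every sink type in InLong-Sort is backed by a Flink sink. To add a new type of sink, you need either a new Flink sink that you implement yourself or a predefined sink that Flink already provides. Refer to the Flink documentation on DataStream Connectors.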
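Flink's `RichSinkFunction` exposes `open`, `invoke` and `close` hooks: `open` runs once before any record arrives, `invoke` runs once per record, and `close` runs once at shutdown. The sketch below imitates that lifecycle without any Flink dependency so the control flow is easy to follow. It buffers rows and writes them in batches. `DemoSink`, its batch size, and the `List<List<String>>` that stands in for the external system are all hypothetical. A real implementation would extend `org.apache.flink.streaming.api.functions.sink.RichSinkFunction<Row>` and write to the actual target system.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, framework-free stand-in for a Flink sink. It mirrors the
// open -> invoke (once per record) -> close lifecycle of RichSinkFunction.
class DemoSink {
    private final int batchSize;
    private final List<List<String>> target; // stands in for the external system
    private List<String> buffer;             // created in open(), as Flink sinks usually do

    DemoSink(int batchSize, List<List<String>> target) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.target = target;
    }

    // Called once before the first record (e.g. to open connections).
    void open() {
        buffer = new ArrayList<>();
    }

    // Called once per record; writes a batch whenever the buffer is full.
    void invoke(String row) {
        buffer.add(row);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Called once at shutdown; writes any rows that are still buffered.
    void close() {
        flush();
    }

    private void flush() {
        if (!buffer.isEmpty()) {
            target.add(new ArrayList<>(buffer));
            buffer.clear();
        }
    }
}

class DemoSinkExample {
    public static void main(String[] args) {
        List<List<String>> written = new ArrayList<>();
        DemoSink sink = new DemoSink(2, written);
        sink.open();
        for (String row : new String[] {"a", "b", "c"}) {
            sink.invoke(row);
        }
        sink.close();
        System.out.println(written); // [[a, b], [c]]
    }
}
```

Flushing only in `close()` keeps the sketch short. A production sink that buffers rows would also need to flush when Flink takes a checkpoint (for example by implementing Flink's `CheckpointedFunction`) so buffered rows are not lost on failure.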

Extend a new sink protocol

First, implement a new sink protocol class that extends SinkInfo (inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/SinkInfo.java).

Put every attribute that the corresponding Flink sink from the previous step needs into this protocol class.

Example

  public class DemoSinkInfo extends SinkInfo {
      // Place necessary attributes here
      @JsonCreator
      public DemoSinkInfo(@JsonProperty("fields") FieldInfo[] fields) {
          super(fields);
      }
  }
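The example above leaves the attribute list empty. The following sketch shows what "necessary attributes" might look like: `DemoSinkInfo` carries two hypothetical attributes, an endpoint URL and a batch size, and validates them in its constructor. To keep it self-contained, it uses simplified stand-ins for `FieldInfo` and `SinkInfo` and leaves out the Jackson annotations. In InLong-Sort, each attribute would also be exposed through `@JsonProperty` (as `SinkInfo` does for `fields`) so the protocol can be serialized and deserialized.

```java
import java.io.Serializable;
import java.util.Objects;

// Simplified stand-in for InLong's FieldInfo (assumption: only a name).
class FieldInfo implements Serializable {
    private final String name;

    FieldInfo(String name) { this.name = Objects.requireNonNull(name); }

    String getName() { return name; }
}

// Simplified stand-in for InLong's SinkInfo base class.
abstract class SinkInfo implements Serializable {
    private final FieldInfo[] fields;

    SinkInfo(FieldInfo[] fields) { this.fields = Objects.requireNonNull(fields); }

    FieldInfo[] getFields() { return fields; }
}

// Hypothetical protocol: every setting the demo sink needs travels with it.
class DemoSinkInfo extends SinkInfo {
    private static final long serialVersionUID = 1L;

    private final String url;     // hypothetical: where the sink writes
    private final int batchSize;  // hypothetical: rows per write

    DemoSinkInfo(FieldInfo[] fields, String url, int batchSize) {
        super(fields);
        this.url = Objects.requireNonNull(url, "url");
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    String getUrl() { return url; }

    int getBatchSize() { return batchSize; }
}

class DemoSinkInfoExample {
    public static void main(String[] args) {
        DemoSinkInfo info = new DemoSinkInfo(
                new FieldInfo[] {new FieldInfo("id"), new FieldInfo("name")},
                "http://localhost:8080", 100);
        System.out.println(info.getFields().length + " fields -> " + info.getUrl());
    }
}
```

Validating attributes in the constructor means a malformed sink configuration fails when the protocol is built, not later inside the running Flink job.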

Second, register the new sink protocol as a subtype of SinkInfo and give it a name.

Example: registering a new sink protocol DemoSinkInfo under the subtype name Constants.SINK_TYPE_DEMO

  /**
   * The base class of the data sink in the metadata.
   */
  @JsonTypeInfo(
          use = JsonTypeInfo.Id.NAME,
          include = JsonTypeInfo.As.PROPERTY,
          property = "type")
  @JsonSubTypes({
          @Type(value = ClickHouseSinkInfo.class, name = Constants.SINK_TYPE_CLICKHOUSE),
          @Type(value = HiveSinkInfo.class, name = Constants.SINK_TYPE_HIVE),
          @Type(value = KafkaSinkInfo.class, name = Constants.SINK_TYPE_KAFKA),
          @Type(value = IcebergSinkInfo.class, name = Constants.SINK_TYPE_ICEBERG),
          // The new sink protocol
          @Type(value = DemoSinkInfo.class, name = Constants.SINK_TYPE_DEMO)}
  )
  public abstract class SinkInfo implements Serializable {
      private static final long serialVersionUID = 1485856855405721745L;

      @JsonProperty("fields")
      private final FieldInfo[] fields;

      public SinkInfo(@JsonProperty("fields") FieldInfo[] fields) {
          this.fields = checkNotNull(fields);
      }

      @JsonProperty("fields")
      public FieldInfo[] getFields() {
          return fields;
      }
  }
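With `JsonTypeInfo.Id.NAME` and `property = "type"`, Jackson reads the `type` property of the serialized sink and instantiates the subclass registered under that name in `@JsonSubTypes`. A serialized `DemoSinkInfo` must therefore carry a `type` equal to the value of `Constants.SINK_TYPE_DEMO`, and that constant has to be added next to the existing sink-type constants. The hand-rolled registry below only imitates this name-to-class lookup for illustration. It is not how Jackson is implemented. The constant values, the map standing in for parsed JSON, and the stripped-down classes are assumptions.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Minimal stand-ins (assumptions) for the real protocol classes.
abstract class SinkInfo {
    final List<String> fields;

    SinkInfo(List<String> fields) { this.fields = fields; }
}

class KafkaSinkInfo extends SinkInfo {
    KafkaSinkInfo(List<String> fields) { super(fields); }
}

class DemoSinkInfo extends SinkInfo {
    DemoSinkInfo(List<String> fields) { super(fields); }
}

// Imitates name-based subtype resolution: the "type" property picks the subclass.
final class SinkInfoRegistry {
    static final String SINK_TYPE_KAFKA = "kafka"; // hypothetical value
    static final String SINK_TYPE_DEMO = "demo";   // hypothetical value

    private static final Map<String, Function<List<String>, SinkInfo>> SUBTYPES = new HashMap<>();

    static {
        SUBTYPES.put(SINK_TYPE_KAFKA, KafkaSinkInfo::new);
        SUBTYPES.put(SINK_TYPE_DEMO, DemoSinkInfo::new); // the new registration
    }

    @SuppressWarnings("unchecked")
    static SinkInfo fromMap(Map<String, Object> json) {
        Object type = json.get("type");
        Function<List<String>, SinkInfo> factory = SUBTYPES.get(type);
        if (factory == null) {
            throw new IllegalArgumentException("Unknown sink type: " + type);
        }
        return factory.apply((List<String>) json.get("fields"));
    }

    public static void main(String[] args) {
        Map<String, Object> json = Map.of("type", "demo", "fields", List.of("id", "name"));
        SinkInfo info = fromMap(json);
        System.out.println(info.getClass().getSimpleName()); // DemoSinkInfo
    }
}
```

If the `@Type` entry for `DemoSinkInfo` were missing, the lookup for `"demo"` would fail in the same way the sketch's `fromMap` throws for an unknown name.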

Bundle the new sink and make it available in InLong-Sort

After adding the new Flink sink and the new sink protocol, bundle the new sink into the Flink job that serves as the entry point of InLong-Sort.

The entry point of the Flink job is inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java.

Example

  private static void buildSinkStream(
          DataStream<Row> sourceStream,
          Configuration config,
          SinkInfo sinkInfo,
          Map<String, Object> properties,
          long dataflowId) throws IOException, ClassNotFoundException {
      final String sinkType = checkNotNull(config.getString(Constants.SINK_TYPE));
      switch (sinkType) {
          case Constants.SINK_TYPE_CLICKHOUSE:
              break;
          case Constants.SINK_TYPE_HIVE:
              break;
          case Constants.SINK_TYPE_ICEBERG:
              break;
          case Constants.SINK_TYPE_KAFKA:
              break;
          case Constants.SINK_TYPE_DEMO:
              // Add the extended sink function here
              break;
          default:
              throw new IllegalArgumentException("Unsupported sink type " + sinkType);
      }
  }
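The `SINK_TYPE_DEMO` branch usually needs the typed protocol object, so it casts `sinkInfo` to `DemoSinkInfo` before building the sink. The branch is chosen by the configured sink type, not by the runtime class of `sinkInfo`, so an `instanceof` check gives a clearer error when the two disagree. The sketch below covers only that dispatch logic and uses minimal stand-in classes. In the real `Entrance`, the branch would attach a sink to `sourceStream` through Flink's `DataStream#addSink`. The `url` attribute, `DemoSinkFunction`, and the returned description string are hypothetical.

```java
// Minimal stand-ins (assumptions) for the protocol classes.
abstract class SinkInfo {}

class DemoSinkInfo extends SinkInfo {
    private final String url; // hypothetical attribute

    DemoSinkInfo(String url) { this.url = url; }

    String getUrl() { return url; }
}

final class SinkDispatch {
    static final String SINK_TYPE_DEMO = "demo"; // hypothetical value

    // Mirrors the shape of buildSinkStream: pick a branch by the configured
    // type, then cast the protocol object to the matching subclass.
    static String buildSink(String sinkType, SinkInfo sinkInfo) {
        switch (sinkType) {
            case SINK_TYPE_DEMO:
                if (!(sinkInfo instanceof DemoSinkInfo)) {
                    throw new IllegalArgumentException("Sink type " + sinkType
                            + " expects DemoSinkInfo but got "
                            + (sinkInfo == null ? "null" : sinkInfo.getClass().getSimpleName()));
                }
                DemoSinkInfo demoSinkInfo = (DemoSinkInfo) sinkInfo;
                // Real code would do something like
                // sourceStream.addSink(new DemoSinkFunction(demoSinkInfo)) here,
                // where DemoSinkFunction is the hypothetical new Flink sink.
                return "demo sink writing to " + demoSinkInfo.getUrl();
            default:
                throw new IllegalArgumentException("Unsupported sink type " + sinkType);
        }
    }

    public static void main(String[] args) {
        System.out.println(buildSink("demo", new DemoSinkInfo("http://localhost:8080")));
        // prints "demo sink writing to http://localhost:8080"
    }
}
```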