Kafka

Overview

The Kafka Load Node writes data into Kafka topics. It supports two modes: normal writes and upsert writes. In upsert mode, the upsert-kafka connector consumes a changelog stream: it writes INSERT and UPDATE_AFTER data as normal Kafka message values, and writes DELETE data as Kafka messages with null values (a tombstone for the key).

Supported Version

| Load Node | Kafka version |
|---|---|
| Kafka | 0.10+ |

Dependencies

The following provides the dependency information needed to set up the Kafka Load Node, both for projects using a build automation tool (such as Maven or SBT) and for the SQL Client with Sort Connectors JAR bundles.

Maven dependency

    <dependency>
        <groupId>org.apache.inlong</groupId>
        <artifactId>sort-connector-kafka</artifactId>
        <version>2.1.0-SNAPSHOT</version>
    </dependency>

How to create a Kafka Load Node

Usage for SQL API

The examples below show how to create a Kafka Load Node with Flink SQL:

  • connector is kafka-inlong
    -- Create a Kafka table 'kafka_load_node' in Flink SQL
    Flink SQL> CREATE TABLE kafka_load_node (
        `id` INT,
        `name` STRING
    ) WITH (
        'connector' = 'kafka-inlong',
        'topic' = 'user',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'testGroup',
        'format' = 'csv'
    );
  • connector is upsert-kafka-inlong
    -- Create a Kafka table 'kafka_load_node' in Flink SQL
    Flink SQL> CREATE TABLE kafka_load_node (
        `id` INT,
        `name` STRING,
        PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
        'connector' = 'upsert-kafka-inlong',
        'topic' = 'user',
        'properties.bootstrap.servers' = 'localhost:9092',
        'key.format' = 'csv',
        'value.format' = 'csv'
    );
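
Once a Load Node table exists, data is written to it with an ordinary INSERT INTO statement. The following is a minimal sketch, assuming the kafka_load_node table from either example above has already been created. The source table demo_source and its settings are hypothetical and only serve to make the example self-contained; it uses Flink's built-in datagen connector to generate rows.

```sql
-- Hypothetical source table backed by Flink's built-in datagen connector.
-- The table name and the field settings are illustrative only.
CREATE TABLE demo_source (
    `id` INT,
    `name` STRING
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'fields.id.kind' = 'sequence',
    'fields.id.start' = '1',
    'fields.id.end' = '100',
    'fields.name.length' = '8'
);

-- Write every generated row into the Kafka Load Node.
INSERT INTO kafka_load_node
SELECT `id`, `name` FROM demo_source;
```

With the upsert-kafka-inlong variant, the `id` column of each row also becomes the Kafka message key, because it is declared as the primary key of the table.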

Usage for InLong Dashboard

When creating a data flow, select Kafka for the data stream direction, and click “Add” to configure it.

Kafka Configuration

Usage for InLong Manager Client

TODO: It will be supported in the future.

Kafka Load Node Options

| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| connector | required | (none) | String | Specifies which connector to use. Valid values are: 1. upsert-kafka-inlong for upsert Kafka; 2. kafka-inlong for normal Kafka. |
| topic | required | (none) | String | Topic name(s) to read data from when the table is used as source. A topic list is also supported for sources by separating topics with semicolons, like topic-1;topic-2. Note that only one of topic-pattern and topic can be specified for sources. In Kafka multiple sink scenarios, this is the default topic that records are written to when ‘topic-pattern’ cannot be parsed. |
| topic-pattern | optional | (none) | String | Dynamic topic extraction pattern, like ‘${VARIABLE_NAME}’. It is only used in Kafka multiple sink scenarios and is only valid when ‘format’ is ‘raw’. |
| sink.multiple.format | optional | (none) | String | Format of the Kafka raw data. Currently only canal-json and debezium-json are supported. It is only used in Kafka multiple sink scenarios and is only valid when ‘format’ is ‘raw’. |
| properties.bootstrap.servers | required | (none) | String | Comma-separated list of Kafka brokers. |
| properties.* | optional | (none) | String | Sets and passes arbitrary Kafka configurations. Suffix names must match the configuration keys defined in the Kafka Configuration documentation. Flink removes the properties. key prefix and passes the transformed keys and values to the underlying KafkaClient. For example, you can disable automatic topic creation via properties.allow.auto.create.topics = false. Some configurations cannot be set because Flink overrides them, e.g. key.deserializer and value.deserializer. |
| format | required for normal Kafka | (none) | String | The format used to deserialize and serialize the value part of Kafka messages. Please refer to the formats page for more details and more format options. Note: either this option or the value.format option is required. |
| key.format | optional | (none) | String | The format used to deserialize and serialize the key part of Kafka messages. Please refer to the formats page for more details and more format options. Note: if a key format is defined, the ‘key.fields’ option is required as well. Otherwise the Kafka records will have an empty key. |
| key.fields | optional | [] | List<String> | Defines an explicit list of physical columns from the table schema that configure the data type for the key format. By default, this list is empty and thus a key is undefined. The list should look like ‘field1;field2’. |
| key.fields-prefix | optional | (none) | String | Defines a custom prefix for all fields of the key format to avoid name clashes with fields of the value format. By default, the prefix is empty. If a custom prefix is defined, both the table schema and ‘key.fields’ will work with prefixed names. When constructing the data type of the key format, the prefix is removed and the non-prefixed names are used within the key format. Note that this option requires ‘value.fields-include’ to be set to ‘EXCEPT_KEY’. |
| value.format | required for upsert Kafka | (none) | String | The format used to deserialize and serialize the value part of Kafka messages. Please refer to the formats page for more details and more format options. |
| value.fields-include | optional | ALL | Enum. Possible values: [ALL, EXCEPT_KEY] | Defines a strategy for how to deal with key columns in the data type of the value format. By default, ‘ALL’ physical columns of the table schema are included in the value format, which means that key columns appear in the data type for both the key and value format. |
| sink.partitioner | optional | ‘default’ | String | Output partitioning from Flink’s partitions into Kafka’s partitions. Valid values are: (1) default: use the Kafka default partitioner to partition records; (2) fixed: each Flink partition ends up in at most one Kafka partition; (3) round-robin: a Flink partition is distributed to Kafka partitions sticky round-robin; (4) raw-hash: extract a value based on ‘sink.multiple.partition-pattern’ and hash it to obtain the final partition, which is only used in Kafka multiple sink scenarios and is only valid when ‘format’ is ‘raw’. It only works when the record’s keys are not specified; (5) a custom FlinkKafkaPartitioner subclass, e.g. ‘org.mycompany.MyPartitioner’. See Sink Partitioning for more details. |
| sink.multiple.partition-pattern | optional | (none) | String | Dynamic partition extraction pattern, like ‘${VARIABLE_NAME}’. It is only used in Kafka multiple sink scenarios and is only valid when ‘format’ is ‘raw’. |
| sink.semantic | optional | at-least-once | String | Defines the delivery semantic for the Kafka sink. Valid values are ‘at-least-once’, ‘exactly-once’ and ‘none’. See Consistency guarantees for more details. |
| sink.parallelism | optional | (none) | Integer | Defines the parallelism of the Kafka sink operator. By default, the parallelism is determined by the framework, using the same parallelism as the upstream chained operator. |
| inlong.metric.labels | optional | (none) | String | InLong metric label. The value format is groupId={groupId}&streamId={streamId}&nodeId={nodeId}. |
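
To illustrate how the key and value options interact, here is a minimal sketch of a normal Kafka Load Node that writes `id` as the message key and the remaining columns as the message value. The table name kafka_keyed_load_node, the `region` column, the topic, and the broker address are all hypothetical. Only options listed in the table above are used.

```sql
-- Hypothetical table; names, topic, and broker address are illustrative only.
-- 'key.format' + 'key.fields': serialize the `id` column as the message key.
-- 'value.fields-include' = 'EXCEPT_KEY': keep `id` out of the message value,
-- so the value contains only `name` and `region`.
-- 'sink.semantic' and 'sink.parallelism': delivery guarantee and sink parallelism.
CREATE TABLE kafka_keyed_load_node (
    `id` INT,
    `name` STRING,
    `region` STRING
) WITH (
    'connector' = 'kafka-inlong',
    'topic' = 'user',
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format' = 'csv',
    'key.fields' = 'id',
    'value.format' = 'csv',
    'value.fields-include' = 'EXCEPT_KEY',
    'sink.semantic' = 'at-least-once',
    'sink.parallelism' = '2'
);
```

Leaving ‘value.fields-include’ at its default of ‘ALL’ would instead write `id` into both the key and the value.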

Available Metadata

The Kafka Load Node supports writing metadata for the canal-json-inlong format.

See the Kafka Extract Node for a list of all available metadata fields.

Features

Support Dynamic Schema Writing

Dynamic schema writing extracts the topic and partition from the data itself and writes each record to the corresponding topic and partition. To enable it, set the Kafka ‘format’ option to ‘raw’ and specify the serialization format of the upstream data through the ‘sink.multiple.format’ option (currently only canal-json and debezium-json are supported).

Dynamic Topic Extraction

Dynamic topic extraction parses the topic pattern and extracts the topic from the data. To enable it, set the ‘topic-pattern’ option. The Kafka Load Node resolves ‘topic-pattern’ to obtain the final topic; if parsing fails, the record is written to the default topic set via ‘topic’. ‘topic-pattern’ supports constants and variables. Constants are string constants. Variables are strictly written as ‘${VARIABLE_NAME}’, and their values come from the data itself: either a metadata field of the format specified by ‘sink.multiple.format’, or a physical field in the data.

Examples of ‘topic-pattern’ are as follows:

  • ‘sink.multiple.format’ is ‘canal-json’:

The upstream data is:

    {
      "data": [
        {
          "id": "111",
          "name": "scooter",
          "description": "Big 2-wheel scooter",
          "weight": "5.18"
        }
      ],
      "database": "inventory",
      "es": 1589373560000,
      "id": 9,
      "isDdl": false,
      "mysqlType": {
        "id": "INTEGER",
        "name": "VARCHAR(255)",
        "description": "VARCHAR(512)",
        "weight": "FLOAT"
      },
      "old": [
        {
          "weight": "5.15"
        }
      ],
      "pkNames": [
        "id"
      ],
      "sql": "",
      "sqlType": {
        "id": 4,
        "name": 12,
        "description": 12,
        "weight": 7
      },
      "table": "products",
      "ts": 1589373560798,
      "type": "UPDATE"
    }

If ‘topic-pattern’ is ‘${database}_${table}’, the extracted topic is ‘inventory_products’ (‘database’ and ‘table’ are metadata fields).

If ‘topic-pattern’ is ‘${database}_${table}_${id}’, the extracted topic is ‘inventory_products_111’ (‘database’ and ‘table’ are metadata fields, and ‘id’ is a physical field).

  • ‘sink.multiple.format’ is ‘debezium-json’:

The upstream data is:

    {
      "before": {
        "id": 4,
        "name": "scooter",
        "description": "Big 2-wheel scooter",
        "weight": 5.18
      },
      "after": {
        "id": 4,
        "name": "scooter",
        "description": "Big 2-wheel scooter",
        "weight": 5.15
      },
      "source": {
        "db": "inventory",
        "table": "products"
      },
      "op": "u",
      "ts_ms": 1589362330904,
      "transaction": null
    }

If ‘topic-pattern’ is ‘${source.db}_${source.table}’, the extracted topic is ‘inventory_products’ (‘source.db’ and ‘source.table’ are metadata fields).

If ‘topic-pattern’ is ‘${source.db}_${source.table}_${id}’, the extracted topic is ‘inventory_products_4’ (‘source.db’ and ‘source.table’ are metadata fields, and ‘id’ is a physical field).
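
Putting the options from this section together, a Load Node table for dynamic topic extraction might be declared as in the sketch below. This is an illustration under stated assumptions, not a verified configuration. The table name kafka_multi_sink_node, the column name `raw_data`, and the topic name default_topic are hypothetical. The single BYTES column is an assumption based on Flink's raw format expecting a single-column schema.

```sql
-- Assumption: the raw upstream record is carried in a single BYTES column.
-- Table, column, and topic names are hypothetical.
-- 'topic' is the default topic used when 'topic-pattern' cannot be parsed.
-- 'topic-pattern' routes each canal-json record to <database>_<table>.
CREATE TABLE kafka_multi_sink_node (
    `raw_data` BYTES
) WITH (
    'connector' = 'kafka-inlong',
    'topic' = 'default_topic',
    'topic-pattern' = '${database}_${table}',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'raw',
    'sink.multiple.format' = 'canal-json'
);
```

For the canal-json record shown earlier, this pattern would resolve to the topic ‘inventory_products’.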

Dynamic Partition Extraction

Dynamic partition extraction parses the partition pattern and extracts the partition from the data, in the same way as dynamic topic extraction. To enable it, set the ‘sink.partitioner’ option to ‘raw-hash’ and set the ‘sink.multiple.partition-pattern’ option. The Kafka Load Node resolves ‘sink.multiple.partition-pattern’ to obtain the partition key, hashes the partition key, and takes the hash modulo the number of partitions as the final partition. If parsing fails, it returns null and Kafka’s default partitioning strategy is applied. ‘sink.multiple.partition-pattern’ supports constants, variables and primary keys. Constants are string constants. Variables are strictly written as ‘${VARIABLE_NAME}’, and their values come from the data itself: either a metadata field of the format specified by ‘sink.multiple.format’, or a physical field in the data. The primary key is the special constant ‘PRIMARY_KEY’, which extracts the primary key value of the record according to the data format.

Note: Dynamic partition extraction based on ‘PRIMARY_KEY’ requires the primary key information to be present in the data. For example, if the format is ‘canal-json’, the primary key field is ‘pkNames’. Because ‘debezium-json’ does not define a primary key, the convention here is that its primary key field is also ‘pkNames’, carried inside ‘source’ alongside other metadata fields such as ‘table’ and ‘db’. If you partition by primary key with the ‘debezium-json’ format, make sure the actual data follows this convention.
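
As a sketch of how the partition options described above fit together, the declaration below combines ‘raw-hash’ partitioning with a variable-based partition pattern. The table name kafka_partitioned_sink_node, the column name `raw_data`, and the topic name default_topic are hypothetical, and the single BYTES column is an assumption based on Flink's raw format expecting a single-column schema. The pattern ‘${id}’ uses the physical field `id` of each record as the partition key.

```sql
-- Assumption: the raw upstream record is carried in a single BYTES column.
-- Table, column, and topic names are hypothetical.
-- 'sink.partitioner' = 'raw-hash' enables pattern-based partitioning.
-- 'sink.multiple.partition-pattern' selects the value that is hashed.
CREATE TABLE kafka_partitioned_sink_node (
    `raw_data` BYTES
) WITH (
    'connector' = 'kafka-inlong',
    'topic' = 'default_topic',
    'topic-pattern' = '${database}_${table}',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'raw',
    'sink.multiple.format' = 'canal-json',
    'sink.partitioner' = 'raw-hash',
    'sink.multiple.partition-pattern' = '${id}'
);
```

With this setup, records that share the same `id` hash to the same partition of their topic, which preserves per-key ordering.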

Data Type Mapping

Kafka stores message keys and values as bytes, so Kafka itself has no schema or data types. Kafka messages are deserialized and serialized by formats, e.g. csv, json, avro. The data type mapping is therefore determined by the specific format. Please refer to the Formats pages for more details.