Kafka

Overview

The Kafka Load Node writes data into Kafka topics, either as a plain append-only stream or in upsert mode. In upsert mode, the upsert-kafka connector consumes a changelog stream: it writes INSERT and UPDATE_AFTER rows as normal Kafka messages, and writes DELETE rows as Kafka messages with a null value (a tombstone for the key).

Supported Version

| Load Node | Kafka version |
|-----------|---------------|
| Kafka | 0.10+ |

Dependencies

The following provides the dependency information needed to set up the Kafka Load Node, both for projects that use a build automation tool (such as Maven or SBT) and for the SQL Client with Sort Connectors JAR bundles.

Maven dependency

    <dependency>
        <groupId>org.apache.inlong</groupId>
        <artifactId>sort-connector-kafka</artifactId>
        <version>1.3.0-SNAPSHOT</version>
    </dependency>

How to create a Kafka Load Node

Usage for SQL API

The examples below show how to create a Kafka Load Node with Flink SQL:

  • Using the kafka-inlong connector:

    -- Create a Kafka table 'kafka_load_node' in Flink SQL
    Flink SQL> CREATE TABLE kafka_load_node (
        `id` INT,
        `name` STRING
    ) WITH (
        'connector' = 'kafka-inlong',
        'topic' = 'user',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'testGroup',
        'format' = 'csv'
    );

  • Using the upsert-kafka-inlong connector:

    -- Create a Kafka table 'kafka_load_node' in Flink SQL
    Flink SQL> CREATE TABLE kafka_load_node (
        `id` INT,
        `name` STRING,
        PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
        'connector' = 'upsert-kafka-inlong',
        'topic' = 'user',
        'properties.bootstrap.servers' = 'localhost:9092',
        'key.format' = 'csv',
        'value.format' = 'csv'
    );
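
Once a table is defined, data is written to it with an ordinary INSERT INTO statement. The following is a minimal sketch of feeding the upsert connector from an updating query. The orders source table, its columns, the order_counts table and topic, and the use of Flink's built-in datagen connector are illustrative assumptions and not part of the InLong setup described here. Each time the count for a user_id changes, the sink receives an INSERT or UPDATE_AFTER row and writes it as a normal Kafka message keyed by user_id.

```sql
-- Hypothetical source: Flink's built-in 'datagen' connector emits random rows.
CREATE TABLE orders (
    `user_id` INT,
    `amount` INT
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '5',
    'fields.user_id.min' = '1',
    'fields.user_id.max' = '10'
);

-- Upsert sink: the primary key becomes the Kafka message key.
CREATE TABLE order_counts (
    `user_id` INT,
    `order_count` BIGINT,
    PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka-inlong',
    'topic' = 'order_counts',
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format' = 'csv',
    'value.format' = 'csv'
);

-- The GROUP BY aggregation produces a changelog stream,
-- which the upsert sink turns into keyed Kafka messages.
INSERT INTO order_counts
SELECT `user_id`, COUNT(*) AS `order_count`
FROM orders
GROUP BY `user_id`;
```

For the append-only kafka-inlong table, a plain statement such as INSERT INTO kafka_load_node VALUES (1, 'alice') is enough, since no primary key or changelog is involved.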

Usage for InLong Dashboard

When creating a data flow, select Kafka for the data stream direction, and click “Add” to configure it.

Kafka Configuration

Usage for InLong Manager Client

TODO: It will be supported in the future.

Kafka Load Node Options

| Option | Required | Default | Type | Description |
|--------|----------|---------|------|-------------|
| connector | required | (none) | String | Specifies which connector to use. Valid values are upsert-kafka-inlong (for upsert Kafka) and kafka-inlong (for normal Kafka). |
| topic | required | (none) | String | Topic name(s) to read data from when the table is used as a source. A topic list is also supported for sources by separating topics with semicolons, e.g. topic-1;topic-2. Note that only one of topic-pattern and topic can be specified for sources. When the table is used as a sink, as in a Load Node, this is the topic that data is written to. |
| properties.bootstrap.servers | required | (none) | String | Comma-separated list of Kafka brokers. |
| properties.* | optional | (none) | String | Sets and passes arbitrary Kafka configurations. The suffix must match a configuration key defined in the Kafka Configuration documentation. Flink removes the properties. prefix and passes the transformed keys and values to the underlying KafkaClient. For example, you can disable automatic topic creation via 'properties.allow.auto.create.topics' = 'false'. Some configurations cannot be set because Flink overrides them, e.g. key.deserializer and value.deserializer. |
| format | required for normal Kafka | (none) | String | The format used to deserialize and serialize the value part of Kafka messages. Please refer to the formats page for more details and more format options. Note: either this option or the value.format option is required. |
| key.format | optional | (none) | String | The format used to deserialize and serialize the key part of Kafka messages. Please refer to the formats page for more details and more format options. Note: if a key format is defined, the 'key.fields' option is required as well. Otherwise the Kafka records will have an empty key. |
| key.fields | optional | [] | List<String> | Defines an explicit list of physical columns from the table schema that configure the data type for the key format. By default, this list is empty and thus no key is defined. The list should look like 'field1;field2'. |
| key.fields-prefix | optional | (none) | String | Defines a custom prefix for all fields of the key format to avoid name clashes with fields of the value format. By default, the prefix is empty. If a custom prefix is defined, both the table schema and 'key.fields' work with the prefixed names. When the data type of the key format is constructed, the prefix is removed and the non-prefixed names are used within the key format. Note that this option requires 'value.fields-include' to be set to 'EXCEPT_KEY'. |
| value.format | required for upsert Kafka | (none) | String | The format used to deserialize and serialize the value part of Kafka messages. Please refer to the formats page for more details and more format options. |
| value.fields-include | optional | ALL | Enum, possible values: [ALL, EXCEPT_KEY] | Defines a strategy for handling key columns in the data type of the value format. By default, 'ALL' physical columns of the table schema are included in the value format, which means that key columns appear in the data type of both the key and the value format. |
| sink.partitioner | optional | 'default' | String | Output partitioning from Flink's partitions into Kafka's partitions. Valid values are: default (use the Kafka default partitioner to partition records); fixed (each Flink partition ends up in at most one Kafka partition); round-robin (a Flink partition is distributed to Kafka partitions in a sticky round-robin fashion; it only works when record keys are not specified); or a custom FlinkKafkaPartitioner subclass, e.g. 'org.mycompany.MyPartitioner'. See Sink Partitioning for more details. |
| sink.semantic | optional | at-least-once | String | Defines the delivery semantic for the Kafka sink. Valid values are 'at-least-once', 'exactly-once' and 'none'. See Consistency guarantees for more details. |
| sink.parallelism | optional | (none) | Integer | Defines the parallelism of the Kafka sink operator. By default, the parallelism is determined by the framework and equals the parallelism of the upstream chained operator. |
| inlong.metric | optional | (none) | String | InLong metric label. The value has the format groupId&streamId&nodeId. |
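
To make the interplay of these options more concrete, here is a minimal sketch of a normal Kafka table that sets a key, forwards a producer property, and tunes the sink. The table name kafka_load_node_tuned, the choice of compression.type as the forwarded Kafka property, and the metric label parts my_group, my_stream and my_node are illustrative assumptions. The remaining option names and values come from the table above.

```sql
-- Key and value layout:
--   'key.fields' = 'id' puts the id column into the message key, and
--   'value.fields-include' = 'EXCEPT_KEY' keeps it out of the message value,
--   so the value carries only the name column.
-- Kafka client pass-through:
--   'properties.compression.type' reaches the Kafka producer as 'compression.type'.
-- Sink behaviour:
--   'fixed' sends each Flink partition to at most one Kafka partition.
CREATE TABLE kafka_load_node_tuned (
    `id` INT,
    `name` STRING
) WITH (
    'connector' = 'kafka-inlong',
    'topic' = 'user',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.compression.type' = 'lz4',
    'key.format' = 'csv',
    'key.fields' = 'id',
    'value.format' = 'csv',
    'value.fields-include' = 'EXCEPT_KEY',
    'sink.partitioner' = 'fixed',
    'sink.semantic' = 'at-least-once',
    'sink.parallelism' = '2',
    'inlong.metric' = 'my_group&my_stream&my_node'
);
```

Because 'value.format' is given explicitly, the 'format' option is omitted here; only one of the two is needed.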

Available Metadata

The Kafka Load Node supports writing metadata when the format is canal-json-inlong.

See the Kafka Extract Node for a list of all available metadata fields.

Data Type Mapping

Kafka stores message keys and values as bytes, so Kafka itself has no schema or data types. Kafka messages are serialized and deserialized by formats, e.g. csv, json, avro. The data type mapping is therefore determined by the specific format. Please refer to the Formats pages for more details.
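
As an illustration, the sketch below declares the same columns twice and changes only the format option, so the same row ends up CSV-encoded in one topic and JSON-encoded in the other. The table names, the user_csv and user_json topics, and the availability of the json format on the classpath are assumptions made for this example.

```sql
-- Same schema, value bytes encoded as CSV.
CREATE TABLE user_as_csv (
    `id` INT,
    `name` STRING
) WITH (
    'connector' = 'kafka-inlong',
    'topic' = 'user_csv',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'csv'
);

-- Same schema, value bytes encoded as a JSON object.
CREATE TABLE user_as_json (
    `id` INT,
    `name` STRING
) WITH (
    'connector' = 'kafka-inlong',
    'topic' = 'user_json',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

-- The identical row is serialized differently by each format.
INSERT INTO user_as_csv VALUES (1, 'alice');
INSERT INTO user_as_json VALUES (1, 'alice');
```

How INT and STRING are represented in each topic is decided entirely by the chosen format, not by Kafka.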