Apache Kafka SQL Connector

Scan Source: Unbounded | Sink: Streaming Append Mode

The Kafka connector allows for reading data from and writing data into Kafka topics.

Dependencies

Apache Flink ships with multiple Kafka connectors: universal, 0.10, and 0.11. The universal Kafka connector attempts to track the latest version of the Kafka client. The version of the client it uses may change between Flink releases. Modern Kafka clients are backwards compatible with broker versions 0.10.0 or later. For most users the universal Kafka connector is the most appropriate. However, for Kafka versions 0.11.x and 0.10.x, we recommend using the dedicated 0.11 and 0.10 connectors, respectively. For details on Kafka compatibility, please refer to the official Kafka documentation.

Kafka version, Maven dependency, and SQL Client JAR:

  • universal: flink-connector-kafka_2.11 (SQL Client JAR: Download)
  • 0.11.x: flink-connector-kafka-0.11_2.11 (SQL Client JAR: Download)
  • 0.10.x: flink-connector-kafka-0.10_2.11 (SQL Client JAR: Download)

The Kafka connectors are not currently part of the binary distribution. See how to link with them for cluster execution here.

How to create a Kafka table

The example below shows how to create a Kafka table:

CREATE TABLE kafkaTable (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'csv',
  'scan.startup.mode' = 'earliest-offset'
)

Connector Options

  • connector (required, String, default: none): Specify which connector to use; for Kafka the options are ‘kafka’, ‘kafka-0.11’, and ‘kafka-0.10’.
  • topic (required, String, default: none): Topic name from which the table is read.
  • properties.bootstrap.servers (required, String, default: none): Comma-separated list of Kafka brokers.
  • properties.group.id (required by source, String, default: none): The id of the consumer group for the Kafka source; optional for the Kafka sink.
  • format (required, String, default: none): The format used to deserialize and serialize Kafka messages. The supported formats are ‘csv’, ‘json’, ‘avro’, ‘debezium-json’ and ‘canal-json’. Please refer to the Formats page for more details and more format options.
  • scan.startup.mode (optional, String, default: group-offsets): Startup mode for the Kafka consumer. Valid values are ‘earliest-offset’, ‘latest-offset’, ‘group-offsets’, ‘timestamp’ and ‘specific-offsets’. See Start Reading Position below for more details.
  • scan.startup.specific-offsets (optional, String, default: none): Specify offsets for each partition in case of ‘specific-offsets’ startup mode, e.g. ‘partition:0,offset:42;partition:1,offset:300’.
  • scan.startup.timestamp-millis (optional, Long, default: none): Start from the specified epoch timestamp (milliseconds); used in case of ‘timestamp’ startup mode.
  • sink.partitioner (optional, String, default: none): Output partitioning from Flink’s partitions into Kafka’s partitions. Valid values are:
    • fixed: each Flink partition ends up in at most one Kafka partition.
    • round-robin: a Flink partition is distributed to Kafka partitions round-robin.
    • Custom FlinkKafkaPartitioner subclass: e.g. ‘org.mycompany.MyPartitioner’.
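
To show how several of these options fit together beyond the example above, the following is a minimal sketch of a table that uses the dedicated 0.11 connector with the json format and starts reading from the latest offset; the topic name, broker addresses, group id, and columns are assumptions made for illustration.

CREATE TABLE orderTable (
  order_id BIGINT,
  price DECIMAL(10, 2),
  order_time TIMESTAMP(3)
) WITH (
  'connector' = 'kafka-0.11',                                    -- dedicated connector for Kafka 0.11.x brokers
  'topic' = 'orders',                                            -- assumed topic name
  'properties.bootstrap.servers' = 'kafka-1:9092,kafka-2:9092',  -- comma-separated broker list (assumed)
  'properties.group.id' = 'orderGroup',                          -- assumed consumer group id
  'format' = 'json',
  'scan.startup.mode' = 'latest-offset'                          -- start from the latest offset
)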

Features

Start Reading Position

The config option scan.startup.mode specifies the startup mode for Kafka consumer. The valid enumerations are:

  • group-offsets: start from committed offsets in ZK / Kafka brokers of a specific consumer group.
  • earliest-offset: start from the earliest offset possible.
  • latest-offset: start from the latest offset.
  • timestamp: start from user-supplied timestamp for each partition.
  • specific-offsets: start from user-supplied specific offsets for each partition.

The default value is group-offsets, which means consuming from the last committed offsets in ZK / Kafka brokers.

If timestamp is specified, another config option scan.startup.timestamp-millis is required to specify a specific startup timestamp in milliseconds since January 1, 1970 00:00:00.000 GMT.

If specific-offsets is specified, another config option scan.startup.specific-offsets is required to specify specific startup offsets for each partition, e.g. an option value partition:0,offset:42;partition:1,offset:300 indicates offset 42 for partition 0 and offset 300 for partition 1.
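
As a sketch of the specific-offsets mode, the DDL below reuses the topic and offsets from the description above; the table name and columns are assumptions made for illustration.

CREATE TABLE kafkaTableFromOffsets (
  user_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'csv',
  'scan.startup.mode' = 'specific-offsets',
  -- offset 42 for partition 0 and offset 300 for partition 1
  'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
)

For the timestamp mode, 'scan.startup.mode' = 'timestamp' would be paired with 'scan.startup.timestamp-millis' instead.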

Changelog Source

Flink natively supports Kafka as a changelog source. If the messages in a Kafka topic are change events captured from other databases using CDC tools, you can use a CDC format to interpret the messages as INSERT/UPDATE/DELETE rows in the Flink SQL system. Flink provides two CDC formats, debezium-json and canal-json, to interpret change events captured by Debezium and Canal. A changelog source is useful in many cases, such as synchronizing incremental data from databases to other systems, auditing logs, maintaining materialized views on databases, temporal joins against the changing history of a database table, and so on. See debezium-json and canal-json for more details on how to use the CDC formats.
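
A minimal sketch of a changelog source, assuming a topic that carries Debezium change events for a products table (the topic name, columns, and group id are illustrative):

CREATE TABLE products (
  id BIGINT,
  name STRING,
  weight DECIMAL(10, 3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',                    -- assumed topic carrying Debezium change events
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'productsGroup',        -- assumed consumer group id
  'format' = 'debezium-json'                      -- interprets each message as an INSERT/UPDATE/DELETE change
)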

Sink Partitioning

The config option sink.partitioner specifies output partitioning from Flink’s partitions into Kafka’s partitions. By default, a Kafka sink writes to at most as many partitions as its own parallelism (each parallel instance of the sink writes to exactly one partition). In order to distribute the writes to more partitions or control the routing of rows into partitions, a custom sink partitioner can be provided. The round-robin partitioner is useful to avoid an unbalanced partitioning. However, it will cause a lot of network connections between all the Flink instances and all the Kafka brokers.
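
For illustration, the sketch below declares a sink table that uses the round-robin partitioner and then writes rows from the kafkaTable defined earlier into it; the output topic name and the column selection are assumptions.

CREATE TABLE kafkaSink (
  user_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior_output',               -- assumed output topic
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json',
  'sink.partitioner' = 'round-robin'              -- distribute rows across Kafka partitions round-robin
)

-- executed as a separate statement
INSERT INTO kafkaSink
SELECT user_id, behavior FROM kafkaTable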

Consistency guarantees

By default, a Kafka sink ingests data with at-least-once guarantees into a Kafka topic if the query is executed with checkpointing enabled.

Data Type Mapping

Kafka stores message keys and values as bytes, so Kafka has no schema or data types. Kafka messages are deserialized and serialized by formats, e.g. csv, json, avro. Thus, the data type mapping is determined by the specific format in use. Please refer to the Formats page for more details.
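
As a hedged illustration, the DDL below declares a few richer SQL types whose serialization and deserialization are handled entirely by the chosen format, here json; the topic, columns, and types are assumptions made for the example.

CREATE TABLE typedTable (
  id BIGINT,
  amount DECIMAL(10, 2),
  tags ARRAY<STRING>,
  created_at TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'typed_events',                       -- assumed topic
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'typedGroup',           -- assumed consumer group id
  'format' = 'json'                               -- the format maps these column types to and from message bytes
)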