Confluent Avro Format

Format: Serialization Schema Format: Deserialization Schema

Avro Schema Registry (avro-confluent) 格式能让你读取被 io.confluent.kafka.serializers.KafkaAvroSerializer 序列化的记录,以及可以写入成能被 io.confluent.kafka.serializers.KafkaAvroDeserializer 反序列化的记录。

当以这种格式读取(反序列化)记录时,将根据记录中编码的 schema 版本 id 从配置的 Confluent Schema Registry 中获取 Avro writer schema ,而从 table schema 中推断出 reader schema。

当以这种格式写入(序列化)记录时,Avro schema 是从 table schema 中推断出来的,并会用来检索要与数据一起编码的 schema id。我们会在配置的 Confluent Schema Registry 中配置的 subject 下,检索 schema id。subject 通过 avro-confluent.subject 参数来制定。

Avro Schema Registry 格式只能与 Apache Kafka SQL 连接器Upsert Kafka SQL 连接器一起使用。

依赖

In order to use the Avro Schema Registry format the following dependencies are required for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.

Maven dependencySQL Client
  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-avro-confluent-registry</artifactId>
  4. <version>1.16.0</version>
  5. </dependency><dependency>
  6. <groupId>org.apache.flink</groupId>
  7. <artifactId>flink-avro</artifactId>
  8. <version>1.16.0</version>
  9. </dependency>
Copied to clipboard!
Download

For Maven, SBT, Gradle, or other build automation tools, please also ensure that Confluent’s maven repository at https://packages.confluent.io/maven/ is configured in your project’s build files.

如何创建使用 Avro-Confluent 格式的表

以下是一个使用 Kafka 连接器和 Confluent Avro 格式创建表的示例。

SQL

使用原始的 UTF-8 字符串作为 Kafka 的 key,Schema Registry 中注册的 Avro 记录作为 Kafka 的 values 的表的示例:

  1. CREATE TABLE user_created (
  2. -- 该列映射到 Kafka 原始的 UTF-8 key
  3. the_kafka_key STRING,
  4. -- 映射到 Kafka value 中的 Avro 字段的一些列
  5. id STRING,
  6. name STRING,
  7. email STRING
  8. ) WITH (
  9. 'connector' = 'kafka',
  10. 'topic' = 'user_events_example1',
  11. 'properties.bootstrap.servers' = 'localhost:9092',
  12. -- UTF-8 字符串作为 Kafka keys,使用表中的 'the_kafka_key'
  13. 'key.format' = 'raw',
  14. 'key.fields' = 'the_kafka_key',
  15. 'value.format' = 'avro-confluent',
  16. 'value.avro-confluent.url' = 'http://localhost:8082',
  17. 'value.fields-include' = 'EXCEPT_KEY'
  18. )

我们可以像下面这样将数据写入到 kafka 表中:

  1. INSERT INTO user_created
  2. SELECT
  3. -- user id 复制至映射到 kafka key 的列中
  4. id as the_kafka_key,
  5. -- 所有的 values
  6. id, name, email
  7. FROM some_table

Kafka 的 key 和 value 在 Schema Registry 中都注册为 Avro 记录的表的示例:

  1. CREATE TABLE user_created (
  2. -- 该列映射到 Kafka key 中的 Avro 字段 'id'
  3. kafka_key_id STRING,
  4. -- 映射到 Kafka value 中的 Avro 字段的一些列
  5. id STRING,
  6. name STRING,
  7. email STRING
  8. ) WITH (
  9. 'connector' = 'kafka',
  10. 'topic' = 'user_events_example2',
  11. 'properties.bootstrap.servers' = 'localhost:9092',
  12. -- 注意:由于哈希分区,在 Kafka key 的上下文中,schema 升级几乎从不向后也不向前兼容。
  13. 'key.format' = 'avro-confluent',
  14. 'key.avro-confluent.url' = 'http://localhost:8082',
  15. 'key.fields' = 'kafka_key_id',
  16. -- 在本例中,我们希望 Kafka key value Avro 类型都包含 'id' 字段
  17. -- => 给表中与 Kafka key 字段关联的列添加一个前缀来避免冲突
  18. 'key.fields-prefix' = 'kafka_key_',
  19. 'value.format' = 'avro-confluent',
  20. 'value.avro-confluent.url' = 'http://localhost:8082',
  21. 'value.fields-include' = 'EXCEPT_KEY',
  22. -- Flink 1.13 起,subjects 具有一个默认值, 但是可以被覆盖:
  23. 'key.avro-confluent.subject' = 'user_events_example2-key2',
  24. 'value.avro-confluent.subject' = 'user_events_example2-value2'
  25. )

使用 upsert-kafka 连接器,Kafka 的 value 在 Schema Registry 中注册为 Avro 记录的表的示例:

  1. CREATE TABLE user_created (
  2. -- 该列映射到 Kafka 原始的 UTF-8 key
  3. kafka_key_id STRING,
  4. -- 映射到 Kafka value 中的 Avro 字段的一些列
  5. id STRING,
  6. name STRING,
  7. email STRING,
  8. -- upsert-kafka 连接器需要一个主键来定义 upsert 行为
  9. PRIMARY KEY (kafka_key_id) NOT ENFORCED
  10. ) WITH (
  11. 'connector' = 'upsert-kafka',
  12. 'topic' = 'user_events_example3',
  13. 'properties.bootstrap.servers' = 'localhost:9092',
  14. -- UTF-8 字符串作为 Kafka keys
  15. -- 在本例中我们不指定 'key.fields',因为它由表的主键决定
  16. 'key.format' = 'raw',
  17. -- 在本例中,我们希望 Kafka key value Avro 类型都包含 'id' 字段
  18. -- => 给表中与 Kafka key 字段关联的列添加一个前缀来避免冲突
  19. 'key.fields-prefix' = 'kafka_key_',
  20. 'value.format' = 'avro-confluent',
  21. 'value.avro-confluent.url' = 'http://localhost:8082',
  22. 'value.fields-include' = 'EXCEPT_KEY'
  23. )

Format 参数

参数是否必选默认值类型描述
format
required(none)StringSpecify what format to use, here should be ‘avro-confluent’.
avro-confluent.basic-auth.credentials-source
optional(none)StringBasic auth credentials source for Schema Registry
avro-confluent.basic-auth.user-info
optional(none)StringBasic auth user info for schema registry
avro-confluent.bearer-auth.credentials-source
optional(none)StringBearer auth credentials source for Schema Registry
avro-confluent.bearer-auth.token
optional(none)StringBearer auth token for Schema Registry
avro-confluent.properties
optional(none)MapProperties map that is forwarded to the underlying Schema Registry. This is useful for options that are not officially exposed via Flink config options. However, note that Flink options have higher precedence.
avro-confluent.ssl.keystore.location
optional(none)StringLocation / File of SSL keystore
avro-confluent.ssl.keystore.password
optional(none)StringPassword for SSL keystore
avro-confluent.ssl.truststore.location
optional(none)StringLocation / File of SSL truststore
avro-confluent.ssl.truststore.password
optional(none)StringPassword for SSL truststore
avro-confluent.subject
optional(none)StringThe Confluent Schema Registry subject under which to register the schema used by this format during serialization. By default, ‘kafka’ and ‘upsert-kafka’ connectors use ‘<topic_name>-value’ or ‘<topic_name>-key’ as the default subject name if this format is used as the value or key format. But for other connectors (e.g. ‘filesystem’), the subject option is required when used as sink.
avro-confluent.url
required(none)StringThe URL of the Confluent Schema Registry to fetch/register schemas.

数据类型映射

目前 Apache Flink 都是从 table schema 去推断反序列化期间的 Avro reader schema 和序列化期间的 Avro writer schema。显式地定义 Avro schema 暂不支持。 Apache Avro Format中描述了 Flink 数据类型和 Avro 类型的对应关系。

除了此处列出的类型之外,Flink 还支持读取/写入可为空(nullable)的类型。 Flink 将可为空的类型映射到 Avro union(something, null), 其中 something 是从 Flink 类型转换的 Avro 类型。

您可以参考 Avro Specification 以获取有关 Avro 类型的更多信息。