Avro format

Flink 内置支持 Apache Avro 格式。在 Flink 中将更容易地读写基于 Avro schema 的 Avro 数据。 Flink 的序列化框架可以处理基于 Avro schemas 生成的类。为了能够使用 Avro format,需要在自动构建工具(例如 Maven 或 SBT)中添加如下依赖到项目中。

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-avro</artifactId>
  4. <version>1.16.0</version>
  5. </dependency>

为了在 PyFlink 作业中使用 Avro format ,需要添加下列依赖:

PyFlink JAR
Download

在 PyFlink 中如何添加 JAR 包依赖参见 Python 依赖管理

如果读取 Avro 文件数据,你必须指定 AvroInputFormat

示例

  1. AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
  2. DataStream<User> usersDS = env.createInput(users);

注意,User 是一个通过 Avro schema生成的 POJO 类。Flink 还允许选择 POJO 中字符串类型的键。例如:

  1. usersDS.keyBy("name");

注意,在 Flink 中可以使用 GenericData.Record 类型,但是不推荐使用。因为该类型的记录中包含了完整的 schema,导致数据非常密集,使用起来可能很慢。

Flink 的 POJO 字段选择也适用于从 Avro schema 生成的 POJO 类。但是,只有将字段类型正确写入生成的类时,才能使用。如果字段是 Object 类型,则不能将该字段用作 join 键或 grouping 键。 在 Avro 中如 {"name": "type_double_test", "type": "double"}, 这样指定字段是可行的,但是如 ({"name": "type_double_test", "type": ["double"]},) 这样指定包含一个字段的复合类型就会生成 Object 类型的字段。注意,如 ({"name": "type_double_test", "type": ["null", "double"]},) 这样指定 nullable 类型字段也是可能产生 Object 类型的!

在 Python 作业中读取 Avro 文件,需要先定义 Avro schema,产生的 DataStream 元素为原生的 Python 对象 Generic。例如:

  1. schema = AvroSchema.parse_string("""
  2. {
  3. "type": "record",
  4. "name": "User",
  5. "fields": [
  6. {"name": "name", "type": "string"},
  7. {"name": "favoriteNumber", "type": ["int", "null"]},
  8. {"name": "favoriteColor", "type": ["string", "null"]}
  9. ]
  10. }
  11. """)
  12. env = StreamExecutionEnvironment.get_execution_environment()
  13. ds = env.create_input(AvroInputFormat(AVRO_FILE_PATH, schema))
  14. def json_dumps(record):
  15. import json
  16. return json.dumps(record)
  17. ds.map(json_dumps).print()