Parquet format

Flink 支持读取 Parquet 文件并生成 Flink RowDataAvro 记录。 要使用 Parquet format,你需要将 flink-parquet 依赖添加到项目中:

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-parquet</artifactId>
  4. <version>1.16.0</version>
  5. </dependency>

要使用 Avro 格式,你需要将 parquet-avro 依赖添加到项目中:

  1. <dependency>
  2. <groupId>org.apache.parquet</groupId>
  3. <artifactId>parquet-avro</artifactId>
  4. <version>1.12.2</version>
  5. <optional>true</optional>
  6. <exclusions>
  7. <exclusion>
  8. <groupId>org.apache.hadoop</groupId>
  9. <artifactId>hadoop-client</artifactId>
  10. </exclusion>
  11. <exclusion>
  12. <groupId>it.unimi.dsi</groupId>
  13. <artifactId>fastutil</artifactId>
  14. </exclusion>
  15. </exclusions>
  16. </dependency>

为了在 PyFlink 作业中使用 Parquet format ,需要添加下列依赖:

PyFlink JAR
Download

在 PyFlink 中如何添加 JAR 包依赖参见 Python 依赖管理

此格式与新的 Source 兼容,可以同时在批和流模式下使用。 因此,你可使用此格式处理以下两类数据:

  • 有界数据: 列出所有文件并全部读取。
  • 无界数据:监控目录中出现的新文件

当你开启一个 File Source,会被默认为有界读取。 如果你想在连续读取模式下使用 File Source,你必须额外调用 AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)

Vectorized reader

Java

  1. // Parquet rows are decoded in batches
  2. FileSource.forBulkFileFormat(BulkFormat,Path...)
  3. // Monitor the Paths to read data as unbounded data
  4. FileSource.forBulkFileFormat(BulkFormat,Path...)
  5. .monitorContinuously(Duration.ofMillis(5L))
  6. .build();

Python

  1. # Parquet rows are decoded in batches
  2. FileSource.for_bulk_file_format(BulkFormat, Path...)
  3. # Monitor the Paths to read data as unbounded data
  4. FileSource.for_bulk_file_format(BulkFormat, Path...) \
  5. .monitor_continuously(Duration.of_millis(5)) \
  6. .build()

Avro Parquet reader

Java

  1. // Parquet rows are decoded in batches
  2. FileSource.forRecordStreamFormat(StreamFormat,Path...)
  3. // Monitor the Paths to read data as unbounded data
  4. FileSource.forRecordStreamFormat(StreamFormat,Path...)
  5. .monitorContinuously(Duration.ofMillis(5L))
  6. .build();

Python