Parquet format
Flink 支持读取 Parquet 文件并生成 Flink RowData 和 Avro 记录。 要使用 Parquet format,你需要将 flink-parquet 依赖添加到项目中:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet</artifactId>
<version>1.16.0</version>
</dependency>
要使用 Avro 格式,你需要将 parquet-avro 依赖添加到项目中:
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.12.2</version>
<optional>true</optional>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
</exclusion>
</exclusions>
</dependency>
为了在 PyFlink 作业中使用 Parquet format ,需要添加下列依赖:
PyFlink JAR |
---|
Download |
在 PyFlink 中如何添加 JAR 包依赖参见 Python 依赖管理。
此格式与新的 Source 兼容,可以同时在批和流模式下使用。 因此,你可使用此格式处理以下两类数据:
- 有界数据: 列出所有文件并全部读取。
- 无界数据:监控目录中出现的新文件
当你开启一个 File Source,会被默认为有界读取。 如果你想在连续读取模式下使用 File Source,你必须额外调用
AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)
。
Vectorized reader
Java
// Parquet rows are decoded in batches
FileSource.forBulkFileFormat(BulkFormat,Path...)
// Monitor the Paths to read data as unbounded data
FileSource.forBulkFileFormat(BulkFormat,Path...)
.monitorContinuously(Duration.ofMillis(5L))
.build();
Python
# Parquet rows are decoded in batches
FileSource.for_bulk_file_format(BulkFormat, Path...)
# Monitor the Paths to read data as unbounded data
FileSource.for_bulk_file_format(BulkFormat, Path...) \
.monitor_continuously(Duration.of_millis(5)) \
.build()
Avro Parquet reader
Java
// Parquet rows are decoded in batches
FileSource.forRecordStreamFormat(StreamFormat,Path...)
// Monitor the Paths to read data as unbounded data
FileSource.forRecordStreamFormat(StreamFormat,Path...)
.monitorContinuously(Duration.ofMillis(5L))
.build();
Python