Text files format
Flink 支持使用 TextLineInputFormat
从文件中读取文本行。此 format 使用 Java 的内置 InputStreamReader 以支持的字符集编码来解码字节流。 要使用该 format,你需要将 Flink Connector Files 依赖项添加到项目中:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>1.16.0</version>
</dependency>
PyFlink 用户可直接使用相关接口,无需添加依赖。
此 format 与新 Source 兼容,可以在批处理和流模式下使用。 因此,你可以通过两种方式使用此 format:
- 批处理模式的有界读取
- 流模式的连续读取:监视目录中出现的新文件
有界读取示例:
在此示例中,我们创建了一个 DataStream,其中包含作为字符串的文本文件的行。 此处不需要水印策略,因为记录不包含事件时间戳。
Java
final FileSource<String> source =
FileSource.forRecordStreamFormat(new TextLineInputFormat(), /* Flink Path */)
.build();
final DataStream<String> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
Python
source = FileSource.for_record_stream_format(StreamFormat.text_line_format(), *path).build()
stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "file-source")
连续读取示例: 在此示例中,我们创建了一个 DataStream,随着新文件被添加到目录中,其中包含的文本文件行的字符串流将无限增长。我们每秒会进行新文件监控。 此处不需要水印策略,因为记录不包含事件时间戳。
Java
final FileSource<String> source =
FileSource.forRecordStreamFormat(new TextLineInputFormat(), /* Flink Path */)
.monitorContinuously(Duration.ofSeconds(1L))
.build();
final DataStream<String> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
Python
source = FileSource \
.for_record_stream_format(StreamFormat.text_line_format(), *path) \
.monitor_continously(Duration.of_seconds(1)) \
.build()
stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "file-source")