Text files format

Flink 支持使用 TextLineInputFormat 从文件中读取文本行。此 format 使用 Java 的内置 InputStreamReader 以支持的字符集编码来解码字节流。 要使用该 format,你需要将 Flink Connector Files 依赖项添加到项目中:

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-connector-files</artifactId>
  4. <version>1.17.2</version>
  5. </dependency>

PyFlink 用户可直接使用相关接口,无需添加依赖。

此 format 与新 Source 兼容,可以在批处理和流模式下使用。 因此,你可以通过两种方式使用此 format:

  • 批处理模式的有界读取
  • 流模式的连续读取:监视目录中出现的新文件

有界读取示例:

在此示例中,我们创建了一个 DataStream,其中包含作为字符串的文本文件的行。 此处不需要水印策略,因为记录不包含事件时间戳。

Java

  1. final FileSource<String> source =
  2. FileSource.forRecordStreamFormat(new TextLineInputFormat(), /* Flink Path */)
  3. .build();
  4. final DataStream<String> stream =
  5. env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");

Python

  1. source = FileSource.for_record_stream_format(StreamFormat.text_line_format(), *path).build()
  2. stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "file-source")

连续读取示例: 在此示例中,我们创建了一个 DataStream,随着新文件被添加到目录中,其中包含的文本文件行的字符串流将无限增长。我们每秒会进行新文件监控。 此处不需要水印策略,因为记录不包含事件时间戳。

Java

  1. final FileSource<String> source =
  2. FileSource.forRecordStreamFormat(new TextLineInputFormat(), /* Flink Path */)
  3. .monitorContinuously(Duration.ofSeconds(1L))
  4. .build();
  5. final DataStream<String> stream =
  6. env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");

Python

  1. source = FileSource \
  2. .for_record_stream_format(StreamFormat.text_line_format(), *path) \
  3. .monitor_continously(Duration.of_seconds(1)) \
  4. .build()
  5. stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "file-source")