Hadoop FileSystem 连接器

这个连接器可以向所有 Hadoop FileSystem 支持的文件系统写入分区文件。使用前,需要在工程里添加下面的依赖:

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-connector-filesystem_2.11</artifactId>
  4. <version>1.9.0</version>
  5. </dependency>

注意连接器目前还不是二进制发行版的一部分,添加依赖、打包配置以及集群运行信息请参考 这里

分桶文件 Sink

关于分桶的配置我们后面会有讲述,这里先创建一个分桶 sink,默认情况下这个 sink 会将数据写入到按照时间切分的滚动文件中:

  1. DataStream<String> input = ...;
  2. input.addSink(new BucketingSink<String>("/base/path"));
  1. val input: DataStream[String] = ...
  2. input.addSink(new BucketingSink[String]("/base/path"))

初始化时只需要一个参数,这个参数表示分桶文件存储的路径。分桶 sink 可以通过指定自定义的 bucketer、 writer 和 batch 值进一步配置。

默认情况下,当数据到来时,分桶 sink 会按照系统时间对数据进行切分,并以 "yyyy-MM-dd—HH" 的时间格式给每个桶命名。然后 DateTimeFormatter 按照这个时间格式将当前系统时间以 JVM 默认时区转换成分桶的路径。用户可以自定义时区来生成分桶的路径。每遇到一个新的日期都会产生一个新的桶。例如,如果时间的格式以分钟为粒度,那么每分钟都会产生一个桶。每个桶都是一个目录,目录下包含了几个部分文件(part files):每个 sink 的并发实例都会创建一个属于自己的部分文件,当这些文件太大的时候,sink 会产生新的部分文件。当一个桶不再活跃时,打开的部分文件会刷盘并且关闭。如果一个桶最近一段时间都没有写入,那么这个桶被认为是不活跃的。sink 默认会每分钟检查不活跃的桶、关闭那些超过一分钟没有写入的桶。这些行为可以通过 BucketingSinksetInactiveBucketCheckInterval()setInactiveBucketThreshold() 进行设置。

可以调用BucketingSinksetBucketer() 方法指定自定义的 bucketer,如果需要的话,也可以使用一个元素或者元组属性来决定桶的路径。

默认的 writer 是 StringWriter。数据到达时,通过 toString() 方法得到内容,内容以换行符分隔,StringWriter 将数据内容写入部分文件。可以通过 BucketingSinksetWriter() 指定自定义的 writer。SequenceFileWriter 支持写入 HadoopSequenceFiles,并且可以配置是否开启压缩。

关闭部分文件和打开新部分文件的时机可以通过两个配置来确定:

  • 设置文件大小(默认文件大小是384MB)
  • 设置文件滚动周期,单位是毫秒(默认滚动周期是 Long.MAX_VALUE

当上述两个条件中的任意一个被满足,都会生成一个新的部分文件。

示例:

  1. DataStream<Tuple2<IntWritable,Text>> input = ...;
  2. BucketingSink<Tuple2<IntWritable,Text>> sink = new BucketingSink<Tuple2<IntWritable,Text>>("/base/path");
  3. sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")));
  4. sink.setWriter(new SequenceFileWriter<IntWritable, Text>());
  5. sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB,
  6. sink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins
  7. input.addSink(sink);
  1. // the SequenceFileWriter only works with Flink Tuples
  2. import org.apache.flink.api.java.tuple.Tuple2
  3. val input: DataStream[Tuple2[A, B]] = ...
  4. val sink = new BucketingSink[Tuple2[IntWritable, Text]]("/base/path")
  5. sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")))
  6. sink.setWriter(new SequenceFileWriter[IntWritable, Text])
  7. sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,
  8. sink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins
  9. input.addSink(sink)

上述代码会创建一个 sink,这个 sink 按下面的模式写入桶文件:

  1. /base/path/{date-time}/part-{parallel-task}-{count}

date-time 是我们从日期/时间格式获得的字符串,parallel-task 是 sink 并发实例的索引,count 是因文件大小或者滚动周期而产生的文件的编号。

更多信息,请参考 BucketingSink